From 633067a0fde3a3db3b3934d179f73cad6fad57ce Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Tue, 20 Oct 2015 00:32:36 +0200 Subject: [PATCH] Allow more parallelism in database migrations. The newly added statistics are showing that read+write times are not enough to explain how long we wait for the data copying, so it must be the workers setup rather than the workers themselves. From there, let lparallel work its magic in scheduling the work we do in parallel in pgloader: rather than doing blocking receive-result calls for each table, only receive-result at the end of the whole copy-database processing. On test data here on the laptop we go from 6s to 3s to migrate the sakila database from MySQL to PostgreSQL: that's because we have lots of very small tables, so the cost of waiting after each COPY added up quite quickly. In passing, stop sharing the same connection object in between parallel workers that used to be controlled active in-sequence, see the new API clone-connection (which takes over new-pgsql-connection). --- src/connection.lisp | 33 +++++++++++++++++++++++++++++ src/package.lisp | 4 +--- src/pgsql/pgsql.lisp | 4 +++- src/pgsql/queries.lisp | 25 +++++++++++++++------- src/pgsql/schema.lisp | 2 +- src/sources/common/methods.lisp | 20 +++++++++-------- src/sources/mssql/mssql-schema.lisp | 3 +++ src/sources/mssql/mssql.lisp | 22 ++++++++++++++----- src/sources/mysql/mysql-schema.lisp | 3 +++ src/sources/mysql/mysql.lisp | 32 +++++++++++++++++++--------- src/sources/sqlite/sqlite.lisp | 25 +++++++++++++++++----- src/utils/monitor.lisp | 6 +++--- 12 files changed, 134 insertions(+), 45 deletions(-) diff --git a/src/connection.lisp b/src/connection.lisp index 3ee2c21..ecc199e 100644 --- a/src/connection.lisp +++ b/src/connection.lisp @@ -3,6 +3,9 @@ ;; (in-package :pgloader.connection) +;;; +;;; Generic API +;;; (defclass connection () ((type :initarg :type :accessor conn-type) (handle :initarg :conn :accessor conn-handle :initform nil)) @@ -21,12 +24,26 @@ (defgeneric check-connection (connection) (:documentation "Check that we can actually connect.")) +(defgeneric clone-connection (connection) + (:documentation "Instanciate a new connection object with similar properties.")) + + +;;; +;;; File based objects +;;; (defclass fd-connection (connection) ((uri :initarg :uri :accessor fd-uri) (arch :initarg :arch :accessor fd-arch) (path :initarg :path :accessor fd-path)) (:documentation "pgloader connection parameters for a file based data source.")) +(defmethod clone-connection ((fd fd-connection)) + (let ((clone (make-instance 'fd-connection :type (conn-type fd)))) + (loop :for slot :in '(uri arch path) + :do (when (slot-boundp fd slot) + (setf (slot-value clone slot) (slot-value fd slot)))) + clone)) + (define-condition fd-connection-error (connection-error) ((path :initarg :path :reader connection-error-path)) (:report (lambda (err stream) @@ -72,6 +89,9 @@ (setf (fd-path fd) local-filename)))) fd) +;;; +;;; database connections +;;; (defclass db-connection (connection) ((name :initarg :name :accessor db-name) (host :initarg :host :accessor db-host) @@ -80,6 +100,15 @@ (pass :initarg :pass :accessor db-pass)) (:documentation "pgloader connection parameters for a database service.")) +(defmethod clone-connection ((c db-connection)) + (make-instance 'db-connection + :type (conn-type c) + :name (db-name c) + :host (db-host c) + :port (db-port c) + :user (db-user c) + :pass (db-pass c))) + (defmethod print-object ((c db-connection) stream) (print-unreadable-object (c stream :type t :identity t) (with-slots (type name host port user) c @@ -100,6 +129,10 @@ (defgeneric query (db-connection sql &key) (:documentation "Query DB-CONNECTION with SQL query")) + +;;; +;;; Tools for every connection classes +;;; (defmacro with-connection ((var connection) &body forms) "Connect to DB-CONNECTION and handle any condition when doing so, and when connected execute FORMS in a protected way so that we always disconnect diff --git a/src/package.lisp b/src/package.lisp index e1837a1..d4a15af 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -137,6 +137,7 @@ (:export #:connection #:open-connection #:close-connection + #:clone-connection #:fd-connection #:db-connection #:connection-error @@ -180,7 +181,6 @@ (:export #:pgsql-connection #:pgconn-use-ssl #:pgconn-table-name - #:new-pgsql-connection #:with-pgsql-transaction #:with-pgsql-connection #:pgsql-execute @@ -409,7 +409,6 @@ #:pgloader.sources #:pgloader.queue) (:import-from #:pgloader.transforms #:precision #:scale) (:import-from #:pgloader.pgsql - #:new-pgsql-connection #:with-pgsql-connection #:with-pgsql-transaction #:pgsql-execute @@ -479,7 +478,6 @@ #:pgloader.sources #:pgloader.queue) (:import-from #:pgloader.transforms #:precision #:scale) (:import-from #:pgloader.pgsql - #:new-pgsql-connection #:with-pgsql-connection #:with-pgsql-transaction #:pgsql-execute diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index 10484cc..f1475e8 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -72,7 +72,9 @@ unqualified-table-name rows oversized?) (update-stats :data table-name :rows rows :ws ws))) - (when disable-triggers (enable-triggers unqualified-table-name))))) + (when disable-triggers (enable-triggers unqualified-table-name)))) + + (cons :target table-name)) ;;; ;;; Compute how many rows we're going to try loading next, depending on diff --git a/src/pgsql/queries.lisp b/src/pgsql/queries.lisp index 3758a36..a5e7407 100644 --- a/src/pgsql/queries.lisp +++ b/src/pgsql/queries.lisp @@ -15,6 +15,13 @@ "Assign the type slot to pgsql." (setf (slot-value pgconn 'type) "pgsql")) +(defmethod clone-connection ((c pgsql-connection)) + (let ((clone + (change-class (call-next-method c) 'pgsql-connection))) + (setf (pgconn-use-ssl clone) (pgconn-use-ssl c) + (pgconn-table-name clone) (pgconn-table-name c)) + clone)) + (defun new-pgsql-connection (pgconn) "Prepare a new connection object with all the same properties as pgconn, so as to avoid stepping on it's handle" @@ -41,6 +48,7 @@ (defmethod close-connection ((pgconn pgsql-connection)) "Close a PostgreSQL connection." + (assert (not (null (conn-handle pgconn)))) (pomo:disconnect (conn-handle pgconn)) (setf (conn-handle pgconn) nil) pgconn) @@ -81,13 +89,14 @@ (defmacro with-pgsql-connection ((pgconn) &body forms) "Run FROMS within a PostgreSQL connection to DBNAME. To get the connection spec from the DBNAME, use `get-connection-spec'." - `(let (#+unix (cl-postgres::*unix-socket-dir* (get-unix-socket-dir ,pgconn))) - (with-connection (conn ,pgconn) - (let ((pomo:*database* (conn-handle conn))) - (log-message :debug "CONNECTED TO ~s" conn) - (set-session-gucs *pg-settings*) - (handling-pgsql-notices - ,@forms))))) + (let ((conn (gensym "pgsql-conn"))) + `(let (#+unix (cl-postgres::*unix-socket-dir* (get-unix-socket-dir ,pgconn))) + (with-connection (,conn ,pgconn) + (let ((pomo:*database* (conn-handle ,conn))) + (log-message :debug "CONNECTED TO ~s" ,conn) + (set-session-gucs *pg-settings*) + (handling-pgsql-notices + ,@forms)))))) (defun get-unix-socket-dir (pgconn) "When *pgconn* host is a (cons :unix path) value, return the right value @@ -344,7 +353,7 @@ select i.relname, (defun reset-all-sequences (pgconn &key tables) "Reset all sequences to the max value of the column they are attached to." - (let ((newconn (new-pgsql-connection pgconn))) + (let ((newconn (clone-connection pgconn))) (with-pgsql-connection (newconn) (set-session-gucs *pg-settings*) (pomo:execute "set client_min_messages to warning;") diff --git a/src/pgsql/schema.lisp b/src/pgsql/schema.lisp index dd20849..155d312 100644 --- a/src/pgsql/schema.lisp +++ b/src/pgsql/schema.lisp @@ -356,7 +356,7 @@ (lp:submit-task channel #'pgsql-connect-and-execute-with-timing ;; each thread must have its own connection - (new-pgsql-connection pgconn) + (clone-connection pgconn) :post label sql) ;; return the pkey "upgrade" statement diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index ed0929e..5e4c2b4 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -8,8 +8,8 @@ ;;; (defmethod copy-to-queue ((copy copy) queue) "Copy data from given COPY definition into lparallel.queue QUEUE" - (pgloader.queue:map-push-queue copy queue)) - + (pgloader.queue:map-push-queue copy queue) + (cons :source (target copy))) (defmethod copy-column-list ((copy copy)) "Default column list is an empty list." @@ -28,17 +28,18 @@ (defmethod copy-from ((copy copy) &key (kernel nil k-s-p) + (channel nil c-s-p) truncate disable-triggers) "Copy data from COPY source into PostgreSQL." (let* ((lp:*kernel* (or kernel (make-kernel 2))) - (channel (lp:make-channel)) + (channel (or channel (lp:make-channel))) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (table-name (format-table-name (target copy)))) (with-stats-collection ((target copy) :dbname (db-name (target-db copy))) - (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) - (log-message :notice "COPY ~s" table-name) + (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error)) + (log-message :info "COPY ~s" table-name) ;; start a tast to read data from the source into the queue (lp:submit-task channel #'copy-to-queue copy queue) @@ -55,7 +56,8 @@ :disable-triggers disable-triggers) ;; now wait until both the tasks are over, and kill the kernel - (loop :for tasks :below 2 :do (lp:receive-result channel) - :finally - (log-message :info "COPY ~s done." table-name) - (unless k-s-p (lp:end-kernel))))))) + (unless c-s-p + (loop :for tasks :below 2 :do (lp:receive-result channel) + :finally + (log-message :info "COPY ~s done." table-name) + (unless k-s-p (lp:end-kernel)))))))) diff --git a/src/sources/mssql/mssql-schema.lisp b/src/sources/mssql/mssql-schema.lisp index 3959a92..8059b84 100644 --- a/src/sources/mssql/mssql-schema.lisp +++ b/src/sources/mssql/mssql-schema.lisp @@ -29,6 +29,9 @@ (setf (conn-handle msconn) nil) msconn) +(defmethod clone-connection ((c mssql-connection)) + (change-class (call-next-method c) 'mssql-connection)) + (defmethod query ((msconn mssql-connection) sql &key) "Send SQL query to MSCONN connection." (mssql:query sql :connection (conn-handle msconn))) diff --git a/src/sources/mssql/mssql.lisp b/src/sources/mssql/mssql.lisp index b22afda..85587b4 100644 --- a/src/sources/mssql/mssql.lisp +++ b/src/sources/mssql/mssql.lisp @@ -140,7 +140,9 @@ excluding) "Stream the given MS SQL database down to PostgreSQL." (let* ((cffi:*default-foreign-encoding* encoding) - (copy-kernel (make-kernel 2)) + (copy-kernel (make-kernel 2)) + (copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel))) + (table-count 0) idx-kernel idx-channel) (destructuring-bind (&key all-columns all-indexes all-fkeys pkeys) @@ -215,8 +217,8 @@ :do (let ((table-source (make-instance 'copy-mssql - :source-db (source-db mssql) - :target-db (target-db mssql) + :source-db (clone-connection (source-db mssql)) + :target-db (clone-connection (target-db mssql)) :source (cons schema table-name) :target (qualify-name schema table-name) :fields columns))) @@ -225,8 +227,10 @@ ;; COPY the data to PostgreSQL, using copy-kernel (unless schema-only + (incf table-count) (copy-from table-source :kernel copy-kernel + :channel copy-channel :disable-triggers disable-triggers)) ;; Create the indexes for that table in parallel with the next @@ -250,7 +254,15 @@ idx-channel))))))) ;; now end the kernels - (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) + (let ((lp:*kernel* copy-kernel)) + (with-stats-collection ("COPY Threads Completion" :section :post) + (loop :for tasks :below (* 2 table-count) + :do (destructuring-bind (task . table-name) + (lp:receive-result copy-channel) + (log-message :debug "Finished processing ~a for ~s" + task table-name))) + (lp:end-kernel))) + (let ((lp:*kernel* idx-kernel)) ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. @@ -262,7 +274,7 @@ ;; ;; Complete the PostgreSQL database before handing over. ;; - (complete-pgsql-database (new-pgsql-connection (target-db mssql)) + (complete-pgsql-database (clone-connection (target-db mssql)) all-columns all-fkeys pkeys :data-only data-only :foreign-keys foreign-keys diff --git a/src/sources/mysql/mysql-schema.lisp b/src/sources/mysql/mysql-schema.lisp index 3e0264a..e009ec3 100644 --- a/src/sources/mysql/mysql-schema.lisp +++ b/src/sources/mysql/mysql-schema.lisp @@ -73,6 +73,9 @@ (setf (conn-handle myconn) nil) myconn) +(defmethod clone-connection ((c mysql-connection)) + (change-class (call-next-method c) 'mysql-connection)) + (defmethod query ((myconn mysql-connection) sql &key diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 7a33d73..16747ee 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -295,7 +295,9 @@ decoding-as materialize-views) "Export MySQL data and Import it into PostgreSQL" - (let* ((copy-kernel (make-kernel 2)) + (let* ((copy-kernel (make-kernel 2)) + (copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel))) + (table-count 0) idx-kernel idx-channel) (destructuring-bind (&key view-columns all-columns @@ -352,13 +354,13 @@ (return-from copy-database))) (loop - for (table-name . columns) in (append all-columns view-columns) + :for (table-name . columns) :in (append all-columns view-columns) - unless columns - do (log-message :error "Table ~s not found, skipping." table-name) + :unless columns + :do (log-message :error "Table ~s not found, skipping." table-name) - when columns - do + :when columns + :do (let* ((encoding ;; force the data encoding when asked to (when decoding-as @@ -368,8 +370,8 @@ (table-source (make-instance 'copy-mysql - :source-db (source-db mysql) - :target-db (target-db mysql) + :source-db (clone-connection (source-db mysql)) + :target-db (clone-connection (target-db mysql)) :source table-name :target (apply-identifier-case table-name) :fields columns @@ -379,8 +381,10 @@ ;; first COPY the data from MySQL to PostgreSQL, using copy-kernel (unless schema-only + (incf table-count) (copy-from table-source :kernel copy-kernel + :channel copy-channel :disable-triggers disable-triggers)) ;; Create the indexes for that table in parallel with the next @@ -401,7 +405,15 @@ indexes idx-kernel idx-channel)))))) ;; now end the kernels - (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) + (let ((lp:*kernel* copy-kernel)) + (with-stats-collection ("COPY Threads Completion" :section :post) + (loop :for tasks :below (* 2 table-count) + :do (destructuring-bind (task . table-name) + (lp:receive-result copy-channel) + (log-message :debug "Finished processing ~a for ~s" + task table-name))) + (lp:end-kernel))) + (let ((lp:*kernel* idx-kernel)) ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. @@ -420,7 +432,7 @@ ;; ;; Complete the PostgreSQL database before handing over. ;; - (complete-pgsql-database (new-pgsql-connection (target-db mysql)) + (complete-pgsql-database (clone-connection (target-db mysql)) all-columns all-fkeys pkeys table-comments column-comments :data-only data-only diff --git a/src/sources/sqlite/sqlite.lisp b/src/sources/sqlite/sqlite.lisp index bded369..216abd8 100644 --- a/src/sources/sqlite/sqlite.lisp +++ b/src/sources/sqlite/sqlite.lisp @@ -24,6 +24,9 @@ (setf (conn-handle slconn) nil) slconn) +(defmethod clone-connection ((slconn sqlite-connection)) + (change-class (call-next-method slconn) 'sqlite-connection)) + (defmethod query ((slconn sqlite-connection) sql &key) (sqlite:execute-to-list (conn-handle slconn) sql)) @@ -155,7 +158,9 @@ "Stream the given SQLite database down to PostgreSQL." (declare (ignore only-tables)) (let* ((cffi:*default-foreign-encoding* encoding) - (copy-kernel (make-kernel 2)) + (copy-kernel (make-kernel 2)) + (copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel))) + (table-count 0) idx-kernel idx-channel) (destructuring-bind (&key all-columns all-indexes pkeys) @@ -193,15 +198,17 @@ do (let ((table-source (make-instance 'copy-sqlite - :source-db (source-db sqlite) - :target-db (target-db sqlite) + :source-db (clone-connection (source-db sqlite)) + :target-db (clone-connection (target-db sqlite)) :source table-name :target (apply-identifier-case table-name) :fields columns))) ;; first COPY the data from SQLite to PostgreSQL, using copy-kernel (unless schema-only + (incf table-count) (copy-from table-source :kernel copy-kernel + :channel copy-channel :disable-triggers disable-triggers)) ;; Create the indexes for that table in parallel with the next @@ -221,12 +228,20 @@ idx-kernel idx-channel)))))) ;; now end the kernels - (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) + (let ((lp:*kernel* copy-kernel)) + (with-stats-collection ("COPY Threads Completion" :section :post) + (loop :for tasks :below (* 2 table-count) + :do (destructuring-bind (task . table-name) + (lp:receive-result copy-channel) + (log-message :debug "Finished processing ~a for ~s" + task table-name))) + (lp:end-kernel))) + (let ((lp:*kernel* idx-kernel)) ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. (when (and create-indexes (not data-only)) - (with-stats-collection ("index build completion" :section :post) + (with-stats-collection ("Index Build Completion" :section :post) (loop for idx in all-indexes do (lp:receive-result idx-channel)))) (lp:end-kernel)) diff --git a/src/utils/monitor.lisp b/src/utils/monitor.lisp index aa342cc..5564d3f 100644 --- a/src/utils/monitor.lisp +++ b/src/utils/monitor.lisp @@ -234,10 +234,10 @@ ;;; ;;; Internal utils ;;; -(defun elapsed-time-since (start) +(defun elapsed-time-since (start &optional (end (get-internal-real-time))) "Return how many seconds ticked between START and now" - (let ((now (get-internal-real-time))) - (coerce (/ (- now start) internal-time-units-per-second) 'double-float))) + (let ((end (or end (get-internal-real-time)))) + (coerce (/ (- end start) internal-time-units-per-second) 'double-float))) ;;;