Allow more parallelism in database migrations.

The newly added statistics show that read+write times are not
enough to explain how long we wait for the data copying, so the time must
be spent setting up the workers rather than in the workers themselves.

From there, let lparallel work its magic in scheduling the work we do in
parallel in pgloader: rather than doing blocking receive-result calls
for each table, only receive-result at the end of the whole
copy-database processing.

On test data here on the laptop we go from 6s to 3s to migrate the
sakila database from MySQL to PostgreSQL: that's because we have lots of
very small tables, so the cost of waiting after each COPY added up quite
quickly.

In passing, stop sharing the same connection object between parallel
workers, which used to be controlled so that they were active in sequence;
see the new API clone-connection (which takes over from new-pgsql-connection).
This commit is contained in:
Dimitri Fontaine 2015-10-20 00:32:36 +02:00
parent 187565b181
commit 633067a0fd
12 changed files with 134 additions and 45 deletions

View File

@ -3,6 +3,9 @@
;; ;;
(in-package :pgloader.connection) (in-package :pgloader.connection)
;;;
;;; Generic API
;;;
(defclass connection () (defclass connection ()
((type :initarg :type :accessor conn-type) ((type :initarg :type :accessor conn-type)
(handle :initarg :conn :accessor conn-handle :initform nil)) (handle :initarg :conn :accessor conn-handle :initform nil))
@ -21,12 +24,26 @@
(defgeneric check-connection (connection) (defgeneric check-connection (connection)
(:documentation "Check that we can actually connect.")) (:documentation "Check that we can actually connect."))
;; Generic entry point for duplicating a connection object. Methods are
;; provided per connection class (file based and database connections);
;; callers use it so that each parallel task works on its own connection
;; object rather than sharing one.
(defgeneric clone-connection (connection)
(:documentation "Instanciate a new connection object with similar properties."))
;;;
;;; File based objects
;;;
(defclass fd-connection (connection) (defclass fd-connection (connection)
((uri :initarg :uri :accessor fd-uri) ((uri :initarg :uri :accessor fd-uri)
(arch :initarg :arch :accessor fd-arch) (arch :initarg :arch :accessor fd-arch)
(path :initarg :path :accessor fd-path)) (path :initarg :path :accessor fd-path))
(:documentation "pgloader connection parameters for a file based data source.")) (:documentation "pgloader connection parameters for a file based data source."))
(defmethod clone-connection ((fd fd-connection))
  "Return a new FD-CONNECTION with the same type as FD, copying the URI,
   ARCH and PATH slots whenever they are bound in FD."
  (let ((copy (make-instance 'fd-connection :type (conn-type fd))))
    ;; only bound slots are read, so an unbound slot in FD never signals
    (dolist (slot-name '(uri arch path) copy)
      (when (slot-boundp fd slot-name)
        (setf (slot-value copy slot-name)
              (slot-value fd slot-name))))))
(define-condition fd-connection-error (connection-error) (define-condition fd-connection-error (connection-error)
((path :initarg :path :reader connection-error-path)) ((path :initarg :path :reader connection-error-path))
(:report (lambda (err stream) (:report (lambda (err stream)
@ -72,6 +89,9 @@
(setf (fd-path fd) local-filename)))) (setf (fd-path fd) local-filename))))
fd) fd)
;;;
;;; database connections
;;;
(defclass db-connection (connection) (defclass db-connection (connection)
((name :initarg :name :accessor db-name) ((name :initarg :name :accessor db-name)
(host :initarg :host :accessor db-host) (host :initarg :host :accessor db-host)
@ -80,6 +100,15 @@
(pass :initarg :pass :accessor db-pass)) (pass :initarg :pass :accessor db-pass))
(:documentation "pgloader connection parameters for a database service.")) (:documentation "pgloader connection parameters for a database service."))
(defmethod clone-connection ((c db-connection))
  "Return a new DB-CONNECTION built from the type, name, host, port, user
   and pass of C. The :conn initarg is not passed, so the new object does
   not reuse the handle of C."
  (with-accessors ((kind     conn-type)
                   (dbname   db-name)
                   (hostname db-host)
                   (portnum  db-port)
                   (username db-user)
                   (password db-pass))
      c
    (make-instance 'db-connection
                   :type kind
                   :name dbname
                   :host hostname
                   :port portnum
                   :user username
                   :pass password)))
(defmethod print-object ((c db-connection) stream) (defmethod print-object ((c db-connection) stream)
(print-unreadable-object (c stream :type t :identity t) (print-unreadable-object (c stream :type t :identity t)
(with-slots (type name host port user) c (with-slots (type name host port user) c
@ -100,6 +129,10 @@
(defgeneric query (db-connection sql &key) (defgeneric query (db-connection sql &key)
(:documentation "Query DB-CONNECTION with SQL query")) (:documentation "Query DB-CONNECTION with SQL query"))
;;;
;;; Tools for every connection class
;;;
(defmacro with-connection ((var connection) &body forms) (defmacro with-connection ((var connection) &body forms)
"Connect to DB-CONNECTION and handle any condition when doing so, and when "Connect to DB-CONNECTION and handle any condition when doing so, and when
connected execute FORMS in a protected way so that we always disconnect connected execute FORMS in a protected way so that we always disconnect

View File

@ -137,6 +137,7 @@
(:export #:connection (:export #:connection
#:open-connection #:open-connection
#:close-connection #:close-connection
#:clone-connection
#:fd-connection #:fd-connection
#:db-connection #:db-connection
#:connection-error #:connection-error
@ -180,7 +181,6 @@
(:export #:pgsql-connection (:export #:pgsql-connection
#:pgconn-use-ssl #:pgconn-use-ssl
#:pgconn-table-name #:pgconn-table-name
#:new-pgsql-connection
#:with-pgsql-transaction #:with-pgsql-transaction
#:with-pgsql-connection #:with-pgsql-connection
#:pgsql-execute #:pgsql-execute
@ -409,7 +409,6 @@
#:pgloader.sources #:pgloader.queue) #:pgloader.sources #:pgloader.queue)
(:import-from #:pgloader.transforms #:precision #:scale) (:import-from #:pgloader.transforms #:precision #:scale)
(:import-from #:pgloader.pgsql (:import-from #:pgloader.pgsql
#:new-pgsql-connection
#:with-pgsql-connection #:with-pgsql-connection
#:with-pgsql-transaction #:with-pgsql-transaction
#:pgsql-execute #:pgsql-execute
@ -479,7 +478,6 @@
#:pgloader.sources #:pgloader.queue) #:pgloader.sources #:pgloader.queue)
(:import-from #:pgloader.transforms #:precision #:scale) (:import-from #:pgloader.transforms #:precision #:scale)
(:import-from #:pgloader.pgsql (:import-from #:pgloader.pgsql
#:new-pgsql-connection
#:with-pgsql-connection #:with-pgsql-connection
#:with-pgsql-transaction #:with-pgsql-transaction
#:pgsql-execute #:pgsql-execute

View File

@ -72,7 +72,9 @@
unqualified-table-name rows oversized?) unqualified-table-name rows oversized?)
(update-stats :data table-name :rows rows :ws ws))) (update-stats :data table-name :rows rows :ws ws)))
(when disable-triggers (enable-triggers unqualified-table-name))))) (when disable-triggers (enable-triggers unqualified-table-name))))
(cons :target table-name))
;;; ;;;
;;; Compute how many rows we're going to try loading next, depending on ;;; Compute how many rows we're going to try loading next, depending on

View File

@ -15,6 +15,13 @@
"Assign the type slot to pgsql." "Assign the type slot to pgsql."
(setf (slot-value pgconn 'type) "pgsql")) (setf (slot-value pgconn 'type) "pgsql"))
(defmethod clone-connection ((c pgsql-connection))
  "Return a new PGSQL-CONNECTION: the object built by the next method is
   turned into a PGSQL-CONNECTION, then the use-ssl and table-name
   settings of C are copied onto it."
  (let ((copy (change-class (call-next-method c) 'pgsql-connection)))
    ;; these two properties are not handled by the next method, so they
    ;; are carried over explicitly
    (setf (pgconn-use-ssl copy) (pgconn-use-ssl c))
    (setf (pgconn-table-name copy) (pgconn-table-name c))
    copy))
(defun new-pgsql-connection (pgconn) (defun new-pgsql-connection (pgconn)
"Prepare a new connection object with all the same properties as pgconn, "Prepare a new connection object with all the same properties as pgconn,
so as to avoid stepping on it's handle" so as to avoid stepping on it's handle"
@ -41,6 +48,7 @@
(defmethod close-connection ((pgconn pgsql-connection)) (defmethod close-connection ((pgconn pgsql-connection))
"Close a PostgreSQL connection." "Close a PostgreSQL connection."
(assert (not (null (conn-handle pgconn))))
(pomo:disconnect (conn-handle pgconn)) (pomo:disconnect (conn-handle pgconn))
(setf (conn-handle pgconn) nil) (setf (conn-handle pgconn) nil)
pgconn) pgconn)
@ -81,13 +89,14 @@
(defmacro with-pgsql-connection ((pgconn) &body forms) (defmacro with-pgsql-connection ((pgconn) &body forms)
"Run FROMS within a PostgreSQL connection to DBNAME. To get the connection "Run FROMS within a PostgreSQL connection to DBNAME. To get the connection
spec from the DBNAME, use `get-connection-spec'." spec from the DBNAME, use `get-connection-spec'."
(let ((conn (gensym "pgsql-conn")))
`(let (#+unix (cl-postgres::*unix-socket-dir* (get-unix-socket-dir ,pgconn))) `(let (#+unix (cl-postgres::*unix-socket-dir* (get-unix-socket-dir ,pgconn)))
(with-connection (conn ,pgconn) (with-connection (,conn ,pgconn)
(let ((pomo:*database* (conn-handle conn))) (let ((pomo:*database* (conn-handle ,conn)))
(log-message :debug "CONNECTED TO ~s" conn) (log-message :debug "CONNECTED TO ~s" ,conn)
(set-session-gucs *pg-settings*) (set-session-gucs *pg-settings*)
(handling-pgsql-notices (handling-pgsql-notices
,@forms))))) ,@forms))))))
(defun get-unix-socket-dir (pgconn) (defun get-unix-socket-dir (pgconn)
"When *pgconn* host is a (cons :unix path) value, return the right value "When *pgconn* host is a (cons :unix path) value, return the right value
@ -344,7 +353,7 @@ select i.relname,
(defun reset-all-sequences (pgconn &key tables) (defun reset-all-sequences (pgconn &key tables)
"Reset all sequences to the max value of the column they are attached to." "Reset all sequences to the max value of the column they are attached to."
(let ((newconn (new-pgsql-connection pgconn))) (let ((newconn (clone-connection pgconn)))
(with-pgsql-connection (newconn) (with-pgsql-connection (newconn)
(set-session-gucs *pg-settings*) (set-session-gucs *pg-settings*)
(pomo:execute "set client_min_messages to warning;") (pomo:execute "set client_min_messages to warning;")

View File

@ -356,7 +356,7 @@
(lp:submit-task channel (lp:submit-task channel
#'pgsql-connect-and-execute-with-timing #'pgsql-connect-and-execute-with-timing
;; each thread must have its own connection ;; each thread must have its own connection
(new-pgsql-connection pgconn) (clone-connection pgconn)
:post label sql) :post label sql)
;; return the pkey "upgrade" statement ;; return the pkey "upgrade" statement

View File

@ -8,8 +8,8 @@
;;; ;;;
(defmethod copy-to-queue ((copy copy) queue) (defmethod copy-to-queue ((copy copy) queue)
"Copy data from given COPY definition into lparallel.queue QUEUE" "Copy data from given COPY definition into lparallel.queue QUEUE"
(pgloader.queue:map-push-queue copy queue)) (pgloader.queue:map-push-queue copy queue)
(cons :source (target copy)))
(defmethod copy-column-list ((copy copy)) (defmethod copy-column-list ((copy copy))
"Default column list is an empty list." "Default column list is an empty list."
@ -28,17 +28,18 @@
(defmethod copy-from ((copy copy) (defmethod copy-from ((copy copy)
&key &key
(kernel nil k-s-p) (kernel nil k-s-p)
(channel nil c-s-p)
truncate truncate
disable-triggers) disable-triggers)
"Copy data from COPY source into PostgreSQL." "Copy data from COPY source into PostgreSQL."
(let* ((lp:*kernel* (or kernel (make-kernel 2))) (let* ((lp:*kernel* (or kernel (make-kernel 2)))
(channel (lp:make-channel)) (channel (or channel (lp:make-channel)))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(table-name (format-table-name (target copy)))) (table-name (format-table-name (target copy))))
(with-stats-collection ((target copy) :dbname (db-name (target-db copy))) (with-stats-collection ((target copy) :dbname (db-name (target-db copy)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~s" table-name) (log-message :info "COPY ~s" table-name)
;; start a tast to read data from the source into the queue ;; start a tast to read data from the source into the queue
(lp:submit-task channel #'copy-to-queue copy queue) (lp:submit-task channel #'copy-to-queue copy queue)
@ -55,7 +56,8 @@
:disable-triggers disable-triggers) :disable-triggers disable-triggers)
;; now wait until both the tasks are over, and kill the kernel ;; now wait until both the tasks are over, and kill the kernel
(unless c-s-p
(loop :for tasks :below 2 :do (lp:receive-result channel) (loop :for tasks :below 2 :do (lp:receive-result channel)
:finally :finally
(log-message :info "COPY ~s done." table-name) (log-message :info "COPY ~s done." table-name)
(unless k-s-p (lp:end-kernel))))))) (unless k-s-p (lp:end-kernel))))))))

View File

@ -29,6 +29,9 @@
(setf (conn-handle msconn) nil) (setf (conn-handle msconn) nil)
msconn) msconn)
(defmethod clone-connection ((c mssql-connection))
  "Return a new MSSQL-CONNECTION made from the object the next method
   builds for C."
  (let ((copy (call-next-method c)))
    (change-class copy 'mssql-connection)))
(defmethod query ((msconn mssql-connection) sql &key) (defmethod query ((msconn mssql-connection) sql &key)
"Send SQL query to MSCONN connection." "Send SQL query to MSCONN connection."
(mssql:query sql :connection (conn-handle msconn))) (mssql:query sql :connection (conn-handle msconn)))

View File

@ -141,6 +141,8 @@
"Stream the given MS SQL database down to PostgreSQL." "Stream the given MS SQL database down to PostgreSQL."
(let* ((cffi:*default-foreign-encoding* encoding) (let* ((cffi:*default-foreign-encoding* encoding)
(copy-kernel (make-kernel 2)) (copy-kernel (make-kernel 2))
(copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel)))
(table-count 0)
idx-kernel idx-channel) idx-kernel idx-channel)
(destructuring-bind (&key all-columns all-indexes all-fkeys pkeys) (destructuring-bind (&key all-columns all-indexes all-fkeys pkeys)
@ -215,8 +217,8 @@
:do :do
(let ((table-source (let ((table-source
(make-instance 'copy-mssql (make-instance 'copy-mssql
:source-db (source-db mssql) :source-db (clone-connection (source-db mssql))
:target-db (target-db mssql) :target-db (clone-connection (target-db mssql))
:source (cons schema table-name) :source (cons schema table-name)
:target (qualify-name schema table-name) :target (qualify-name schema table-name)
:fields columns))) :fields columns)))
@ -225,8 +227,10 @@
;; COPY the data to PostgreSQL, using copy-kernel ;; COPY the data to PostgreSQL, using copy-kernel
(unless schema-only (unless schema-only
(incf table-count)
(copy-from table-source (copy-from table-source
:kernel copy-kernel :kernel copy-kernel
:channel copy-channel
:disable-triggers disable-triggers)) :disable-triggers disable-triggers))
;; Create the indexes for that table in parallel with the next ;; Create the indexes for that table in parallel with the next
@ -250,7 +254,15 @@
idx-channel))))))) idx-channel)))))))
;; now end the kernels ;; now end the kernels
(let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) (let ((lp:*kernel* copy-kernel))
(with-stats-collection ("COPY Threads Completion" :section :post)
(loop :for tasks :below (* 2 table-count)
:do (destructuring-bind (task . table-name)
(lp:receive-result copy-channel)
(log-message :debug "Finished processing ~a for ~s"
task table-name)))
(lp:end-kernel)))
(let ((lp:*kernel* idx-kernel)) (let ((lp:*kernel* idx-kernel))
;; wait until the indexes are done being built... ;; wait until the indexes are done being built...
;; don't forget accounting for that waiting time. ;; don't forget accounting for that waiting time.
@ -262,7 +274,7 @@
;; ;;
;; Complete the PostgreSQL database before handing over. ;; Complete the PostgreSQL database before handing over.
;; ;;
(complete-pgsql-database (new-pgsql-connection (target-db mssql)) (complete-pgsql-database (clone-connection (target-db mssql))
all-columns all-fkeys pkeys all-columns all-fkeys pkeys
:data-only data-only :data-only data-only
:foreign-keys foreign-keys :foreign-keys foreign-keys

View File

@ -73,6 +73,9 @@
(setf (conn-handle myconn) nil) (setf (conn-handle myconn) nil)
myconn) myconn)
(defmethod clone-connection ((c mysql-connection))
  "Return a new MYSQL-CONNECTION made from the object the next method
   builds for C."
  (let ((copy (call-next-method c)))
    (change-class copy 'mysql-connection)))
(defmethod query ((myconn mysql-connection) (defmethod query ((myconn mysql-connection)
sql sql
&key &key

View File

@ -296,6 +296,8 @@
materialize-views) materialize-views)
"Export MySQL data and Import it into PostgreSQL" "Export MySQL data and Import it into PostgreSQL"
(let* ((copy-kernel (make-kernel 2)) (let* ((copy-kernel (make-kernel 2))
(copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel)))
(table-count 0)
idx-kernel idx-channel) idx-kernel idx-channel)
(destructuring-bind (&key view-columns all-columns (destructuring-bind (&key view-columns all-columns
@ -352,13 +354,13 @@
(return-from copy-database))) (return-from copy-database)))
(loop (loop
for (table-name . columns) in (append all-columns view-columns) :for (table-name . columns) :in (append all-columns view-columns)
unless columns :unless columns
do (log-message :error "Table ~s not found, skipping." table-name) :do (log-message :error "Table ~s not found, skipping." table-name)
when columns :when columns
do :do
(let* ((encoding (let* ((encoding
;; force the data encoding when asked to ;; force the data encoding when asked to
(when decoding-as (when decoding-as
@ -368,8 +370,8 @@
(table-source (table-source
(make-instance 'copy-mysql (make-instance 'copy-mysql
:source-db (source-db mysql) :source-db (clone-connection (source-db mysql))
:target-db (target-db mysql) :target-db (clone-connection (target-db mysql))
:source table-name :source table-name
:target (apply-identifier-case table-name) :target (apply-identifier-case table-name)
:fields columns :fields columns
@ -379,8 +381,10 @@
;; first COPY the data from MySQL to PostgreSQL, using copy-kernel ;; first COPY the data from MySQL to PostgreSQL, using copy-kernel
(unless schema-only (unless schema-only
(incf table-count)
(copy-from table-source (copy-from table-source
:kernel copy-kernel :kernel copy-kernel
:channel copy-channel
:disable-triggers disable-triggers)) :disable-triggers disable-triggers))
;; Create the indexes for that table in parallel with the next ;; Create the indexes for that table in parallel with the next
@ -401,7 +405,15 @@
indexes idx-kernel idx-channel)))))) indexes idx-kernel idx-channel))))))
;; now end the kernels ;; now end the kernels
(let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) (let ((lp:*kernel* copy-kernel))
(with-stats-collection ("COPY Threads Completion" :section :post)
(loop :for tasks :below (* 2 table-count)
:do (destructuring-bind (task . table-name)
(lp:receive-result copy-channel)
(log-message :debug "Finished processing ~a for ~s"
task table-name)))
(lp:end-kernel)))
(let ((lp:*kernel* idx-kernel)) (let ((lp:*kernel* idx-kernel))
;; wait until the indexes are done being built... ;; wait until the indexes are done being built...
;; don't forget accounting for that waiting time. ;; don't forget accounting for that waiting time.
@ -420,7 +432,7 @@
;; ;;
;; Complete the PostgreSQL database before handing over. ;; Complete the PostgreSQL database before handing over.
;; ;;
(complete-pgsql-database (new-pgsql-connection (target-db mysql)) (complete-pgsql-database (clone-connection (target-db mysql))
all-columns all-fkeys pkeys all-columns all-fkeys pkeys
table-comments column-comments table-comments column-comments
:data-only data-only :data-only data-only

View File

@ -24,6 +24,9 @@
(setf (conn-handle slconn) nil) (setf (conn-handle slconn) nil)
slconn) slconn)
(defmethod clone-connection ((slconn sqlite-connection))
  "Return a new SQLITE-CONNECTION made from the object the next method
   builds for SLCONN."
  (let ((copy (call-next-method slconn)))
    (change-class copy 'sqlite-connection)))
(defmethod query ((slconn sqlite-connection) sql &key) (defmethod query ((slconn sqlite-connection) sql &key)
(sqlite:execute-to-list (conn-handle slconn) sql)) (sqlite:execute-to-list (conn-handle slconn) sql))
@ -156,6 +159,8 @@
(declare (ignore only-tables)) (declare (ignore only-tables))
(let* ((cffi:*default-foreign-encoding* encoding) (let* ((cffi:*default-foreign-encoding* encoding)
(copy-kernel (make-kernel 2)) (copy-kernel (make-kernel 2))
(copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel)))
(table-count 0)
idx-kernel idx-channel) idx-kernel idx-channel)
(destructuring-bind (&key all-columns all-indexes pkeys) (destructuring-bind (&key all-columns all-indexes pkeys)
@ -193,15 +198,17 @@
do do
(let ((table-source (let ((table-source
(make-instance 'copy-sqlite (make-instance 'copy-sqlite
:source-db (source-db sqlite) :source-db (clone-connection (source-db sqlite))
:target-db (target-db sqlite) :target-db (clone-connection (target-db sqlite))
:source table-name :source table-name
:target (apply-identifier-case table-name) :target (apply-identifier-case table-name)
:fields columns))) :fields columns)))
;; first COPY the data from SQLite to PostgreSQL, using copy-kernel ;; first COPY the data from SQLite to PostgreSQL, using copy-kernel
(unless schema-only (unless schema-only
(incf table-count)
(copy-from table-source (copy-from table-source
:kernel copy-kernel :kernel copy-kernel
:channel copy-channel
:disable-triggers disable-triggers)) :disable-triggers disable-triggers))
;; Create the indexes for that table in parallel with the next ;; Create the indexes for that table in parallel with the next
@ -221,12 +228,20 @@
idx-kernel idx-channel)))))) idx-kernel idx-channel))))))
;; now end the kernels ;; now end the kernels
(let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) (let ((lp:*kernel* copy-kernel))
(with-stats-collection ("COPY Threads Completion" :section :post)
(loop :for tasks :below (* 2 table-count)
:do (destructuring-bind (task . table-name)
(lp:receive-result copy-channel)
(log-message :debug "Finished processing ~a for ~s"
task table-name)))
(lp:end-kernel)))
(let ((lp:*kernel* idx-kernel)) (let ((lp:*kernel* idx-kernel))
;; wait until the indexes are done being built... ;; wait until the indexes are done being built...
;; don't forget accounting for that waiting time. ;; don't forget accounting for that waiting time.
(when (and create-indexes (not data-only)) (when (and create-indexes (not data-only))
(with-stats-collection ("index build completion" :section :post) (with-stats-collection ("Index Build Completion" :section :post)
(loop for idx in all-indexes do (lp:receive-result idx-channel)))) (loop for idx in all-indexes do (lp:receive-result idx-channel))))
(lp:end-kernel)) (lp:end-kernel))

View File

@ -234,10 +234,10 @@
;;; ;;;
;;; Internal utils ;;; Internal utils
;;; ;;;
(defun elapsed-time-since (start) (defun elapsed-time-since (start &optional (end (get-internal-real-time)))
"Return how many seconds ticked between START and now" "Return how many seconds ticked between START and now"
(let ((now (get-internal-real-time))) (let ((end (or end (get-internal-real-time))))
(coerce (/ (- now start) internal-time-units-per-second) 'double-float))) (coerce (/ (- end start) internal-time-units-per-second) 'double-float)))
;;; ;;;