diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index c7e9556..e08bbbd 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -57,7 +57,8 @@ (with-schema (unqualified-table-name table-name) (with-disabled-triggers (unqualified-table-name :disable-triggers disable-triggers) - (log-message :info "pgsql:copy-from-queue: ~a ~a" table-name columns) + (log-message :info "pgsql:copy-from-queue[~a]: ~a ~a" + (lp:kernel-worker-index) table-name columns) (loop :for (mesg batch read oversized?) := (lq:pop-queue queue) @@ -67,12 +68,14 @@ ;; The SBCL implementation needs some Garbage Collection ;; decision making help... and now is a pretty good time. #+sbcl (when oversized? (sb-ext:gc :full t)) - (log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]" + (log-message :debug "copy-batch[~a] ~a ~d row~:p~:[~; [oversized]~]" + (lp:kernel-worker-index) unqualified-table-name rows oversized?) (update-stats :data table-name :rows rows)))))) (let ((seconds (elapsed-time-since start-time))) - (log-message :info "Writer for ~a is done in ~fs" table-name seconds) + (log-message :info "Writer[~a] for ~a is done in ~6$s" + (lp:kernel-worker-index) table-name seconds) (update-stats :data table-name :ws seconds) (list :writer table-name seconds)))) diff --git a/src/queue.lisp b/src/queue.lisp index 00c9be0..470e411 100644 --- a/src/queue.lisp +++ b/src/queue.lisp @@ -29,6 +29,9 @@ ;; and before calling it a day, push the end-of-data marker (log-message :debug "End of data.") + + ;; we hardcode 2 parallel COPY writers, see copy-from implementation. 
+ (lq:push-queue (list :end-of-data nil nil nil) processed-queue) (lq:push-queue (list :end-of-data nil nil nil) processed-queue))) (defun finish-current-batch (copy queue diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index 3221bb2..d67a072 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -19,7 +19,7 @@ (lq:push-queue :end-of-data queue) (let ((seconds (elapsed-time-since start-time))) - (log-message :info "Reader for ~a is done in ~fs" (target copy) seconds) + (log-message :info "Reader for ~a is done in ~6$s" (target copy) seconds) (list :reader (target copy) seconds)))) (defmethod format-data-to-copy ((copy copy) raw-queue formatted-queue @@ -33,7 +33,7 @@ ;; and return (let ((seconds (elapsed-time-since start-time))) - (log-message :info "Transformer for ~a is done in ~fs" (target copy) seconds) + (log-message :info "Transformer for ~a is done in ~6$s" (target copy) seconds) (list :worker (target copy) seconds)))) (defmethod copy-column-list ((copy copy)) @@ -57,7 +57,7 @@ truncate disable-triggers) "Copy data from COPY source into PostgreSQL." 
- (let* ((lp:*kernel* (or kernel (make-kernel 3))) + (let* ((lp:*kernel* (or kernel (make-kernel 4))) (channel (or channel (lp:make-channel))) (rawq (lq:make-queue)) (fmtq (lq:make-queue :fixed-capacity *concurrent-batches*)) @@ -74,20 +74,28 @@ ;; source into preprocessed batches to send down to PostgreSQL (lp:submit-task channel #'format-data-to-copy copy rawq fmtq) - ;; and start another task to push that data from the queue into - ;; PostgreSQL - (lp:submit-task channel - #'pgloader.pgsql:copy-from-queue - (target-db copy) - (target copy) - fmtq - :columns (copy-column-list copy) - :truncate truncate - :disable-triggers disable-triggers) + ;; And start two tasks to push that data from the queue into + ;; PostgreSQL; Andres Freund's research/benchmarks show that in every + ;; PostgreSQL release up to and including 9.5 the highest throughput of + ;; COPY into the same table is achieved with 2 concurrent clients... + ;; + ;; See Extension Lock Scalability slide in + ;; http://www.anarazel.de/talks/pgconf-eu-2015-10-30/concurrency.pdf + ;; + ;; Let's just hardcode 2 threads for that then. + (loop :for w :below 2 + :do (lp:submit-task channel + #'pgloader.pgsql:copy-from-queue + (clone-connection (target-db copy)) + (target copy) + fmtq + :columns (copy-column-list copy) + :truncate truncate + :disable-triggers disable-triggers)) ;; now wait until both the tasks are over, and kill the kernel (unless c-s-p - (loop :for tasks :below 3 :do (lp:receive-result channel) - :finally - (log-message :info "COPY ~s done." - (unless k-s-p (lp:end-kernel)))))))) + (loop :repeat (lp:kernel-worker-count) + :do (lp:receive-result channel) + :finally (progn (log-message :info "COPY ~s done." 
table-name) + (unless k-s-p (lp:end-kernel))))))))) diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 3ec2fc6..646986c 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -295,7 +295,7 @@ decoding-as materialize-views) "Export MySQL data and Import it into PostgreSQL" - (let* ((copy-kernel (make-kernel 6)) + (let* ((copy-kernel (make-kernel 8)) (copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel))) (table-count 0) idx-kernel idx-channel) @@ -407,10 +407,10 @@ ;; now end the kernels (let ((lp:*kernel* copy-kernel)) (with-stats-collection ("COPY Threads Completion" :section :post) - (loop :for tasks :below (* 3 table-count) + (loop :for tasks :below (* 4 table-count) :do (destructuring-bind (task table-name seconds) (lp:receive-result copy-channel) - (log-message :info "Finished processing ~a for ~s ~50T~fs" + (log-message :info "Finished processing ~a for ~s ~50T~6$s" task table-name seconds) (when (eq :writer task) (update-stats :data table-name :secs seconds))))