From 4df3167da1eb23e3170a15d10ce2939fe0641f63 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Fri, 23 Oct 2015 00:17:58 +0200 Subject: [PATCH] Introduce another worker thread: transformers. We used to have a reader and a writer cooperating concurrently to load the data from the source into PostgreSQL. The transformation of the data was then the responsibility of the reader thread. Measurements showed that the PostgreSQL processes were mostly idle, waiting for the reader to produce data fast enough. In this patch we introduce a third worker thread that is responsible for processing the raw data into pre-formatted batches, allowing the reader to focus on extracting the data only. We now have two lparallel queues involved in the processing: the raw queue contains the vectors of raw data directly, and the processed-queue contains batches of properly encoded strings for the COPY text protocol. On the test laptop the performance gain isn't noticeable yet; it might be that we need much larger data sets to see a gain here. At least the setup isn't detrimental to performance on smaller data sets. Next improvements are going to allow more features: specialized batch retry thread and parallel table copy scheduling for database sources. Let's also continue caring about performance and play with having several worker and writer threads for each reader. In later patches. And some day, too, we will need to make the number of workers a user-defined variable rather than something hard-coded as today. It's on the todo list; meanwhile, dear user, consider changing the (make-kernel 6) into (make-kernel 12) or something else in src/sources/mysql/mysql.lisp, and consider enlightening me with whatever it is you find by doing so! 
--- src/package.lisp | 10 ++---- src/pgsql/pgsql.lisp | 13 ++++---- src/queue.lisp | 54 +++++++++++++++------------------ src/sources/common/api.lisp | 10 ++++-- src/sources/common/methods.lisp | 49 ++++++++++++++++++++++++------ src/sources/copy.lisp | 5 +-- src/sources/mysql/mysql.lisp | 16 +++++----- 7 files changed, 89 insertions(+), 68 deletions(-) diff --git a/src/package.lisp b/src/package.lisp index ddda5a8..b55fde3 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -250,8 +250,9 @@ ;; Main protocol/API #:map-rows + #:queue-raw-data + #:format-data-to-copy #:copy-from - #:copy-to-queue #:copy-to #:copy-database @@ -291,7 +292,7 @@ #:map-rows #:transforms #:target) - (:export #:map-push-queue)) + (:export #:cook-batches)) ;;; @@ -330,7 +331,6 @@ #:csv-specs #:get-pathname #:copy-csv - #:copy-to-queue #:copy-from #:import-database #:guess-csv-params @@ -349,7 +349,6 @@ #:create-indexes-again) (:export #:fixed-connection #:copy-fixed - #:copy-to-queue #:copy-from)) (defpackage #:pgloader.copy @@ -365,7 +364,6 @@ #:create-indexes-again) (:export #:copy-connection #:copy-copy - #:copy-to-queue #:copy-from)) (defpackage #:pgloader.ixf @@ -383,7 +381,6 @@ (:export #:ixf-connection #:copy-ixf #:map-rows - #:copy-to-queue #:copy-from)) (defpackage #:pgloader.db3 @@ -402,7 +399,6 @@ #:copy-db3 #:map-rows #:copy-to - #:copy-to-queue #:copy-from)) (defpackage #:pgloader.mysql diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index 4b90e7b..c7e9556 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -62,20 +62,19 @@ (loop :for (mesg batch read oversized?) := (lq:pop-queue queue) :until (eq mesg :end-of-data) - :for (rows ws) := (multiple-value-bind (result secs) - (timing - (copy-batch unqualified-table-name - columns batch read)) - (list result secs)) + :for rows := (copy-batch unqualified-table-name columns batch read) :do (progn ;; The SBCL implementation needs some Garbage Collection ;; decision making help... and now is a pretty good time. 
#+sbcl (when oversized? (sb-ext:gc :full t)) (log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]" unqualified-table-name rows oversized?) - (update-stats :data table-name :rows rows :ws ws)))))) + (update-stats :data table-name :rows rows)))))) - (list :target table-name start-time))) + (let ((seconds (elapsed-time-since start-time))) + (log-message :info "Writer for ~a is done in ~fs" table-name seconds) + (update-stats :data table-name :ws seconds) + (list :writer table-name seconds)))) ;;; ;;; Compute how many rows we're going to try loading next, depending on diff --git a/src/queue.lisp b/src/queue.lisp index fec4251..00c9be0 100644 --- a/src/queue.lisp +++ b/src/queue.lisp @@ -17,6 +17,29 @@ (defvar *current-batch* nil) +(defun cook-batches (copy raw-queue processed-queue &optional pre-formatted) + "Cook data from raw-queue into batches sent into processed-queue." + (let ((*current-batch* (make-batch))) + (loop :for row := (lq:pop-queue raw-queue) + :until (eq :end-of-data row) + :do (batch-row row copy processed-queue pre-formatted)) + + ;; finish current-batch + (finish-current-batch copy processed-queue) + + ;; and before calling it a day, push the end-of-data marker + (log-message :debug "End of data.") + (lq:push-queue (list :end-of-data nil nil nil) processed-queue))) + +(defun finish-current-batch (copy queue + &optional oversized? (batch *current-batch*)) + (with-slots (start data count) batch + (when (< 0 count) + (update-stats :data (target copy) + :read count + :rs (elapsed-time-since start)) + (lq:push-queue (list :batch data count oversized?) queue)))) + (declaim (inline oversized?)) (defun oversized? (&optional (batch *current-batch*)) "Return a generalized boolean that is true only when BATCH is considered @@ -27,18 +50,11 @@ (defun batch-row (row copy queue &optional pre-formatted) "Add ROW to the reader batch. When the batch is full, provide it to the writer." 
- (when (or (eq :data *log-min-messages*) - (eq :data *client-min-messages*)) - (log-message :data "< ~s" row)) (let ((oversized? (oversized? *current-batch*))) (when (or (= (batch-count *current-batch*) *copy-batch-rows*) oversized?) ;; close current batch, prepare next one - (with-slots (start data count bytes) *current-batch* - (update-stats :data (target copy) - :read count - :rs (elapsed-time-since start)) - (lq:push-queue (list :batch data count oversized?) queue)) + (finish-current-batch copy queue oversized?) (setf *current-batch* (make-batch)))) ;; Add ROW to the current BATCH. @@ -61,25 +77,3 @@ (log-message :error "~a" e) (update-stats :data (target copy) :errs 1)))) -(defun map-push-queue (copy queue &optional pre-formatted) - "Apply MAP-ROWS on the COPY instance and a function of ROW that will push - the row into the QUEUE. When MAP-ROWS returns, push :end-of-data in the - queue." - (unwind-protect - (let ((*current-batch* (make-batch))) - (map-rows copy :process-row-fn (lambda (row) - (batch-row row copy queue - pre-formatted))) - - ;; we might have the last batch to send over now - (with-slots (start data count) *current-batch* - (when (< 0 count) - (log-message :debug "Sending last batch (~d rows)" count) - (update-stats :data (target copy) - :read count - :rs (elapsed-time-since start)) - (lq:push-queue (list :batch data count nil) queue)))) - - ;; signal we're done - (log-message :debug "End of data.") - (lq:push-queue (list :end-of-data nil nil nil) queue))) diff --git a/src/sources/common/api.lisp b/src/sources/common/api.lisp index 9640952..9e331d9 100644 --- a/src/sources/common/api.lisp +++ b/src/sources/common/api.lisp @@ -39,10 +39,14 @@ "Load data from SOURCE and funcall PROCESS-ROW-FUN for each row found in the SOURCE data. 
Each ROW is passed as a vector of objects.")) -(defgeneric copy-to-queue (source queue) +(defgeneric queue-raw-data (source queue) + (:documentation "Send raw data from the reader to the worker queue.")) + +(defgeneric format-data-to-copy (source raw-queue formatted-queue + &optional pre-formatted) (:documentation - "Load data from SOURCE and queue each row into QUEUE. Typical - implementation will directly use pgloader.queue:map-push-queue.")) + "Process raw data from RAW-QUEUE and prepare batches of formatted text to + send down to PostgreSQL with the COPY protocol in FORMATTED-QUEUE.")) (defgeneric copy-column-list (source) (:documentation diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index 8000609..3221bb2 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -6,11 +6,35 @@ ;;; ;;; Common API implementation ;;; -(defmethod copy-to-queue ((copy copy) queue) - "Copy data from given COPY definition into lparallel.queue QUEUE" - (let ((start-time (get-internal-real-time))) - (pgloader.queue:map-push-queue copy queue) - (list :source (target copy) start-time))) +(defmethod queue-raw-data ((copy copy) queue) + "Stream data as read by the map-queue method on the COPY argument into QUEUE, + as given." 
+ (log-message :debug "Reader started for ~a" (target copy)) + (let ((start-time (get-internal-real-time))) + (map-rows copy :process-row-fn (lambda (data) + (when (or (eq :data *log-min-messages*) + (eq :data *client-min-messages*)) + (log-message :data "< ~s" data)) + (lq:push-queue data queue))) + (lq:push-queue :end-of-data queue) + + (let ((seconds (elapsed-time-since start-time))) + (log-message :info "Reader for ~a is done in ~fs" (target copy) seconds) + (list :reader (target copy) seconds)))) + +(defmethod format-data-to-copy ((copy copy) raw-queue formatted-queue + &optional pre-formatted) + "Loop over the data in the RAW-QUEUE and prepare it in batches in the + FORMATED-QUEUE, ready to be sent down to PostgreSQL using the COPY protocol." + (log-message :debug "Transformer in action for ~a!" (target copy)) + (let ((start-time (get-internal-real-time))) + + (pgloader.queue:cook-batches copy raw-queue formatted-queue pre-formatted) + + ;; and return + (let ((seconds (elapsed-time-since start-time))) + (log-message :info "Transformer for ~a is done in ~fs" (target copy) seconds) + (list :worker (target copy) seconds)))) (defmethod copy-column-list ((copy copy)) "Default column list is an empty list." @@ -33,9 +57,10 @@ truncate disable-triggers) "Copy data from COPY source into PostgreSQL." 
- (let* ((lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 3))) (channel (or channel (lp:make-channel))) - (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) + (rawq (lq:make-queue)) + (fmtq (lq:make-queue :fixed-capacity *concurrent-batches*)) (table-name (format-table-name (target copy)))) (with-stats-collection ((target copy) :dbname (db-name (target-db copy))) @@ -43,7 +68,11 @@ (log-message :info "COPY ~s" table-name) ;; start a tast to read data from the source into the queue - (lp:submit-task channel #'copy-to-queue copy queue) + (lp:submit-task channel #'queue-raw-data copy rawq) + + ;; now start a transformer thread to process raw vectors from our + ;; source into preprocessed batches to send down to PostgreSQL + (lp:submit-task channel #'format-data-to-copy copy rawq fmtq) ;; and start another task to push that data from the queue into ;; PostgreSQL @@ -51,14 +80,14 @@ #'pgloader.pgsql:copy-from-queue (target-db copy) (target copy) - queue + fmtq :columns (copy-column-list copy) :truncate truncate :disable-triggers disable-triggers) ;; now wait until both the tasks are over, and kill the kernel (unless c-s-p - (loop :for tasks :below 2 :do (lp:receive-result channel) + (loop :for tasks :below 3 :do (lp:receive-result channel) :finally (log-message :info "COPY ~s done." 
table-name) (unless k-s-p (lp:end-kernel)))))))) diff --git a/src/sources/copy.lisp b/src/sources/copy.lisp index 836e52c..65a8c5e 100644 --- a/src/sources/copy.lisp +++ b/src/sources/copy.lisp @@ -72,8 +72,9 @@ (log-message :error "~a" e) (update-stats :data (target copy) :errs 1)))))) -(defmethod copy-to-queue ((copy copy-copy) queue) +(defmethod format-data-to-copy ((copy copy-copy) raw-queue formatted-queue + &optional pre-formatted) "Copy data from given COPY definition into lparallel.queue DATAQ" - (pgloader.queue:map-push-queue copy queue 'pre-formatted)) + (call-next-method copy raw-queue formatted-queue t)) diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 3c8cb77..3ec2fc6 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -295,7 +295,7 @@ decoding-as materialize-views) "Export MySQL data and Import it into PostgreSQL" - (let* ((copy-kernel (make-kernel 2)) + (let* ((copy-kernel (make-kernel 6)) (copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel))) (table-count 0) idx-kernel idx-channel) @@ -407,15 +407,13 @@ ;; now end the kernels (let ((lp:*kernel* copy-kernel)) (with-stats-collection ("COPY Threads Completion" :section :post) - (loop :for tasks :below (* 2 table-count) - :do (destructuring-bind (task table-name start-time) + (loop :for tasks :below (* 3 table-count) + :do (destructuring-bind (task table-name seconds) (lp:receive-result copy-channel) - (declare (ignorable start-time)) - (log-message :debug "Finished processing ~a for ~s" - task table-name) - (when (eq :target task) - (update-stats :data table-name - :secs (elapsed-time-since start-time))))) + (log-message :info "Finished processing ~a for ~s ~50T~fs" + task table-name seconds) + (when (eq :writer task) + (update-stats :data table-name :secs seconds)))) (lp:end-kernel))) (let ((lp:*kernel* idx-kernel))