diff --git a/src/package.lisp b/src/package.lisp index ddda5a8..b55fde3 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -250,8 +250,9 @@ ;; Main protocol/API #:map-rows + #:queue-raw-data + #:format-data-to-copy #:copy-from - #:copy-to-queue #:copy-to #:copy-database @@ -291,7 +292,7 @@ #:map-rows #:transforms #:target) - (:export #:map-push-queue)) + (:export #:cook-batches)) ;;; @@ -330,7 +331,6 @@ #:csv-specs #:get-pathname #:copy-csv - #:copy-to-queue #:copy-from #:import-database #:guess-csv-params @@ -349,7 +349,6 @@ #:create-indexes-again) (:export #:fixed-connection #:copy-fixed - #:copy-to-queue #:copy-from)) (defpackage #:pgloader.copy @@ -365,7 +364,6 @@ #:create-indexes-again) (:export #:copy-connection #:copy-copy - #:copy-to-queue #:copy-from)) (defpackage #:pgloader.ixf @@ -383,7 +381,6 @@ (:export #:ixf-connection #:copy-ixf #:map-rows - #:copy-to-queue #:copy-from)) (defpackage #:pgloader.db3 @@ -402,7 +399,6 @@ #:copy-db3 #:map-rows #:copy-to - #:copy-to-queue #:copy-from)) (defpackage #:pgloader.mysql diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index 4b90e7b..c7e9556 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -62,20 +62,19 @@ (loop :for (mesg batch read oversized?) := (lq:pop-queue queue) :until (eq mesg :end-of-data) - :for (rows ws) := (multiple-value-bind (result secs) - (timing - (copy-batch unqualified-table-name - columns batch read)) - (list result secs)) + :for rows := (copy-batch unqualified-table-name columns batch read) :do (progn ;; The SBCL implementation needs some Garbage Collection ;; decision making help... and now is a pretty good time. #+sbcl (when oversized? (sb-ext:gc :full t)) (log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]" unqualified-table-name rows oversized?) 
- (update-stats :data table-name :rows rows :ws ws)))))) + (update-stats :data table-name :rows rows)))))) - (list :target table-name start-time))) + (let ((seconds (elapsed-time-since start-time))) + (log-message :info "Writer for ~a is done in ~fs" table-name seconds) + (update-stats :data table-name :ws seconds) + (list :writer table-name seconds)))) ;;; ;;; Compute how many rows we're going to try loading next, depending on diff --git a/src/queue.lisp b/src/queue.lisp index fec4251..00c9be0 100644 --- a/src/queue.lisp +++ b/src/queue.lisp @@ -17,6 +17,29 @@ (defvar *current-batch* nil) +(defun cook-batches (copy raw-queue processed-queue &optional pre-formatted) + "Cook data from raw-queue into batches sent into processed-queue." + (let ((*current-batch* (make-batch))) + (loop :for row := (lq:pop-queue raw-queue) + :until (eq :end-of-data row) + :do (batch-row row copy processed-queue pre-formatted)) + + ;; finish current-batch + (finish-current-batch copy processed-queue) + + ;; and before calling it a day, push the end-of-data marker + (log-message :debug "End of data.") + (lq:push-queue (list :end-of-data nil nil nil) processed-queue))) + +(defun finish-current-batch (copy queue + &optional oversized? (batch *current-batch*)) "When BATCH holds at least one row, record its row count and read time in the stats of COPY's target, then push it to QUEUE as a :batch message carrying the OVERSIZED? flag. A batch with no rows is not pushed." + (with-slots (start data count) batch + (when (< 0 count) + (update-stats :data (target copy) + :read count + :rs (elapsed-time-since start)) + (lq:push-queue (list :batch data count oversized?) queue)))) + (declaim (inline oversized?)) (defun oversized? (&optional (batch *current-batch*)) "Return a generalized boolean that is true only when BATCH is considered @@ -27,18 +50,11 @@ (defun batch-row (row copy queue &optional pre-formatted) "Add ROW to the reader batch. When the batch is full, provide it to the writer." - (when (or (eq :data *log-min-messages*) - (eq :data *client-min-messages*)) - (log-message :data "< ~s" row)) (let ((oversized? (oversized? 
*current-batch*))) (when (or (= (batch-count *current-batch*) *copy-batch-rows*) oversized?) ;; close current batch, prepare next one - (with-slots (start data count bytes) *current-batch* - (update-stats :data (target copy) - :read count - :rs (elapsed-time-since start)) - (lq:push-queue (list :batch data count oversized?) queue)) + (finish-current-batch copy queue oversized?) (setf *current-batch* (make-batch)))) ;; Add ROW to the current BATCH. @@ -61,25 +77,3 @@ (log-message :error "~a" e) (update-stats :data (target copy) :errs 1)))) -(defun map-push-queue (copy queue &optional pre-formatted) - "Apply MAP-ROWS on the COPY instance and a function of ROW that will push - the row into the QUEUE. When MAP-ROWS returns, push :end-of-data in the - queue." - (unwind-protect - (let ((*current-batch* (make-batch))) - (map-rows copy :process-row-fn (lambda (row) - (batch-row row copy queue - pre-formatted))) - - ;; we might have the last batch to send over now - (with-slots (start data count) *current-batch* - (when (< 0 count) - (log-message :debug "Sending last batch (~d rows)" count) - (update-stats :data (target copy) - :read count - :rs (elapsed-time-since start)) - (lq:push-queue (list :batch data count nil) queue)))) - - ;; signal we're done - (log-message :debug "End of data.") - (lq:push-queue (list :end-of-data nil nil nil) queue))) diff --git a/src/sources/common/api.lisp b/src/sources/common/api.lisp index 9640952..9e331d9 100644 --- a/src/sources/common/api.lisp +++ b/src/sources/common/api.lisp @@ -39,10 +39,14 @@ "Load data from SOURCE and funcall PROCESS-ROW-FUN for each row found in the SOURCE data. 
Each ROW is passed as a vector of objects.")) -(defgeneric copy-to-queue (source queue) +(defgeneric queue-raw-data (source queue) + (:documentation "Send raw data from the reader to the worker queue.")) + +(defgeneric format-data-to-copy (source raw-queue formatted-queue + &optional pre-formatted) (:documentation - "Load data from SOURCE and queue each row into QUEUE. Typical - implementation will directly use pgloader.queue:map-push-queue.")) + "Process raw data from RAW-QUEUE and prepare batches of formatted text to + send down to PostgreSQL with the COPY protocol in FORMATTED-QUEUE.")) (defgeneric copy-column-list (source) (:documentation diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index 8000609..3221bb2 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -6,11 +6,35 @@ ;;; ;;; Common API implementation ;;; -(defmethod copy-to-queue ((copy copy) queue) - "Copy data from given COPY definition into lparallel.queue QUEUE" - (let ((start-time (get-internal-real-time))) - (pgloader.queue:map-push-queue copy queue) - (list :source (target copy) start-time))) +(defmethod queue-raw-data ((copy copy) queue) + "Stream data as read by the map-rows method on the COPY argument into QUEUE, + as given." 
+ (log-message :debug "Reader started for ~a" (target copy)) + (let ((start-time (get-internal-real-time))) + (map-rows copy :process-row-fn (lambda (data) + (when (or (eq :data *log-min-messages*) + (eq :data *client-min-messages*)) + (log-message :data "< ~s" data)) + (lq:push-queue data queue))) + (lq:push-queue :end-of-data queue) + + (let ((seconds (elapsed-time-since start-time))) + (log-message :info "Reader for ~a is done in ~fs" (target copy) seconds) + (list :reader (target copy) seconds)))) + +(defmethod format-data-to-copy ((copy copy) raw-queue formatted-queue + &optional pre-formatted) + "Loop over the data in the RAW-QUEUE and prepare it in batches in the + FORMATTED-QUEUE, ready to be sent down to PostgreSQL using the COPY protocol." + (log-message :debug "Transformer in action for ~a!" (target copy)) + (let ((start-time (get-internal-real-time))) + + (pgloader.queue:cook-batches copy raw-queue formatted-queue pre-formatted) + + ;; and return + (let ((seconds (elapsed-time-since start-time))) + (log-message :info "Transformer for ~a is done in ~fs" (target copy) seconds) + (list :worker (target copy) seconds)))) (defmethod copy-column-list ((copy copy)) "Default column list is an empty list." @@ -33,9 +57,10 @@ truncate disable-triggers) "Copy data from COPY source into PostgreSQL." 
- (let* ((lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 3))) (channel (or channel (lp:make-channel))) - (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) + (rawq (lq:make-queue)) + (fmtq (lq:make-queue :fixed-capacity *concurrent-batches*)) (table-name (format-table-name (target copy)))) (with-stats-collection ((target copy) :dbname (db-name (target-db copy))) @@ -43,7 +68,11 @@ (log-message :info "COPY ~s" table-name) ;; start a tast to read data from the source into the queue - (lp:submit-task channel #'copy-to-queue copy queue) + (lp:submit-task channel #'queue-raw-data copy rawq) + + ;; now start a transformer thread to process raw vectors from our + ;; source into preprocessed batches to send down to PostgreSQL + (lp:submit-task channel #'format-data-to-copy copy rawq fmtq) ;; and start another task to push that data from the queue into ;; PostgreSQL @@ -51,14 +80,14 @@ #'pgloader.pgsql:copy-from-queue (target-db copy) (target copy) - queue + fmtq :columns (copy-column-list copy) :truncate truncate :disable-triggers disable-triggers) ;; now wait until both the tasks are over, and kill the kernel (unless c-s-p - (loop :for tasks :below 2 :do (lp:receive-result channel) + (loop :for tasks :below 3 :do (lp:receive-result channel) :finally (log-message :info "COPY ~s done." 
table-name) (unless k-s-p (lp:end-kernel)))))))) diff --git a/src/sources/copy.lisp b/src/sources/copy.lisp index 836e52c..65a8c5e 100644 --- a/src/sources/copy.lisp +++ b/src/sources/copy.lisp @@ -72,8 +72,9 @@ (log-message :error "~a" e) (update-stats :data (target copy) :errs 1)))))) -(defmethod copy-to-queue ((copy copy-copy) queue) +(defmethod format-data-to-copy ((copy copy-copy) raw-queue formatted-queue + &optional pre-formatted) (declare (ignore pre-formatted)) "Copy data from given COPY definition into lparallel.queue DATAQ" - (pgloader.queue:map-push-queue copy queue 'pre-formatted)) + (call-next-method copy raw-queue formatted-queue t)) diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 3c8cb77..3ec2fc6 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -295,7 +295,7 @@ decoding-as materialize-views) "Export MySQL data and Import it into PostgreSQL" - (let* ((copy-kernel (make-kernel 2)) + (let* ((copy-kernel (make-kernel 6)) (copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel))) (table-count 0) idx-kernel idx-channel) @@ -407,15 +407,13 @@ ;; now end the kernels (let ((lp:*kernel* copy-kernel)) (with-stats-collection ("COPY Threads Completion" :section :post) - (loop :for tasks :below (* 2 table-count) - :do (destructuring-bind (task table-name start-time) + (loop :for tasks :below (* 3 table-count) + :do (destructuring-bind (task table-name seconds) (lp:receive-result copy-channel) - (declare (ignorable start-time)) - (log-message :debug "Finished processing ~a for ~s" - task table-name) - (when (eq :target task) - (update-stats :data table-name - :secs (elapsed-time-since start-time))))) + (log-message :info "Finished processing ~a for ~s ~50T~fs" + task table-name seconds) + (when (eq :writer task) + (update-stats :data table-name :secs seconds)))) (lp:end-kernel))) (let ((lp:*kernel* idx-kernel))