diff --git a/pgloader.asd b/pgloader.asd index 7f0b844..fbbbe3e 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -40,7 +40,6 @@ :components ((:file "params") (:file "package" :depends-on ("params")) - (:file "queue" :depends-on ("params" "package")) (:module "monkey" :components @@ -51,6 +50,7 @@ :depends-on ("package" "params") :components ((:file "charsets") + (:file "batch") (:file "threads") (:file "logs") (:file "utils") @@ -121,8 +121,7 @@ "connection" "pgsql" "utils" - "parsers" - "queue") + "parsers") :components ((:module "common" :components diff --git a/src/package.lisp b/src/package.lisp index a2442c8..6bb68f5 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -110,6 +110,13 @@ #:show-encodings #:make-external-format)) +(defpackage #:pgloader.batch + (:use #:cl #:pgloader.params #:pgloader.monitor) + (:export #:*current-batch* + #:make-batch + #:batch-row + #:finish-current-batch)) + ;; ;; Not really a source, more a util package to deal with http and zip @@ -226,7 +233,7 @@ (defpackage #:pgloader.sources (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.schema #:pgloader.pgsql) + #:pgloader.schema #:pgloader.pgsql #:pgloader.batch) (:import-from #:pgloader.transforms #:precision #:scale @@ -286,17 +293,6 @@ #:*cast-rules* #:cast)) - -(defpackage #:pgloader.queue - (:use #:cl #:pgloader.params #:pgloader.monitor) - (:import-from #:pgloader.pgsql - #:format-vector-row) - (:import-from #:pgloader.sources - #:map-rows - #:transforms - #:target) - (:export #:cook-batches)) - ;;; ;;; Other utilities @@ -325,7 +321,7 @@ (defpackage #:pgloader.csv (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.sources #:pgloader.queue) + #:pgloader.sources) (:import-from #:pgloader.pgsql #:maybe-drop-indexes #:create-indexes-again) @@ -342,7 +338,7 @@ (defpackage #:pgloader.fixed (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.sources #:pgloader.queue) + #:pgloader.sources) 
(:import-from #:pgloader.csv #:csv-connection #:specs @@ -357,7 +353,7 @@ (defpackage #:pgloader.copy (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.sources #:pgloader.queue) + #:pgloader.sources) (:import-from #:pgloader.csv #:csv-connection #:specs @@ -372,7 +368,7 @@ (defpackage #:pgloader.ixf (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.sources #:pgloader.queue) + #:pgloader.sources) (:import-from #:pgloader.pgsql #:with-pgsql-transaction #:pgsql-execute @@ -389,7 +385,7 @@ (defpackage #:pgloader.db3 (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.sources #:pgloader.queue) + #:pgloader.sources) (:import-from #:pgloader.pgsql #:with-pgsql-transaction #:pgsql-execute @@ -407,7 +403,7 @@ (defpackage #:pgloader.mysql (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.sources #:pgloader.queue) + #:pgloader.sources) (:import-from #:pgloader.transforms #:precision #:scale) (:import-from #:pgloader.pgsql #:with-pgsql-connection @@ -448,7 +444,7 @@ (defpackage #:pgloader.sqlite (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.sources #:pgloader.queue) + #:pgloader.sources) (:import-from #:pgloader.transforms #:precision #:scale) (:import-from #:pgloader.pgsql #:with-pgsql-transaction @@ -476,7 +472,7 @@ (defpackage #:pgloader.mssql (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.sources #:pgloader.queue) + #:pgloader.sources) (:import-from #:pgloader.transforms #:precision #:scale) (:import-from #:pgloader.pgsql #:with-pgsql-connection diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index e08bbbd..980dc5a 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -18,13 +18,11 @@ ;; of a data error being signaled, that's the BATCH here. 
(let ((copier (cl-postgres:open-db-writer db table-name columns))) (unwind-protect - (loop for i below batch-rows - for copy-string = (aref batch i) - do (when (or (eq :data *log-min-messages*) - (eq :data *client-min-messages*)) - (log-message :data "> ~s" copy-string)) - do (cl-postgres:db-write-row copier nil copy-string) - finally (return batch-rows)) + (loop :for i :below batch-rows + :for copy-string := (aref batch i) + :do (when copy-string + (cl-postgres:db-write-row copier nil copy-string)) + :finally (return batch-rows)) (cl-postgres:close-db-writer copier)))) ;; If PostgreSQL signals a data error, process the batch by isolating @@ -62,7 +60,7 @@ (loop :for (mesg batch read oversized?) := (lq:pop-queue queue) - :until (eq mesg :end-of-data) + :until (eq :end-of-data mesg) :for rows := (copy-batch unqualified-table-name columns batch read) :do (progn ;; The SBCL implementation needs some Garbage Collection diff --git a/src/queue.lisp b/src/queue.lisp deleted file mode 100644 index 470e411..0000000 --- a/src/queue.lisp +++ /dev/null @@ -1,82 +0,0 @@ -;;; -;;; Tools to handle internal queueing, using lparallel.queue -;;; -(in-package :pgloader.queue) - -;;; -;;; The pgloader architectures uses a reader thread and a writer thread. The -;;; reader fills in batches of data from the source of data, and the writer -;;; pushes the data down to PostgreSQL using the COPY protocol. -;;; -(defstruct batch - (start (get-internal-real-time) :type fixnum) - (data (make-array *copy-batch-rows* :element-type 'simple-string) - :type (vector simple-string *)) - (count 0 :type fixnum) - (bytes 0 :type fixnum)) - -(defvar *current-batch* nil) - -(defun cook-batches (copy raw-queue processed-queue &optional pre-formatted) - "Cook data from raw-queue into batches sent into processed-queue." 
- (let ((*current-batch* (make-batch))) - (loop :for row := (lq:pop-queue raw-queue) - :until (eq :end-of-data row) - :do (batch-row row copy processed-queue pre-formatted)) - - ;; finish current-batch - (finish-current-batch copy processed-queue) - - ;; and before calling it a day, push the end-of-data marker - (log-message :debug "End of data.") - - ;; we hardcode 2 parallel COPY writers, see copy-from implementation. - (lq:push-queue (list :end-of-data nil nil nil) processed-queue) - (lq:push-queue (list :end-of-data nil nil nil) processed-queue))) - -(defun finish-current-batch (copy queue - &optional oversized? (batch *current-batch*)) - (with-slots (start data count) batch - (when (< 0 count) - (update-stats :data (target copy) - :read count - :rs (elapsed-time-since start)) - (lq:push-queue (list :batch data count oversized?) queue)))) - -(declaim (inline oversized?)) -(defun oversized? (&optional (batch *current-batch*)) - "Return a generalized boolean that is true only when BATCH is considered - over-sized when its size in BYTES is compared *copy-batch-size*." - (and *copy-batch-size* ; defaults to nil - (<= *copy-batch-size* (batch-bytes batch)))) - -(defun batch-row (row copy queue &optional pre-formatted) - "Add ROW to the reader batch. When the batch is full, provide it to the - writer." - (let ((oversized? (oversized? *current-batch*))) - (when (or (= (batch-count *current-batch*) *copy-batch-rows*) - oversized?) - ;; close current batch, prepare next one - (finish-current-batch copy queue oversized?) - (setf *current-batch* (make-batch)))) - - ;; Add ROW to the current BATCH. - ;; - ;; All the data transformation takes place here, so that we batch fully - ;; formed COPY TEXT string ready to go in the PostgreSQL stream. 
- (handler-case - (with-slots (data count bytes) *current-batch* - (let ((copy-string - (with-output-to-string (s) - (let ((c-s-bytes (format-vector-row s row - (transforms copy) - pre-formatted))) - (when *copy-batch-size* ; running under memory watch - (incf bytes c-s-bytes)))))) - (setf (aref data count) copy-string) - (incf count))) - - (condition (e) - (log-message :error "~a" e) - (update-stats :data (target copy) :errs 1)))) - diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index d67a072..42b9127 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -10,13 +10,18 @@ "Stream data as read by the map-queue method on the COPY argument into QUEUE, as given." (log-message :debug "Reader started for ~a" (target copy)) - (let ((start-time (get-internal-real-time))) - (map-rows copy :process-row-fn (lambda (data) + (let ((start-time (get-internal-real-time)) + (*current-batch* (make-batch))) + (map-rows copy :process-row-fn (lambda (row) (when (or (eq :data *log-min-messages*) (eq :data *client-min-messages*)) - (log-message :data "< ~s" data)) - (lq:push-queue data queue))) - (lq:push-queue :end-of-data queue) + (log-message :data "< ~s" row)) + (batch-row row (target copy) queue))) + ;; last batch + (finish-current-batch (target copy) queue) + + ;; mark end of stream + (lq:push-queue (list :end-of-data nil nil nil) queue) (let ((seconds (elapsed-time-since start-time))) (log-message :info "Reader for ~a is done in ~6$s" (target copy) seconds) @@ -29,12 +34,44 @@ (log-message :debug "Transformer in action for ~a!" (target copy)) (let ((start-time (get-internal-real-time))) - (pgloader.queue:cook-batches copy raw-queue formatted-queue pre-formatted) + (loop :for (mesg batch count oversized?) 
:= (lq:pop-queue raw-queue) + :until (eq :end-of-data mesg) + :do (let ((batch-start-time (get-internal-real-time))) + ;; transform each row of the batch into a copy-string + (loop :for i :below count + :do (let ((copy-string + (handler-case + (with-output-to-string (s) + (format-vector-row s + (aref batch i) + (transforms copy) + pre-formatted)) + (condition (e) + (log-message :error "~a" e) + (update-stats :data (target copy) :errs 1) + nil)))) + (when (or (eq :data *log-min-messages*) + (eq :data *client-min-messages*)) + (log-message :data "> ~s" copy-string)) + + (setf (aref batch i) copy-string))) + + ;; the batch is ready, log about it + (log-message :debug "format-data-to-copy[~a] ~d row in ~6$s" + (lp:kernel-worker-index) + count + (elapsed-time-since batch-start-time)) + ;; and send the formatted batch of copy-strings down to PostgreSQL + (lq:push-queue (list mesg batch count oversized?) formatted-queue))) + + ;; mark end of stream, twice because we hardcode 2 COPY processes, see below + (lq:push-queue (list :end-of-data nil nil nil) formatted-queue) + (lq:push-queue (list :end-of-data nil nil nil) formatted-queue) ;; and return (let ((seconds (elapsed-time-since start-time))) - (log-message :info "Transformer for ~a is done in ~6$s" (target copy) seconds) - (list :worker (target copy) seconds)))) + (log-message :info "Transformer for ~a is done in ~6$s" (target copy) seconds) + (list :worker (target copy) seconds)))) (defmethod copy-column-list ((copy copy)) "Default column list is an empty list." @@ -59,7 +96,7 @@ "Copy data from COPY source into PostgreSQL." 
(let* ((lp:*kernel* (or kernel (make-kernel 4))) (channel (or channel (lp:make-channel))) - (rawq (lq:make-queue)) + (rawq (lq:make-queue :fixed-capacity *concurrent-batches*)) (fmtq (lq:make-queue :fixed-capacity *concurrent-batches*)) (table-name (format-table-name (target copy)))) diff --git a/src/utils/batch.lisp b/src/utils/batch.lisp new file mode 100644 index 0000000..7c65b27 --- /dev/null +++ b/src/utils/batch.lisp @@ -0,0 +1,52 @@ +;;; +;;; Tools to handle internal queueing, using lparallel.queue +;;; +(in-package :pgloader.batch) + +;;; +;;; The pgloader architecture uses a reader thread and a writer thread. The +;;; reader fills in batches of data from the source of data, and the writer +;;; pushes the data down to PostgreSQL using the COPY protocol. +;;; +(defstruct batch + (start (get-internal-real-time) :type fixnum) + (data (make-array *copy-batch-rows* :element-type 'simple-string) + :type (vector simple-string *)) + (count 0 :type fixnum) + (bytes 0 :type fixnum)) + +(defvar *current-batch* nil) + +(defun finish-current-batch (target queue + &optional oversized? (batch *current-batch*)) + (with-slots (start data count) batch + (when (< 0 count) + (log-message :debug "finish-current-batch[~a] ~d row~:p in ~6$s" + (lp:kernel-worker-index) count + (elapsed-time-since start)) + (update-stats :data target + :read count + :rs (elapsed-time-since start)) + (lq:push-queue (list :batch data count oversized?) queue)))) + +(declaim (inline oversized?)) +(defun oversized? (&optional (batch *current-batch*)) + "Return a generalized boolean that is true only when BATCH is considered + over-sized when its size in BYTES is compared to *copy-batch-size*." + (and *copy-batch-size* ; defaults to nil + (<= *copy-batch-size* (batch-bytes batch)))) + +(defun batch-row (row target queue) + "Add ROW to the reader batch. When the batch is full, provide it to the + writer." + (let ((oversized? (oversized? 
*current-batch*))) + (when (or (= (batch-count *current-batch*) *copy-batch-rows*) + oversized?) + ;; close current batch, prepare next one + (finish-current-batch target queue oversized?) + (setf *current-batch* (make-batch)))) + + (with-slots (data count bytes) *current-batch* + (setf (aref data count) row) + (incf count))) +