diff --git a/src/package.lisp b/src/package.lisp index dfef370..5af1e14 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -187,10 +187,10 @@ (defpackage #:pgloader.batch (:use #:cl #:pgloader.params #:pgloader.monitor) - (:export #:*current-batch* - #:make-batch + (:export #:make-batch #:batch-row - #:finish-current-batch)) + #:finish-batch + #:push-end-of-data-message)) ;; diff --git a/src/sources/common/api.lisp b/src/sources/common/api.lisp index a9d8e96..d59366b 100644 --- a/src/sources/common/api.lisp +++ b/src/sources/common/api.lisp @@ -39,6 +39,12 @@ "Load data from SOURCE and funcall PROCESS-ROW-FUN for each row found in the SOURCE data. Each ROW is passed as a vector of objects.")) +(defgeneric preprocess-row (source) + (:documentation + "Some source readers have pre-processing to do on the raw data, such as + CSV user-defined field projections to PostgreSQL columns. This function + returns the pre-processing function, which must be a funcallable object.")) + (defgeneric queue-raw-data (source queue) (:documentation "Send raw data from the reader to the worker queue.")) diff --git a/src/sources/common/db-methods.lisp b/src/sources/common/db-methods.lisp index 60a2461..c95b376 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/sources/common/db-methods.lisp @@ -120,6 +120,8 @@ ;;; (defmethod copy-database ((copy db-copy) &key + (worker-count 8) + (concurrency 1) (truncate nil) (disable-triggers nil) (data-only nil) @@ -137,7 +139,7 @@ set-table-oids materialize-views) "Export database source data and Import it into PostgreSQL" - (let* ((copy-kernel (make-kernel 8)) + (let* ((copy-kernel (make-kernel worker-count)) (copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel))) (catalog (fetch-metadata copy @@ -208,6 +210,7 @@ (unless schema-only (incf table-count) (copy-from table-source + :concurrency concurrency :kernel copy-kernel :channel copy-channel :disable-triggers disable-triggers)) @@ -230,7 +233,8 @@ ;; now end the kernels 
(end-kernels copy-kernel copy-channel idx-kernel idx-channel - table-count (count-indexes catalog)) + table-count (count-indexes catalog) + :concurrency concurrency) ;; ;; Complete the PostgreSQL database before handing over. @@ -251,16 +255,16 @@ ;;; ;;; Lower level tools ;;; -(defun end-kernels (copy-kernel copy-channel - idx-kernel idx-channel - table-count index-count) +(defun end-kernels (copy-kernel copy-channel idx-kernel idx-channel + table-count index-count + &key (concurrency 2)) "Terminate the lparallel kernels, waiting for all threads." (when copy-kernel (let ((lp:*kernel* copy-kernel)) (with-stats-collection ("COPY Threads Completion" :section :post :use-result-as-read t :use-result-as-rows t) - (let ((workers-count (* 4 table-count))) + (let ((workers-count (* table-count (task-count concurrency)))) (loop :for tasks :below workers-count :do (destructuring-bind (task table-name seconds) (lp:receive-result copy-channel) @@ -270,7 +274,7 @@ (update-stats :data table-name :secs seconds)))) (prog1 workers-count - (lp:end-kernel)))))) + (lp:end-kernel :wait nil)))))) (when idx-kernel (let ((lp:*kernel* idx-kernel)) diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index aec8974..2de1c19 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -36,15 +36,15 @@ (log-message :debug "Parsed header columns ~s" (fields copy))) - ;; read in the text file, split it into columns, process NULL - ;; columns the way postmodern expects them, and call - ;; PROCESS-ROW-FN on them - (let ((reformat-then-process - (reformat-then-process :fields (fields copy) - :columns (columns copy) - :target (target copy) - :process-row-fn process-row-fn))) - (process-rows copy input reformat-then-process)))))) + ;; read in the text file, split it into columns + (process-rows copy input process-row-fn))))) + +(defmethod preprocess-row ((copy md-copy)) + "The file based readers possibly have extra work to do with user 
defined + fields to columns projections (mapping)." + (reformat-then-process :fields (fields copy) + :columns (columns copy) + :target (target copy))) (defmethod copy-column-list ((copy md-copy)) "We did reformat-then-process the column list, so we now send them in the diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index 24ad738..3b696df 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -6,76 +6,100 @@ ;;; ;;; Common API implementation ;;; -(defmethod queue-raw-data ((copy copy) queue) +(defmethod queue-raw-data ((copy copy) queue-list) "Stream data as read by the map-queue method on the COPY argument into QUEUE, as given." (log-message :debug "Reader started for ~a" (format-table-name (target copy))) - (let ((start-time (get-internal-real-time)) - (*current-batch* (make-batch))) - (map-rows copy :process-row-fn (lambda (row) - (when (or (eq :data *log-min-messages*) - (eq :data *client-min-messages*)) - (log-message :data "< ~s" row)) - (batch-row row (target copy) queue))) - ;; last batch - (finish-current-batch (target copy) queue) + (let* ((start-time (get-internal-real-time)) + (blist (loop :for queue :in queue-list :collect (make-batch))) + (bclist (nconc blist blist)) ; build a circular list + (qlist (copy-list queue-list)) + (qclist (nconc qlist qlist)) ; build a circular list + (process-row + (lambda (row) + (when (or (eq :data *log-min-messages*) + (eq :data *client-min-messages*)) + (log-message :data "< ~s" row)) + (prog1 + (setf (car bclist) ; batch-row might create a new batch + (batch-row (car bclist) row (target copy) (car qclist))) + ;; round robin on batches and queues + (setf bclist (cdr bclist) + qclist (cdr qclist)))))) - ;; mark end of stream - (lq:push-queue (list :end-of-data nil nil nil) queue) + ;; call the source-specific method for reading input data + (map-rows copy :process-row-fn process-row) + + ;; process last batches and send them to queues + ;; and mark end of 
stream + (loop :repeat (length queue-list) + :for batch :in blist + :for queue :in qlist + :do (progn + (finish-batch batch (target copy) queue) + (push-end-of-data-message queue))) (let ((seconds (elapsed-time-since start-time))) - (log-message :info "Reader for ~a is done in ~6$s" + (log-message :debug "Reader for ~a is done in ~6$s" (format-table-name (target copy)) seconds) - (list :reader (target copy) seconds)))) + (list :reader (target copy) seconds)))) -(defmethod format-data-to-copy ((copy copy) raw-queue formatted-queue +(defmethod format-data-to-copy ((copy copy) input-queue output-queue &optional pre-formatted) "Loop over the data in the RAW-QUEUE and prepare it in batches in the FORMATED-QUEUE, ready to be sent down to PostgreSQL using the COPY protocol." - (log-message :debug "Transformer in action for ~a!" + (log-message :debug "Transformer ~a in action for ~a!" + (lp:kernel-worker-index) (format-table-name (target copy))) - (let ((start-time (get-internal-real-time))) + (let* ((start-time (get-internal-real-time)) + (preprocess (preprocess-row copy))) - (loop :for (mesg batch count oversized?) := (lq:pop-queue raw-queue) + (loop :for (mesg batch count oversized?) 
:= (lq:pop-queue input-queue) :until (eq :end-of-data mesg) :do (let ((batch-start-time (get-internal-real-time))) ;; transform each row of the batch into a copy-string (loop :for i :below count - :do (let ((copy-string - (handler-case - (with-output-to-string (s) - (format-vector-row s - (aref batch i) - (transforms copy) - pre-formatted)) - (condition (e) - (log-message :error "~a" e) - (update-stats :data (target copy) :errs 1) - nil)))) + :do (let* ((row (if preprocess + (funcall preprocess (aref batch i)) + (aref batch i))) + (copy-data + (handler-case + (format-vector-row row + (transforms copy) + pre-formatted) + (condition (e) + (log-message :error "~a" e) + (update-stats :data (target copy) :errs 1) + nil)))) (when (or (eq :data *log-min-messages*) (eq :data *client-min-messages*)) - (log-message :data "> ~s" copy-string)) + (log-message :data "> ~s" copy-data)) - (setf (aref batch i) copy-string))) + (setf (aref batch i) copy-data))) ;; the batch is ready, log about it (log-message :debug "format-data-to-copy[~a] ~d row in ~6$s" (lp:kernel-worker-index) count (elapsed-time-since batch-start-time)) - ;; and send the formatted batch of copy-strings down to PostgreSQL - (lq:push-queue (list mesg batch count oversized?) formatted-queue))) - ;; mark end of stream, twice because we hardcode 2 COPY processes, see below - (lq:push-queue (list :end-of-data nil nil nil) formatted-queue) - (lq:push-queue (list :end-of-data nil nil nil) formatted-queue) + ;; and send the formatted batch of copy-strings down to PostgreSQL + (lq:push-queue (list mesg batch count oversized?) 
output-queue))) + + ;; mark end of stream + (push-end-of-data-message output-queue) ;; and return (let ((seconds (elapsed-time-since start-time))) - (log-message :info "Transformer for ~a is done in ~6$s" + (log-message :info "Transformer[~a] for ~a is done in ~6$s" + (lp:kernel-worker-index) (format-table-name (target copy)) seconds) (list :worker (target copy) seconds)))) +(defmethod preprocess-row ((copy copy)) + "The default preprocessing of raw data is to do nothing." + nil) + (defmethod copy-column-list ((copy copy)) "Default column list is an empty list." nil) @@ -90,47 +114,62 @@ (format-vector-row text-file row (transforms copy))))) (map-rows copy :process-row-fn row-fn)))) +(defun task-count (concurrency) + "Return how many tasks we are going to start for a given CONCURRENCY: one reader plus CONCURRENCY transformers and CONCURRENCY writers." + (+ 1 concurrency concurrency)) + (defmethod copy-from ((copy copy) &key (kernel nil k-s-p) (channel nil c-s-p) + (worker-count 8) + (concurrency 2) truncate disable-triggers) - "Copy data from COPY source into PostgreSQL." - (let* ((lp:*kernel* (or kernel (make-kernel 4))) - (channel (or channel (lp:make-channel))) - (rawq (lq:make-queue :fixed-capacity *concurrent-batches*)) - (fmtq (lq:make-queue :fixed-capacity *concurrent-batches*)) - (table-name (format-table-name (target copy)))) + "Copy data from COPY source into PostgreSQL. + + We allow WORKER-COUNT simultaneous workers to be active at the same time + in the context of this COPY object. A single unit of work consists of + several kinds of workers: + + - a reader getting raw data from the COPY source with `map-rows', + - N transformers preparing raw data for PostgreSQL COPY protocol, + - N writers sending the data down to PostgreSQL. + + The N here is set to the CONCURRENCY parameter: with a CONCURRENCY of + 2, we start (+ 1 2 2) = 5 concurrent tasks, with a CONCURRENCY of 4 we + start (+ 1 4 4) = 9 concurrent tasks, of which only WORKER-COUNT may be + active simultaneously." 
+ (let* ((table-name (format-table-name (target copy))) + (lp:*kernel* (or kernel (make-kernel worker-count))) + (channel (or channel (lp:make-channel))) + ;; Now, prepare data queues for workers: + ;; reader -> transformers -> writers + (rawqs (loop :repeat concurrency :collect + (lq:make-queue :fixed-capacity *concurrent-batches*))) + (fmtqs (loop :repeat concurrency :collect + (lq:make-queue :fixed-capacity *concurrent-batches*)))) (with-stats-collection ((target copy) :dbname (db-name (target-db copy))) (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error)) (log-message :info "COPY ~s" table-name) - ;; start a tast to read data from the source into the queue - (lp:submit-task channel #'queue-raw-data copy rawq) + ;; start a task to read data from the source into the queue + (lp:submit-task channel #'queue-raw-data copy rawqs) - ;; now start a transformer thread to process raw vectors from our + ;; now start transformer threads to process raw vectors from our ;; source into preprocessed batches to send down to PostgreSQL - (lp:submit-task channel #'format-data-to-copy copy rawq fmtq) + (loop :for rawq :in rawqs + :for fmtq :in fmtqs + :do (lp:submit-task channel #'format-data-to-copy copy rawq fmtq)) - ;; And start two tasks to push that data from the queue into - ;; PostgreSQL; Andres Freund research/benchmarks show that in every - ;; PostgreSQL releases up to 9.5 included the highest throughput of - ;; COPY TO the same table is achieved with 2 concurrent clients... - ;; - ;; See Extension Lock Scalability slide in - ;; http://www.anarazel.de/talks/pgconf-eu-2015-10-30/concurrency.pdf - ;; - ;; Let's just hardcode 2 threads for that then. - ;; ;; Also, we need to do the TRUNCATE here before starting the ;; threads, so that it's done just once. 
(when truncate (truncate-tables (clone-connection (target-db copy)) (target copy))) - (loop :for w :below 2 + (loop :for fmtq :in fmtqs :do (lp:submit-task channel #'pgloader.pgsql:copy-from-queue (clone-connection (target-db copy)) @@ -141,7 +180,5 @@ ;; now wait until both the tasks are over, and kill the kernel (unless c-s-p - (loop :repeat (lp:kernel-worker-count) - :do (lp:receive-result channel) - :finally (progn (log-message :info "COPY ~s done." table-name) - (unless k-s-p (lp:end-kernel))))))))) + (log-message :info "COPY ~s done." table-name) + (unless k-s-p (lp:end-kernel :wait t))))))) diff --git a/src/sources/common/project-fields.lisp b/src/sources/common/project-fields.lisp index 89636df..29c6c74 100644 --- a/src/sources/common/project-fields.lisp +++ b/src/sources/common/project-fields.lisp @@ -119,7 +119,7 @@ ;; allow for some debugging (if compile (compile nil projection) projection)))) -(defun reformat-then-process (&key fields columns target process-row-fn) +(defun reformat-then-process (&key fields columns target) "Return a lambda form to apply to each row we read. 
The lambda closes over the READ paramater, which is a counter of how many @@ -130,12 +130,10 @@ (if (or (null row) (and (null (car row)) (null (cdr row)))) (log-message :notice "Skipping empty line.") - (let ((projected-vector - (handler-case - (funcall projection row) - (condition (e) - (update-stats :data target :errs 1) - (log-message :error "Could not read input: ~a" e))))) - (when projected-vector - (funcall process-row-fn projected-vector))))))) + + (handler-case + (funcall projection row) + (condition (e) + (update-stats :data target :errs 1) + (log-message :error "Could not read input: ~a" e))))))) diff --git a/src/utils/batch.lisp b/src/utils/batch.lisp index 7c65b27..e51d4ec 100644 --- a/src/utils/batch.lisp +++ b/src/utils/batch.lisp @@ -10,18 +10,14 @@ ;;; (defstruct batch (start (get-internal-real-time) :type fixnum) - (data (make-array *copy-batch-rows* :element-type 'simple-string) - :type (vector simple-string *)) + (data (make-array *copy-batch-rows*)) (count 0 :type fixnum) (bytes 0 :type fixnum)) -(defvar *current-batch* nil) - -(defun finish-current-batch (target queue - &optional oversized? (batch *current-batch*)) +(defun finish-batch (batch target queue &optional oversized?) (with-slots (start data count) batch (when (< 0 count) - (log-message :debug "finish-current-batch[~a] ~d row~:p in ~6$s" + (log-message :debug "finish-batch[~a] ~d row~:p in ~6$s" (lp:kernel-worker-index) count (elapsed-time-since start)) (update-stats :data target @@ -29,24 +25,36 @@ :rs (elapsed-time-since start)) (lq:push-queue (list :batch data count oversized?) queue)))) +(defun push-end-of-data-message (queue) + "Push a fake batch marker to signal end-of-data in QUEUE." + ;; the message must look like the finish-batch message overall + (lq:push-queue (list :end-of-data nil nil nil) queue)) + (declaim (inline oversized?)) -(defun oversized? (&optional (batch *current-batch*)) +(defun oversized? 
(batch) "Return a generalized boolean that is true only when BATCH is considered over-sized when its size in BYTES is compared *copy-batch-size*." (and *copy-batch-size* ; defaults to nil (<= *copy-batch-size* (batch-bytes batch)))) -(defun batch-row (row target queue) +(defun batch-row (batch row target queue) "Add ROW to the reader batch. When the batch is full, provide it to the writer." - (let ((oversized? (oversized? *current-batch*))) - (when (or (= (batch-count *current-batch*) *copy-batch-rows*) - oversized?) - ;; close current batch, prepare next one - (finish-current-batch target queue oversized?) - (setf *current-batch* (make-batch)))) + (let ((maybe-new-batch + (let ((oversized? (oversized? batch))) + (if (or (= (batch-count batch) *copy-batch-rows*) + oversized?) + (progn + ;; close current batch, prepare next one + (finish-batch batch target queue oversized?) + (make-batch)) - (with-slots (data count bytes) *current-batch* - (setf (aref data count) row) - (incf count))) + ;; return given batch, it's still current + batch)))) + + (with-slots (data count bytes) maybe-new-batch + (setf (aref data count) row) + (incf count)) + + maybe-new-batch))