Introduce another worker thread: transformers.

We used to have a reader and a writer cooperating concurrently on
loading the data from the source to PostgreSQL. The transformation of
the data was then the responsibility of the reader thread.

Measurements showed that the PostgreSQL processes were mostly idle,
waiting for the reader to produce data fast enough.

In this patch we introduce a third worker thread that is responsible for
processing the raw data into pre-formatted batches, allowing the reader
to focus on extracting the data only. We now have two lparallel queues
involved in the processing, the raw queue contains the vectors of raw
data directly, and the processed-queue contains batches of properly
encoded strings for the COPY text protocol.

On the test laptop the performance gain isn't noticeable yet; it might
be that we need much larger data sets to see a gain here. At least the
new setup isn't detrimental to performance on smaller data sets.

Next improvements are going to allow more features: specialized batch
retry thread and parallel table copy scheduling for database sources.
Let's also continue caring about performance and play with having
several worker and writer threads for each reader, in later patches.

And some day, too, we will need to make the number of workers a user
defined variable rather than something hard coded as today. It's on the
todo list, meanwhile, dear user, consider changing the (make-kernel 6)
into (make-kernel 12) or something else in src/sources/mysql/mysql.lisp,
and consider enlightening me with whatever it is you find by doing so!
This commit is contained in:
Dimitri Fontaine 2015-10-23 00:17:58 +02:00
parent 4f3b3472a2
commit 4df3167da1
7 changed files with 89 additions and 68 deletions

View File

@ -250,8 +250,9 @@
;; Main protocol/API
#:map-rows
#:queue-raw-data
#:format-data-to-copy
#:copy-from
#:copy-to-queue
#:copy-to
#:copy-database
@ -291,7 +292,7 @@
#:map-rows
#:transforms
#:target)
(:export #:map-push-queue))
(:export #:cook-batches))
;;;
@ -330,7 +331,6 @@
#:csv-specs
#:get-pathname
#:copy-csv
#:copy-to-queue
#:copy-from
#:import-database
#:guess-csv-params
@ -349,7 +349,6 @@
#:create-indexes-again)
(:export #:fixed-connection
#:copy-fixed
#:copy-to-queue
#:copy-from))
(defpackage #:pgloader.copy
@ -365,7 +364,6 @@
#:create-indexes-again)
(:export #:copy-connection
#:copy-copy
#:copy-to-queue
#:copy-from))
(defpackage #:pgloader.ixf
@ -383,7 +381,6 @@
(:export #:ixf-connection
#:copy-ixf
#:map-rows
#:copy-to-queue
#:copy-from))
(defpackage #:pgloader.db3
@ -402,7 +399,6 @@
#:copy-db3
#:map-rows
#:copy-to
#:copy-to-queue
#:copy-from))
(defpackage #:pgloader.mysql

View File

@ -62,20 +62,19 @@
(loop
:for (mesg batch read oversized?) := (lq:pop-queue queue)
:until (eq mesg :end-of-data)
:for (rows ws) := (multiple-value-bind (result secs)
(timing
(copy-batch unqualified-table-name
columns batch read))
(list result secs))
:for rows := (copy-batch unqualified-table-name columns batch read)
:do (progn
;; The SBCL implementation needs some Garbage Collection
;; decision making help... and now is a pretty good time.
#+sbcl (when oversized? (sb-ext:gc :full t))
(log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]"
unqualified-table-name rows oversized?)
(update-stats :data table-name :rows rows :ws ws))))))
(update-stats :data table-name :rows rows))))))
(list :target table-name start-time)))
(let ((seconds (elapsed-time-since start-time)))
(log-message :info "Writer for ~a is done in ~fs" table-name seconds)
(update-stats :data table-name :ws seconds)
(list :writer table-name seconds))))
;;;
;;; Compute how many rows we're going to try loading next, depending on

View File

@ -17,6 +17,29 @@
(defvar *current-batch* nil)
;; Transformer-thread entry point: sits between the reader (RAW-QUEUE) and
;; the writer (PROCESSED-QUEUE), turning raw row vectors into COPY batches.
(defun cook-batches (copy raw-queue processed-queue &optional pre-formatted)
"Cook data from raw-queue into batches sent into processed-queue."
;; Bind a fresh batch so each transformer run starts with an empty one.
(let ((*current-batch* (make-batch)))
;; Drain raw rows until the reader pushes its :end-of-data marker.
(loop :for row := (lq:pop-queue raw-queue)
:until (eq :end-of-data row)
:do (batch-row row copy processed-queue pre-formatted))
;; finish current-batch
(finish-current-batch copy processed-queue)
;; and before calling it a day, push the end-of-data marker
;; (a 4-element list, the same shape the writer destructures per batch)
(log-message :debug "End of data.")
(lq:push-queue (list :end-of-data nil nil nil) processed-queue)))
(defun finish-current-batch (copy queue
&optional oversized? (batch *current-batch*))
"Push BATCH (defaults to *current-batch*) onto QUEUE when it holds rows."
;; An empty batch is skipped entirely: no stats update, nothing queued.
(with-slots (start data count) batch
(when (< 0 count)
;; Record the rows read for this batch and the time spent reading them.
(update-stats :data (target copy)
:read count
:rs (elapsed-time-since start))
(lq:push-queue (list :batch data count oversized?) queue))))
(declaim (inline oversized?))
(defun oversized? (&optional (batch *current-batch*))
"Return a generalized boolean that is true only when BATCH is considered
@ -27,18 +50,11 @@
(defun batch-row (row copy queue &optional pre-formatted)
"Add ROW to the reader batch. When the batch is full, provide it to the
writer."
(when (or (eq :data *log-min-messages*)
(eq :data *client-min-messages*))
(log-message :data "< ~s" row))
(let ((oversized? (oversized? *current-batch*)))
(when (or (= (batch-count *current-batch*) *copy-batch-rows*)
oversized?)
;; close current batch, prepare next one
(with-slots (start data count bytes) *current-batch*
(update-stats :data (target copy)
:read count
:rs (elapsed-time-since start))
(lq:push-queue (list :batch data count oversized?) queue))
(finish-current-batch copy queue oversized?)
(setf *current-batch* (make-batch))))
;; Add ROW to the current BATCH.
@ -61,25 +77,3 @@
(log-message :error "~a" e)
(update-stats :data (target copy) :errs 1))))
;; NOTE(review): this is the single-queue reader/batcher; the surrounding
;; commit replaces it with queue-raw-data + cook-batches — confirm removal.
(defun map-push-queue (copy queue &optional pre-formatted)
"Apply MAP-ROWS on the COPY instance and a function of ROW that will push
the row into the QUEUE. When MAP-ROWS returns, push :end-of-data in the
queue."
(unwind-protect
(let ((*current-batch* (make-batch)))
(map-rows copy :process-row-fn (lambda (row)
(batch-row row copy queue
pre-formatted)))
;; we might have the last batch to send over now
(with-slots (start data count) *current-batch*
(when (< 0 count)
(log-message :debug "Sending last batch (~d rows)" count)
(update-stats :data (target copy)
:read count
:rs (elapsed-time-since start))
(lq:push-queue (list :batch data count nil) queue))))
;; signal we're done
;; (cleanup form of unwind-protect: runs even if map-rows signals)
(log-message :debug "End of data.")
(lq:push-queue (list :end-of-data nil nil nil) queue)))

View File

@ -39,10 +39,14 @@
"Load data from SOURCE and funcall PROCESS-ROW-FUN for each row found in
the SOURCE data. Each ROW is passed as a vector of objects."))
(defgeneric copy-to-queue (source queue)
(defgeneric queue-raw-data (source queue)
(:documentation "Send raw data from the reader to the worker queue."))
(defgeneric format-data-to-copy (source raw-queue formatted-queue
&optional pre-formatted)
(:documentation
"Load data from SOURCE and queue each row into QUEUE. Typical
implementation will directly use pgloader.queue:map-push-queue."))
"Process raw data from RAW-QUEUE and prepare batches of formatted text to
send down to PostgreSQL with the COPY protocol in FORMATTED-QUEUE."))
(defgeneric copy-column-list (source)
(:documentation

View File

@ -6,11 +6,35 @@
;;;
;;; Common API implementation
;;;
(defmethod copy-to-queue ((copy copy) queue)
"Copy data from given COPY definition into lparallel.queue QUEUE"
(let ((start-time (get-internal-real-time)))
(pgloader.queue:map-push-queue copy queue)
(list :source (target copy) start-time)))
(defmethod queue-raw-data ((copy copy) queue)
"Stream data as read by the map-queue method on the COPY argument into QUEUE,
as given."
(log-message :debug "Reader started for ~a" (target copy))
(let ((start-time (get-internal-real-time)))
;; Push every raw row into QUEUE; log the row itself only when :data
;; level logging is requested, since that output is very verbose.
(map-rows copy :process-row-fn (lambda (data)
(when (or (eq :data *log-min-messages*)
(eq :data *client-min-messages*))
(log-message :data "< ~s" data))
(lq:push-queue data queue)))
;; Tell the transformer thread there is nothing left to read.
(lq:push-queue :end-of-data queue)
(let ((seconds (elapsed-time-since start-time)))
(log-message :info "Reader for ~a is done in ~fs" (target copy) seconds)
;; Result triple consumed by the task-completion loop in copy-database.
(list :reader (target copy) seconds))))
(defmethod format-data-to-copy ((copy copy) raw-queue formatted-queue
&optional pre-formatted)
"Loop over the data in the RAW-QUEUE and prepare it in batches in the
FORMATTED-QUEUE, ready to be sent down to PostgreSQL using the COPY protocol."
(log-message :debug "Transformer in action for ~a!" (target copy))
(let ((start-time (get-internal-real-time)))
;; All the real work happens in cook-batches; this method only times it.
(pgloader.queue:cook-batches copy raw-queue formatted-queue pre-formatted)
;; and return
(let ((seconds (elapsed-time-since start-time)))
(log-message :info "Transformer for ~a is done in ~fs" (target copy) seconds)
;; Result triple consumed by the task-completion loop in copy-database.
(list :worker (target copy) seconds))))
(defmethod copy-column-list ((copy copy))
"Default column list is an empty list."
@ -33,9 +57,10 @@
truncate
disable-triggers)
"Copy data from COPY source into PostgreSQL."
(let* ((lp:*kernel* (or kernel (make-kernel 2)))
(let* ((lp:*kernel* (or kernel (make-kernel 3)))
(channel (or channel (lp:make-channel)))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(rawq (lq:make-queue))
(fmtq (lq:make-queue :fixed-capacity *concurrent-batches*))
(table-name (format-table-name (target copy))))
(with-stats-collection ((target copy) :dbname (db-name (target-db copy)))
@ -43,7 +68,11 @@
(log-message :info "COPY ~s" table-name)
;; start a task to read data from the source into the queue
(lp:submit-task channel #'copy-to-queue copy queue)
(lp:submit-task channel #'queue-raw-data copy rawq)
;; now start a transformer thread to process raw vectors from our
;; source into preprocessed batches to send down to PostgreSQL
(lp:submit-task channel #'format-data-to-copy copy rawq fmtq)
;; and start another task to push that data from the queue into
;; PostgreSQL
@ -51,14 +80,14 @@
#'pgloader.pgsql:copy-from-queue
(target-db copy)
(target copy)
queue
fmtq
:columns (copy-column-list copy)
:truncate truncate
:disable-triggers disable-triggers)
;; now wait until both the tasks are over, and kill the kernel
(unless c-s-p
(loop :for tasks :below 2 :do (lp:receive-result channel)
(loop :for tasks :below 3 :do (lp:receive-result channel)
:finally
(log-message :info "COPY ~s done." table-name)
(unless k-s-p (lp:end-kernel))))))))

View File

@ -72,8 +72,9 @@
(log-message :error "~a" e)
(update-stats :data (target copy) :errs 1))))))
(defmethod copy-to-queue ((copy copy-copy) queue)
(defmethod format-data-to-copy ((copy copy-copy) raw-queue formatted-queue
&optional pre-formatted)
"Copy data from given COPY definition into lparallel.queue DATAQ"
(pgloader.queue:map-push-queue copy queue 'pre-formatted))
(call-next-method copy raw-queue formatted-queue t))

View File

@ -295,7 +295,7 @@
decoding-as
materialize-views)
"Export MySQL data and Import it into PostgreSQL"
(let* ((copy-kernel (make-kernel 2))
(let* ((copy-kernel (make-kernel 6))
(copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel)))
(table-count 0)
idx-kernel idx-channel)
@ -407,15 +407,13 @@
;; now end the kernels
(let ((lp:*kernel* copy-kernel))
(with-stats-collection ("COPY Threads Completion" :section :post)
(loop :for tasks :below (* 2 table-count)
:do (destructuring-bind (task table-name start-time)
(loop :for tasks :below (* 3 table-count)
:do (destructuring-bind (task table-name seconds)
(lp:receive-result copy-channel)
(declare (ignorable start-time))
(log-message :debug "Finished processing ~a for ~s"
task table-name)
(when (eq :target task)
(update-stats :data table-name
:secs (elapsed-time-since start-time)))))
(log-message :info "Finished processing ~a for ~s ~50T~fs"
task table-name seconds)
(when (eq :writer task)
(update-stats :data table-name :secs seconds))))
(lp:end-kernel)))
(let ((lp:*kernel* idx-kernel))