mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-05 10:56:10 +02:00
Review load parallelism settings.
pgloader parallel workload is still hardcoded, but at least the code now
uses clear parameters as input so that it will be possible in a later
patch to expose them to the end-user.
The notions of workers and concurrency are now handled as follows:
- concurrency is how many tasks are allowed to happen at once, by
default we have a reader thread, a transformer thread and a COPY
thread all active for each table being loaded,
- worker-count is how many parallel threads are allowed to run
simultaneously and defaults to 8 currently, which means that in a
typical migration from a database source and given default
concurrency of 1 (3 threads), we might be loading up to 3 different
tables at any time.
The idea is to expose those settings to the user in the load file and as
command line options (such as --jobs) and see what it gives us. It might
help e.g. use more cores in loading a single CSV file.
As of this patch, there can still be only one reader thread and the
number of transformer threads must be the same as the number of COPY
threads.
Finally, the CSV-like files user-defined projections are now handled in
the transformation threads rather than in the reader thread...
This commit is contained in:
parent
94ef8674ec
commit
f256e12a4f
@ -187,10 +187,10 @@
|
||||
|
||||
(defpackage #:pgloader.batch
|
||||
(:use #:cl #:pgloader.params #:pgloader.monitor)
|
||||
(:export #:*current-batch*
|
||||
#:make-batch
|
||||
(:export #:make-batch
|
||||
#:batch-row
|
||||
#:finish-current-batch))
|
||||
#:finish-batch
|
||||
#:push-end-of-data-message))
|
||||
|
||||
|
||||
;;
|
||||
|
||||
@ -39,6 +39,12 @@
|
||||
"Load data from SOURCE and funcall PROCESS-ROW-FUN for each row found in
|
||||
the SOURCE data. Each ROW is passed as a vector of objects."))
|
||||
|
||||
(defgeneric proprocess-row (source)
|
||||
(:documentation
|
||||
"Some source readers have pre-processing to do on the raw data, such as
|
||||
CSV user-defined field projections to PostgreSQL columns. This function
|
||||
returns the pre-processing function, which must be a funcallable object."))
|
||||
|
||||
(defgeneric queue-raw-data (source queue)
|
||||
(:documentation "Send raw data from the reader to the worker queue."))
|
||||
|
||||
|
||||
@ -120,6 +120,8 @@
|
||||
;;;
|
||||
(defmethod copy-database ((copy db-copy)
|
||||
&key
|
||||
(worker-count 8)
|
||||
(concurrency 1)
|
||||
(truncate nil)
|
||||
(disable-triggers nil)
|
||||
(data-only nil)
|
||||
@ -137,7 +139,7 @@
|
||||
set-table-oids
|
||||
materialize-views)
|
||||
"Export database source data and Import it into PostgreSQL"
|
||||
(let* ((copy-kernel (make-kernel 8))
|
||||
(let* ((copy-kernel (make-kernel worker-count))
|
||||
(copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel)))
|
||||
(catalog (fetch-metadata
|
||||
copy
|
||||
@ -208,6 +210,7 @@
|
||||
(unless schema-only
|
||||
(incf table-count)
|
||||
(copy-from table-source
|
||||
:concurrency concurrency
|
||||
:kernel copy-kernel
|
||||
:channel copy-channel
|
||||
:disable-triggers disable-triggers))
|
||||
@ -230,7 +233,8 @@
|
||||
|
||||
;; now end the kernels
|
||||
(end-kernels copy-kernel copy-channel idx-kernel idx-channel
|
||||
table-count (count-indexes catalog))
|
||||
table-count (count-indexes catalog)
|
||||
:concurrency concurrency)
|
||||
|
||||
;;
|
||||
;; Complete the PostgreSQL database before handing over.
|
||||
@ -251,16 +255,16 @@
|
||||
;;;
|
||||
;;; Lower level tools
|
||||
;;;
|
||||
(defun end-kernels (copy-kernel copy-channel
|
||||
idx-kernel idx-channel
|
||||
table-count index-count)
|
||||
(defun end-kernels (copy-kernel copy-channel idx-kernel idx-channel
|
||||
table-count index-count
|
||||
&key (concurrency 2))
|
||||
"Terminate the lparallel kernels, waiting for all threads."
|
||||
(when copy-kernel
|
||||
(let ((lp:*kernel* copy-kernel))
|
||||
(with-stats-collection ("COPY Threads Completion" :section :post
|
||||
:use-result-as-read t
|
||||
:use-result-as-rows t)
|
||||
(let ((workers-count (* 4 table-count)))
|
||||
(let ((workers-count (* table-count (task-count concurrency))))
|
||||
(loop :for tasks :below workers-count
|
||||
:do (destructuring-bind (task table-name seconds)
|
||||
(lp:receive-result copy-channel)
|
||||
@ -270,7 +274,7 @@
|
||||
(update-stats :data table-name :secs seconds))))
|
||||
(prog1
|
||||
workers-count
|
||||
(lp:end-kernel))))))
|
||||
(lp:end-kernel :wait nil))))))
|
||||
|
||||
(when idx-kernel
|
||||
(let ((lp:*kernel* idx-kernel))
|
||||
|
||||
@ -36,15 +36,15 @@
|
||||
|
||||
(log-message :debug "Parsed header columns ~s" (fields copy)))
|
||||
|
||||
;; read in the text file, split it into columns, process NULL
|
||||
;; columns the way postmodern expects them, and call
|
||||
;; PROCESS-ROW-FN on them
|
||||
(let ((reformat-then-process
|
||||
(reformat-then-process :fields (fields copy)
|
||||
:columns (columns copy)
|
||||
:target (target copy)
|
||||
:process-row-fn process-row-fn)))
|
||||
(process-rows copy input reformat-then-process))))))
|
||||
;; read in the text file, split it into columns
|
||||
(process-rows copy input process-row-fn)))))
|
||||
|
||||
(defmethod preprocess-row ((copy md-copy))
|
||||
"The file based readers possibly have extra work to do with user defined
|
||||
fields to columns projections (mapping)."
|
||||
(reformat-then-process :fields (fields copy)
|
||||
:columns (columns copy)
|
||||
:target (target copy)))
|
||||
|
||||
(defmethod copy-column-list ((copy md-copy))
|
||||
"We did reformat-then-process the column list, so we now send them in the
|
||||
|
||||
@ -6,76 +6,100 @@
|
||||
;;;
|
||||
;;; Common API implementation
|
||||
;;;
|
||||
(defmethod queue-raw-data ((copy copy) queue)
|
||||
(defmethod queue-raw-data ((copy copy) queue-list)
|
||||
"Stream data as read by the map-queue method on the COPY argument into QUEUE,
|
||||
as given."
|
||||
(log-message :debug "Reader started for ~a" (format-table-name (target copy)))
|
||||
(let ((start-time (get-internal-real-time))
|
||||
(*current-batch* (make-batch)))
|
||||
(map-rows copy :process-row-fn (lambda (row)
|
||||
(when (or (eq :data *log-min-messages*)
|
||||
(eq :data *client-min-messages*))
|
||||
(log-message :data "< ~s" row))
|
||||
(batch-row row (target copy) queue)))
|
||||
;; last batch
|
||||
(finish-current-batch (target copy) queue)
|
||||
(let* ((start-time (get-internal-real-time))
|
||||
(blist (loop :for queue :in queue-list :collect (make-batch)))
|
||||
(bclist (nconc blist blist)) ; build a circular list
|
||||
(qlist (copy-list queue-list))
|
||||
(qclist (nconc qlist qlist)) ; build a circular list
|
||||
(process-row
|
||||
(lambda (row)
|
||||
(when (or (eq :data *log-min-messages*)
|
||||
(eq :data *client-min-messages*))
|
||||
(log-message :data "< ~s" row))
|
||||
(prog1
|
||||
(setf (car bclist) ; batch-row might create a new batch
|
||||
(batch-row (car bclist) row (target copy) (car qclist)))
|
||||
;; round robin on batches and queues
|
||||
(setf bclist (cdr bclist)
|
||||
qclist (cdr qclist))))))
|
||||
|
||||
;; mark end of stream
|
||||
(lq:push-queue (list :end-of-data nil nil nil) queue)
|
||||
;; call the source-specific method for reading input data
|
||||
(map-rows copy :process-row-fn process-row)
|
||||
|
||||
;; process last batches and send them to queues
|
||||
;; and mark end of stream
|
||||
(loop :repeat (length queue-list)
|
||||
:for batch :in blist
|
||||
:for queue :in qlist
|
||||
:do (progn
|
||||
(finish-batch batch (target copy) queue)
|
||||
(push-end-of-data-message queue)))
|
||||
|
||||
(let ((seconds (elapsed-time-since start-time)))
|
||||
(log-message :info "Reader for ~a is done in ~6$s"
|
||||
(log-message :debug "Reader for ~a is done in ~6$s"
|
||||
(format-table-name (target copy)) seconds)
|
||||
(list :reader (target copy) seconds))))
|
||||
(list :reader (target copy) seconds))))
|
||||
|
||||
(defmethod format-data-to-copy ((copy copy) raw-queue formatted-queue
|
||||
(defmethod format-data-to-copy ((copy copy) input-queue output-queue
|
||||
&optional pre-formatted)
|
||||
"Loop over the data in the RAW-QUEUE and prepare it in batches in the
|
||||
FORMATED-QUEUE, ready to be sent down to PostgreSQL using the COPY protocol."
|
||||
(log-message :debug "Transformer in action for ~a!"
|
||||
(log-message :debug "Transformer ~a in action for ~a!"
|
||||
(lp:kernel-worker-index)
|
||||
(format-table-name (target copy)))
|
||||
(let ((start-time (get-internal-real-time)))
|
||||
(let* ((start-time (get-internal-real-time))
|
||||
(preprocess (preprocess-row copy)))
|
||||
|
||||
(loop :for (mesg batch count oversized?) := (lq:pop-queue raw-queue)
|
||||
(loop :for (mesg batch count oversized?) := (lq:pop-queue input-queue)
|
||||
:until (eq :end-of-data mesg)
|
||||
:do (let ((batch-start-time (get-internal-real-time)))
|
||||
;; transform each row of the batch into a copy-string
|
||||
(loop :for i :below count
|
||||
:do (let ((copy-string
|
||||
(handler-case
|
||||
(with-output-to-string (s)
|
||||
(format-vector-row s
|
||||
(aref batch i)
|
||||
(transforms copy)
|
||||
pre-formatted))
|
||||
(condition (e)
|
||||
(log-message :error "~a" e)
|
||||
(update-stats :data (target copy) :errs 1)
|
||||
nil))))
|
||||
:do (let* ((row (if preprocess
|
||||
(funcall preprocess (aref batch i))
|
||||
(aref batch i)))
|
||||
(copy-data
|
||||
(handler-case
|
||||
(format-vector-row row
|
||||
(transforms copy)
|
||||
pre-formatted)
|
||||
(condition (e)
|
||||
(log-message :error "~a" e)
|
||||
(update-stats :data (target copy) :errs 1)
|
||||
nil))))
|
||||
(when (or (eq :data *log-min-messages*)
|
||||
(eq :data *client-min-messages*))
|
||||
(log-message :data "> ~s" copy-string))
|
||||
(log-message :data "> ~s" copy-data))
|
||||
|
||||
(setf (aref batch i) copy-string)))
|
||||
(setf (aref batch i) copy-data)))
|
||||
|
||||
;; the batch is ready, log about it
|
||||
(log-message :debug "format-data-to-copy[~a] ~d row in ~6$s"
|
||||
(lp:kernel-worker-index)
|
||||
count
|
||||
(elapsed-time-since batch-start-time))
|
||||
;; and send the formatted batch of copy-strings down to PostgreSQL
|
||||
(lq:push-queue (list mesg batch count oversized?) formatted-queue)))
|
||||
|
||||
;; mark end of stream, twice because we hardcode 2 COPY processes, see below
|
||||
(lq:push-queue (list :end-of-data nil nil nil) formatted-queue)
|
||||
(lq:push-queue (list :end-of-data nil nil nil) formatted-queue)
|
||||
;; and send the formatted batch of copy-strings down to PostgreSQL
|
||||
(lq:push-queue (list mesg batch count oversized?) output-queue)))
|
||||
|
||||
;; mark end of stream
|
||||
(push-end-of-data-message output-queue)
|
||||
|
||||
;; and return
|
||||
(let ((seconds (elapsed-time-since start-time)))
|
||||
(log-message :info "Transformer for ~a is done in ~6$s"
|
||||
(log-message :info "Transformer[~a] for ~a is done in ~6$s"
|
||||
(lp:kernel-worker-index)
|
||||
(format-table-name (target copy)) seconds)
|
||||
(list :worker (target copy) seconds))))
|
||||
|
||||
(defmethod preprocess-row ((copy copy))
|
||||
"The default preprocessing of raw data is to do nothing."
|
||||
nil)
|
||||
|
||||
(defmethod copy-column-list ((copy copy))
|
||||
"Default column list is an empty list."
|
||||
nil)
|
||||
@ -90,47 +114,62 @@
|
||||
(format-vector-row text-file row (transforms copy)))))
|
||||
(map-rows copy :process-row-fn row-fn))))
|
||||
|
||||
(defun task-count (concurrency)
|
||||
"Return how many threads we are going to start given a number of WORKERS."
|
||||
(+ 1 concurrency concurrency))
|
||||
|
||||
(defmethod copy-from ((copy copy)
|
||||
&key
|
||||
(kernel nil k-s-p)
|
||||
(channel nil c-s-p)
|
||||
(worker-count 8)
|
||||
(concurrency 2)
|
||||
truncate
|
||||
disable-triggers)
|
||||
"Copy data from COPY source into PostgreSQL."
|
||||
(let* ((lp:*kernel* (or kernel (make-kernel 4)))
|
||||
(channel (or channel (lp:make-channel)))
|
||||
(rawq (lq:make-queue :fixed-capacity *concurrent-batches*))
|
||||
(fmtq (lq:make-queue :fixed-capacity *concurrent-batches*))
|
||||
(table-name (format-table-name (target copy))))
|
||||
"Copy data from COPY source into PostgreSQL.
|
||||
|
||||
We allow WORKER-COUNT simultaneous workers to be active at the same time
|
||||
in the context of this COPY object. A single unit of work consist of
|
||||
several kinds of workers:
|
||||
|
||||
- a reader getting raw data from the COPY source with `map-rows',
|
||||
- N transformers preparing raw data for PostgreSQL COPY protocol,
|
||||
- N writers sending the data down to PostgreSQL.
|
||||
|
||||
The N here is setup to the CONCURRENCY parameter: with a CONCURRENCY of
|
||||
2, we start (+ 1 2 2) = 5 concurrent tasks, with a CONCURRENCY of 4 we
|
||||
start (+ 1 4 4) = 9 concurrent tasks, of which only WORKER-COUNT may be
|
||||
active simultaneously."
|
||||
(let* ((table-name (format-table-name (target copy)))
|
||||
(lp:*kernel* (or kernel (make-kernel worker-count)))
|
||||
(channel (or channel (lp:make-channel)))
|
||||
;; Now, prepare data queues for workers:
|
||||
;; reader -> transformers -> writers
|
||||
(rawqs (loop :repeat concurrency :collect
|
||||
(lq:make-queue :fixed-capacity *concurrent-batches*)))
|
||||
(fmtqs (loop :repeat concurrency :collect
|
||||
(lq:make-queue :fixed-capacity *concurrent-batches*))))
|
||||
|
||||
(with-stats-collection ((target copy) :dbname (db-name (target-db copy)))
|
||||
(lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error))
|
||||
(log-message :info "COPY ~s" table-name)
|
||||
|
||||
;; start a tast to read data from the source into the queue
|
||||
(lp:submit-task channel #'queue-raw-data copy rawq)
|
||||
;; start a task to read data from the source into the queue
|
||||
(lp:submit-task channel #'queue-raw-data copy rawqs)
|
||||
|
||||
;; now start a transformer thread to process raw vectors from our
|
||||
;; now start transformer threads to process raw vectors from our
|
||||
;; source into preprocessed batches to send down to PostgreSQL
|
||||
(lp:submit-task channel #'format-data-to-copy copy rawq fmtq)
|
||||
(loop :for rawq :in rawqs
|
||||
:for fmtq :in fmtqs
|
||||
:do (lp:submit-task channel #'format-data-to-copy copy rawq fmtq))
|
||||
|
||||
;; And start two tasks to push that data from the queue into
|
||||
;; PostgreSQL; Andres Freund research/benchmarks show that in every
|
||||
;; PostgreSQL releases up to 9.5 included the highest throughput of
|
||||
;; COPY TO the same table is achieved with 2 concurrent clients...
|
||||
;;
|
||||
;; See Extension Lock Scalability slide in
|
||||
;; http://www.anarazel.de/talks/pgconf-eu-2015-10-30/concurrency.pdf
|
||||
;;
|
||||
;; Let's just hardcode 2 threads for that then.
|
||||
;;
|
||||
;; Also, we need to do the TRUNCATE here before starting the
|
||||
;; threads, so that it's done just once.
|
||||
(when truncate
|
||||
(truncate-tables (clone-connection (target-db copy))
|
||||
(target copy)))
|
||||
|
||||
(loop :for w :below 2
|
||||
(loop :for fmtq :in fmtqs
|
||||
:do (lp:submit-task channel
|
||||
#'pgloader.pgsql:copy-from-queue
|
||||
(clone-connection (target-db copy))
|
||||
@ -141,7 +180,5 @@
|
||||
|
||||
;; now wait until both the tasks are over, and kill the kernel
|
||||
(unless c-s-p
|
||||
(loop :repeat (lp:kernel-worker-count)
|
||||
:do (lp:receive-result channel)
|
||||
:finally (progn (log-message :info "COPY ~s done." table-name)
|
||||
(unless k-s-p (lp:end-kernel)))))))))
|
||||
(log-message :info "COPY ~s done." table-name)
|
||||
(unless k-s-p (lp:end-kernel :wait t)))))))
|
||||
|
||||
@ -119,7 +119,7 @@
|
||||
;; allow for some debugging
|
||||
(if compile (compile nil projection) projection))))
|
||||
|
||||
(defun reformat-then-process (&key fields columns target process-row-fn)
|
||||
(defun reformat-then-process (&key fields columns target)
|
||||
"Return a lambda form to apply to each row we read.
|
||||
|
||||
The lambda closes over the READ paramater, which is a counter of how many
|
||||
@ -130,12 +130,10 @@
|
||||
(if (or (null row)
|
||||
(and (null (car row)) (null (cdr row))))
|
||||
(log-message :notice "Skipping empty line.")
|
||||
(let ((projected-vector
|
||||
(handler-case
|
||||
(funcall projection row)
|
||||
(condition (e)
|
||||
(update-stats :data target :errs 1)
|
||||
(log-message :error "Could not read input: ~a" e)))))
|
||||
(when projected-vector
|
||||
(funcall process-row-fn projected-vector)))))))
|
||||
|
||||
(handler-case
|
||||
(funcall projection row)
|
||||
(condition (e)
|
||||
(update-stats :data target :errs 1)
|
||||
(log-message :error "Could not read input: ~a" e)))))))
|
||||
|
||||
|
||||
@ -10,18 +10,14 @@
|
||||
;;;
|
||||
(defstruct batch
|
||||
(start (get-internal-real-time) :type fixnum)
|
||||
(data (make-array *copy-batch-rows* :element-type 'simple-string)
|
||||
:type (vector simple-string *))
|
||||
(data (make-array *copy-batch-rows*))
|
||||
(count 0 :type fixnum)
|
||||
(bytes 0 :type fixnum))
|
||||
|
||||
(defvar *current-batch* nil)
|
||||
|
||||
(defun finish-current-batch (target queue
|
||||
&optional oversized? (batch *current-batch*))
|
||||
(defun finish-batch (batch target queue &optional oversized?)
|
||||
(with-slots (start data count) batch
|
||||
(when (< 0 count)
|
||||
(log-message :debug "finish-current-batch[~a] ~d row~:p in ~6$s"
|
||||
(log-message :debug "finish-batch[~a] ~d row~:p in ~6$s"
|
||||
(lp:kernel-worker-index) count
|
||||
(elapsed-time-since start))
|
||||
(update-stats :data target
|
||||
@ -29,24 +25,36 @@
|
||||
:rs (elapsed-time-since start))
|
||||
(lq:push-queue (list :batch data count oversized?) queue))))
|
||||
|
||||
(defun push-end-of-data-message (queue)
|
||||
"Push a fake batch marker to signal end-of-data in QUEUE."
|
||||
;; the message must look like the finish-batch message overall
|
||||
(lq:push-queue (list :end-of-data nil nil nil) queue))
|
||||
|
||||
(declaim (inline oversized?))
|
||||
(defun oversized? (&optional (batch *current-batch*))
|
||||
(defun oversized? (batch)
|
||||
"Return a generalized boolean that is true only when BATCH is considered
|
||||
over-sized when its size in BYTES is compared *copy-batch-size*."
|
||||
(and *copy-batch-size* ; defaults to nil
|
||||
(<= *copy-batch-size* (batch-bytes batch))))
|
||||
|
||||
(defun batch-row (row target queue)
|
||||
(defun batch-row (batch row target queue)
|
||||
"Add ROW to the reader batch. When the batch is full, provide it to the
|
||||
writer."
|
||||
(let ((oversized? (oversized? *current-batch*)))
|
||||
(when (or (= (batch-count *current-batch*) *copy-batch-rows*)
|
||||
oversized?)
|
||||
;; close current batch, prepare next one
|
||||
(finish-current-batch target queue oversized?)
|
||||
(setf *current-batch* (make-batch))))
|
||||
(let ((maybe-new-batch
|
||||
(let ((oversized? (oversized? batch)))
|
||||
(if (or (= (batch-count batch) *copy-batch-rows*)
|
||||
oversized?)
|
||||
(progn
|
||||
;; close current batch, prepare next one
|
||||
(finish-batch batch target queue oversized?)
|
||||
(make-batch))
|
||||
|
||||
(with-slots (data count bytes) *current-batch*
|
||||
(setf (aref data count) row)
|
||||
(incf count)))
|
||||
;; return given batch, it's still current
|
||||
batch))))
|
||||
|
||||
(with-slots (data count bytes) maybe-new-batch
|
||||
(setf (aref data count) row)
|
||||
(incf count))
|
||||
|
||||
maybe-new-batch))
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user