Review parallelism and memory behavior.

The previous patch made format-vector-row allocate its memory in one go
rather than byte after byte with vector-push-extend. In this patch we review
our usage of batches and parallelism.

Now the reader pushes each row directly to the lparallel queue and writers
concurrently consume from it, cook batches in COPY format, and then send
that chunk of data down to PostgreSQL. When looking at runtime profiles, the
time spent writing in PostgreSQL is a fraction of the time spent reading
from MySQL, so we consider that the writing thread has enough time to do the
data munging without slowing us down.
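For illustration, the new pipeline shape boils down to the following sketch
(hypothetical demo code, not part of this patch; it only assumes the
lparallel kernel, channel, and bounded-queue API that pgloader already
depends on):

    (ql:quickload :lparallel)

    (defun demo-pipeline (rows &key (concurrency 2) (prefetch 8))
      "One reader pushes ROWS to a bounded queue; CONCURRENCY writers drain it."
      (let* ((lparallel:*kernel* (lparallel:make-kernel (+ 1 concurrency)))
             (channel (lparallel:make-channel))
             (rawq    (lparallel.queue:make-queue :fixed-capacity prefetch)))
        ;; reader task: all the rows, then one :end-of-data marker per writer
        (lparallel:submit-task
         channel
         (lambda ()
           (map nil (lambda (row) (lparallel.queue:push-queue row rawq)) rows)
           (loop :repeat concurrency
                 :do (lparallel.queue:push-queue :end-of-data rawq))))
        ;; writer tasks: consume rows until the end-of-data marker shows up
        (loop :repeat concurrency
              :do (lparallel:submit-task
                   channel
                   (lambda ()
                     (loop :for row := (lparallel.queue:pop-queue rawq)
                           :until (eq :end-of-data row)
                           :count t))))
        ;; collect the reader result and each writer's row count
        (loop :repeat (+ 1 concurrency)
              :collect (lparallel:receive-result channel))))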

The most interesting factor here is the memory behavior of pgloader, which
seems more stable than before and easier for SBCL's GC to cope with.

Note that batch concurrency is no more, replaced by prefetch rows: the
reader thread no longer builds batches, and the count of items in the reader
queue is now a number of rows, not of batches of them.
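Concretely, the reader queue is now bounded in rows rather than in batches,
so its memory footprint is capped by prefetch rows times the average row
size, and a blocking push-queue is what throttles a reader that outruns its
writers. A minimal sketch, assuming the lq nickname for lparallel.queue used
throughout the diff below:

    ;; the reader queue after this patch, sized in rows
    (let ((rawq (lq:make-queue :fixed-capacity *prefetch-rows*)))
      ;; push-queue blocks once *prefetch-rows* rows are waiting,
      ;; pausing the reader until a writer pops some rows off
      (lq:push-queue #("1" "some row") rawq)
      (lq:pop-queue rawq))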

Anyway, with this patch in, I can't reproduce the following issues:

Fixes #337, Fixes #420.
commit 6d66280fa5
parent 7f737a5f55
Dimitri Fontaine 2017-06-27 19:14:57 +02:00
22 changed files with 231 additions and 292 deletions


@@ -467,23 +467,14 @@ Note that while the \fBCOPY\fR command is restricted to read either from its sta
 pgloader uses several concurrent tasks to process the data being loaded:
 .
 .IP "\(bu" 4
-a reader task reads the data in,
+a reader task reads the data in and pushes it to a queue,
 .
 .IP "\(bu" 4
-at least one transformer task is responsible for applying the needed transformations to given data so that it fits PostgreSQL expectations, those transformations include CSV like user\-defined \fIprojections\fR, database \fIcasting\fR (default and user given), and PostgreSQL specific \fIformatting\fR of the data for the COPY protocol and in unicode,
-.
-.IP "\(bu" 4
-at least one writer task is responsible for sending the data down to PostgreSQL using the COPY protocol\.
+at least one writer task feeds from the queue and formats the raw data into the PostgreSQL COPY format in batches (so that it\'s possible to then retry a failed batch without reading the data from source again), and then sends the data to PostgreSQL using the COPY protocol\.
 .
 .IP "" 0
 .
 .P
-The idea behind having the transformer task do the \fIformatting\fR is so that in the event of bad rows being rejected by PostgreSQL the retry process doesn\'t have to do that step again\.
-.
-.P
-At the moment, the number of transformer and writer tasks are forced into being the same, which allows for a very simple \fIqueueing\fR model to be implemented: the reader task fills in one queue per transformer task, which then pops from that queue and pushes to a writer queue per COPY task\.
-.
-.P
 The parameter \fIworkers\fR allows to control how many worker threads are allowed to be active at any time (that\'s the parallelism level); and the parameter \fIconcurrency\fR allows to control how many tasks are started to handle the data (they may not all run at the same time, depending on the \fIworkers\fR setting)\.
 .
 .P
@@ -493,21 +484,12 @@ We allow \fIworkers\fR simultaneous workers to be active at the same time in the
 a reader getting raw data from the source,
 .
 .IP "\(bu" 4
-N transformers preparing raw data for PostgreSQL COPY protocol,
-.
-.IP "\(bu" 4
-N writers sending the data down to PostgreSQL\.
+N writers preparing and sending the data down to PostgreSQL\.
 .
 .IP "" 0
 .
 .P
-The N here is setup to the \fIconcurrency\fR parameter: with a \fICONCURRENCY\fR of 2, we start (+ 1 2 2) = 5 concurrent tasks, with a \fIconcurrency\fR of 4 we start (+ 1 4 4) = 9 concurrent tasks, of which only \fIworkers\fR may be active simultaneously\.
-.
-.P
-So with \fBworkers = 4, concurrency = 2\fR, the parallel scheduler will maintain active only 4 of the 5 tasks that are started\.
-.
-.P
-With \fBworkers = 8, concurrency = 1\fR, we then are able to work on several units of work at the same time\. In the database sources, a unit of work is a table, so those settings allow pgloader to be active on as many as 3 tables at any time in the load process\.
+The N here is setup to the \fIconcurrency\fR parameter: with a \fICONCURRENCY\fR of 2, we start (+ 1 2) = 3 concurrent tasks, with a \fIconcurrency\fR of 4 we start (+ 1 4) = 5 concurrent tasks, of which only \fIworkers\fR may be active simultaneously\.
 .
 .P
 The defaults are \fBworkers = 4, concurrency = 1\fR when loading from a database source, and \fBworkers = 8, concurrency = 2\fR when loading from something else (currently, a file)\. Those defaults are arbitrary and waiting for feedback from users, so please consider providing feedback if you play with the settings\.
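The arithmetic in the paragraph above matches the new task-count defined
later in this diff; a quick REPL check:

    (defun task-count (concurrency)   ; as in the methods.lisp hunk below
      (+ 1 concurrency))              ; one reader + N writers

    (task-count 2)                    ; => 3
    (task-count 4)                    ; => 5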
@@ -642,7 +624,7 @@ All data sources specific commands support the following options:
 \fIbatch size = \.\.\. MB\fR
 .
 .IP "\(bu" 4
-\fIbatch concurrency = \.\.\.\fR
+\fIprefetch rows = \.\.\.\fR
 .
 .IP "" 0
 .
@@ -863,10 +845,10 @@ Takes a numeric value as argument, used as the maximum number of rows allowed in
 Takes a memory unit as argument, such as \fI20 MB\fR, its default value\. Accepted multipliers are \fIkB\fR, \fIMB\fR, \fIGB\fR, \fITB\fR and \fIPB\fR\. The case is important so as not to be confused about bits versus bytes, we\'re only talking bytes here\.
 .
 .IP "\(bu" 4
-\fIbatch concurrency\fR
+\fIprefetch rows\fR
 .
 .IP
-Takes a numeric value as argument, defaults to \fB10\fR\. That\'s the number of batches that pgloader is allows to build in memory in each reader thread\. See the \fIworkers\fR setting for how many reader threads are allowed to run at the same time: each of them is allowed as many as \fIbatch concurrency\fR batches\.
+Takes a numeric value as argument, defaults to \fB100000\fR\. That\'s the number of rows that pgloader is allowed to read in memory in each reader thread\. See the \fIworkers\fR setting for how many reader threads are allowed to run at the same time\.
 .
 .IP "" 0
 .
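For instance, a load command exercising the renamed setting might read as
follows (connection strings and values are placeholders, not part of this
commit):

    LOAD DATABASE
         FROM mysql://user@localhost/sakila
         INTO postgresql:///sakila
    WITH workers = 4, concurrency = 1,
         batch rows = 25000, batch size = 20 MB,
         prefetch rows = 100000;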


@@ -406,26 +406,12 @@ using the same protocol as pgloader uses.
 pgloader uses several concurrent tasks to process the data being loaded:
 
-  - a reader task reads the data in,
+  - a reader task reads the data in and pushes it to a queue,
 
-  - at least one transformer task is responsible for applying the needed
-    transformations to given data so that it fits PostgreSQL expectations,
-    those transformations include CSV like user-defined *projections*,
-    database *casting* (default and user given), and PostgreSQL specific
-    *formatting* of the data for the COPY protocol and in unicode,
-
-  - at least one writer task is responsible for sending the data down to
-    PostgreSQL using the COPY protocol.
-
-The idea behind having the transformer task do the *formatting* is so that
-in the event of bad rows being rejected by PostgreSQL the retry process
-doesn't have to do that step again.
-
-At the moment, the number of transformer and writer tasks are forced into
-being the same, which allows for a very simple *queueing* model to be
-implemented: the reader task fills in one queue per transformer task,
-which then pops from that queue and pushes to a writer queue per COPY
-task.
+  - at least one writer task feeds from the queue and formats the raw data
+    into the PostgreSQL COPY format in batches (so that it's possible to
+    then retry a failed batch without reading the data from source again),
+    and then sends the data to PostgreSQL using the COPY protocol.
 
 The parameter *workers* allows to control how many worker threads are
 allowed to be active at any time (that's the parallelism level); and the
@@ -438,22 +424,13 @@ context of a single table. A single unit of work consist of
 workers:
 
   - a reader getting raw data from the source,
-  - N transformers preparing raw data for PostgreSQL COPY protocol,
-  - N writers sending the data down to PostgreSQL.
+  - N writers preparing and sending the data down to PostgreSQL.
 
 The N here is setup to the *concurrency* parameter: with a *CONCURRENCY* of
-2, we start (+ 1 2 2) = 5 concurrent tasks, with a *concurrency* of 4 we
-start (+ 1 4 4) = 9 concurrent tasks, of which only *workers* may be active
+2, we start (+ 1 2) = 3 concurrent tasks, with a *concurrency* of 4 we start
+(+ 1 4) = 5 concurrent tasks, of which only *workers* may be active
 simultaneously.
 
-So with `workers = 4, concurrency = 2`, the parallel scheduler will
-maintain active only 4 of the 5 tasks that are started.
-
-With `workers = 8, concurrency = 1`, we then are able to work on several
-units of work at the same time. In the database sources, a unit of work is a
-table, so those settings allow pgloader to be active on as many as 3 tables
-at any time in the load process.
-
 The defaults are `workers = 4, concurrency = 1` when loading from a database
 source, and `workers = 8, concurrency = 2` when loading from something else
 (currently, a file). Those defaults are arbitrary and waiting for feedback
@@ -581,7 +558,7 @@ Some clauses are common to all commands:
   - *on error stop*
   - *batch rows = R*
   - *batch size = ... MB*
-  - *batch concurrency = ...*
+  - *prefetch rows = ...*
 
 See the section BATCH BEHAVIOUR OPTIONS for more details.
@@ -777,13 +754,12 @@ The global batch behaviour options are:
     important so as not to be confused about bits versus bytes, we're only
     talking bytes here.
 
-  - *batch concurrency*
+  - *prefetch rows*
 
-    Takes a numeric value as argument, defaults to `10`. That's the number
-    of batches that pgloader is allows to build in memory in each reader
-    thread. See the *workers* setting for how many reader threads are
-    allowed to run at the same time: each of them is allowed as many as
-    *batch concurrency* batches.
+    Takes a numeric value as argument, defaults to `100000`. That's the
+    number of rows that pgloader is allowed to read in memory in each reader
+    thread. See the *workers* setting for how many reader threads are
+    allowed to run at the same time.
 
 Other options are specific to each input source, please refer to specific
 parts of the documentation for their listing and covering.


@@ -234,9 +234,27 @@
            #:elapsed-time-since
            #:timing))
 
+(defpackage #:pgloader.batch
+  (:use #:cl #:pgloader.params #:pgloader.monitor)
+  (:export #:make-batch
+           #:batch-p
+           #:batch-start
+           #:batch-data
+           #:batch-count
+           #:batch-max-count
+           #:batch-max-count
+           #:batch-bytes
+           #:push-row
+           #:batch-oversized-p
+           #:batch-full-p))
+
 (defpackage #:pgloader.utils
   (:use #:cl
-        #:pgloader.params #:pgloader.catalog #:pgloader.monitor #:pgloader.state)
+        #:pgloader.params
+        #:pgloader.catalog
+        #:pgloader.monitor
+        #:pgloader.state
+        #:pgloader.batch)
   (:import-from #:alexandria
                 #:appendf
                 #:read-file-into-string)
@@ -250,6 +268,7 @@
            #:camelCase-to-colname
            #:unquote
            #:expand-user-homedir-pathname
+           #:pretty-print-bytes
 
            ;; threads
            #:make-kernel
@@ -265,13 +284,7 @@
 (cl-user::export-inherited-symbols "pgloader.catalog" "pgloader.utils")
 (cl-user::export-inherited-symbols "pgloader.monitor" "pgloader.utils")
 (cl-user::export-inherited-symbols "pgloader.state" "pgloader.utils")
-
-(defpackage #:pgloader.batch
-  (:use #:cl #:pgloader.params #:pgloader.monitor)
-  (:export #:make-batch
-           #:batch-row
-           #:finish-batch
-           #:push-end-of-data-message))
+(cl-user::export-inherited-symbols "pgloader.batch" "pgloader.utils")
 
 ;;
@@ -352,6 +365,7 @@
            #:truncate-tables
            #:copy-from-file
            #:copy-from-queue
+           #:copy-from-batch
 
            #:set-table-oids
            #:create-sqltypes
@@ -429,7 +443,7 @@
            #:map-rows
            #:copy-column-list
            #:queue-raw-data
-           #:format-data-to-copy
+           #:data-is-preformatted-p
            #:copy-from
            #:copy-to
            #:copy-database


@@ -22,7 +22,7 @@
    #:*preserve-index-names*
    #:*copy-batch-rows*
    #:*copy-batch-size*
-   #:*concurrent-batches*
+   #:*prefetch-rows*
    #:*pg-settings*
    #:*mysql-settings*
    #:*default-tmpdir*
@@ -133,8 +133,8 @@
 (defparameter *copy-batch-size* (* 20 1024 1024)
   "Maximum memory size allowed for a single batch.")
 
-(defparameter *concurrent-batches* 10
-  "How many batches do we stack in the queue in advance.")
+(defparameter *prefetch-rows* 100000
+  "How many rows to read in advance in the reader queue.")
 
 (defparameter *pg-settings* nil "An alist of GUC names and values.")
 (defparameter *mysql-settings* nil "An alist of GUC names and values.")


@@ -38,7 +38,7 @@
 option-concurrency
 option-batch-rows
 option-batch-size
-option-batch-concurrency
+option-prefetch-rows
 option-max-parallel-create-index
 option-truncate
 option-drop-indexes


@@ -108,7 +108,7 @@
 option-concurrency
 option-batch-rows
 option-batch-size
-option-batch-concurrency
+option-prefetch-rows
 option-max-parallel-create-index
 option-truncate
 option-disable-triggers


@@ -23,7 +23,7 @@
 option-concurrency
 option-batch-rows
 option-batch-size
-option-batch-concurrency
+option-prefetch-rows
 option-truncate
 option-disable-triggers
 option-data-only


@@ -48,7 +48,7 @@
 option-concurrency
 option-batch-rows
 option-batch-size
-option-batch-concurrency
+option-prefetch-rows
 option-max-parallel-create-index
 option-truncate
 option-drop-indexes


@@ -23,7 +23,7 @@
 option-concurrency
 option-batch-rows
 option-batch-size
-option-batch-concurrency
+option-prefetch-rows
 option-truncate
 option-disable-triggers
 option-identifiers-case


@@ -56,6 +56,7 @@
 (def-keyword-rule "workers")
 (def-keyword-rule "batch")
 (def-keyword-rule "rows")
+(def-keyword-rule "prefetch")
 (def-keyword-rule "size")
 (def-keyword-rule "concurrency")
 (def-keyword-rule "max")


@@ -19,7 +19,7 @@
 option-concurrency
 option-batch-rows
 option-batch-size
-option-batch-concurrency
+option-prefetch-rows
 option-max-parallel-create-index
 option-truncate
 option-disable-triggers


@@ -12,7 +12,7 @@
 option-concurrency
 option-batch-rows
 option-batch-size
-option-batch-concurrency
+option-prefetch-rows
 option-max-parallel-create-index
 option-truncate
 option-disable-triggers


@@ -73,17 +73,20 @@
   (bind (((_ _ _ val) batch-size))
     (cons :batch-size val))))
 
-(defrule option-batch-concurrency (and kw-batch kw-concurrency equal-sign
-                                       (+ (digit-char-p character)))
-  (:lambda (batch-concurrency)
-    (bind (((_ _ _ nb) batch-concurrency))
-      (cons :batch-concurrency (parse-integer (text nb))))))
+;;; deprecated, but still accept it in the parsing
+(defrule option-prefetch-rows (and (or (and kw-batch kw-concurrency)
+                                       (and kw-prefetch kw-rows))
+                                   equal-sign
+                                   (+ (digit-char-p character)))
+  (:lambda (prefetch-rows)
+    (bind (((_ _ nb) prefetch-rows))
+      (cons :prefetch-rows (parse-integer (text nb))))))
 
 (defun batch-control-bindings (options)
   "Generate the code needed to add batch-control"
   `((*copy-batch-rows* (or ,(getf options :batch-rows) *copy-batch-rows*))
     (*copy-batch-size* (or ,(getf options :batch-size) *copy-batch-size*))
-    (*concurrent-batches* (or ,(getf options :batch-concurrency) *concurrent-batches*))))
+    (*prefetch-rows* (or ,(getf options :prefetch-rows) *prefetch-rows*))))
 
 (defun identifier-case-binding (options)
   "Generate the code needed to bind *identifer-case* to the proper value."
@@ -93,7 +96,7 @@
                 &key
                 (option-list '(:batch-rows
                                :batch-size
-                               :batch-concurrency
+                               :prefetch-rows
                                :identifier-case))
                 extras)
   "Given a list of options, remove the generic ones that should already have


@@ -18,7 +18,7 @@ load database
 option-concurrency
 option-batch-rows
 option-batch-size
-option-batch-concurrency
+option-prefetch-rows
 option-max-parallel-create-index
 option-truncate
 option-disable-triggers


@@ -93,55 +93,108 @@
       (pomo:execute "ROLLBACK"))))))
 
 ;;;
-;;; We receive fully prepared batch from an lparallel queue, push their
-;;; content down to PostgreSQL, handling any data related errors in the way.
+;;; We receive raw input rows from an lparallel queue, push their content
+;;; down to PostgreSQL, handling any data related errors in the way.
 ;;;
-(defun copy-from-queue (pgconn table queue
-                        &key
-                          columns
-                          disable-triggers
-                          on-error-stop)
-  "Fetch from the QUEUE messages containing how many rows are in the
-   *writer-batch* for us to send down to PostgreSQL, and when that's done
-   update stats."
-  (let ((seconds 0))
+(defun copy-rows-from-queue (copy queue
+                             &key
+                               disable-triggers
+                               on-error-stop
+                               (columns
+                                (pgloader.sources:copy-column-list copy))
+                             &aux
+                               (pgconn (clone-connection
+                                        (pgloader.sources:target-db copy)))
+                               (table (pgloader.sources:target copy)))
+  "Fetch rows from the QUEUE, prepare them in batches and send them down to
+   PostgreSQL, and when that's done update stats."
+  (let ((preprocessor  (pgloader.sources::preprocess-row copy))
+        (pre-formatted (pgloader.sources:data-is-preformatted-p copy))
+        (current-batch (make-batch))
+        (seconds 0))
+    (flet ((send-current-batch (unqualified-table-name)
+             ;; we close over the whole lexical environment or almost...
+             (let ((batch-start-time (get-internal-real-time)))
+               (copy-batch table
+                           columns
+                           (batch-data current-batch)
+                           (batch-count current-batch)
+                           :on-error-stop on-error-stop)
+               (let ((batch-seconds (elapsed-time-since batch-start-time)))
+                 (log-message :debug
+                              "copy-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]"
+                              (lp:kernel-worker-index)
+                              unqualified-table-name
+                              (batch-count current-batch)
+                              (pretty-print-bytes (batch-bytes current-batch))
+                              batch-seconds
+                              (batch-oversized-p current-batch))
+                 (update-stats :data table :rows (batch-count current-batch))
+                 ;; return batch-seconds
+                 batch-seconds))))
+      (declare (inline send-current-batch))
       (with-pgsql-connection (pgconn)
         (with-schema (unqualified-table-name table)
           (with-disabled-triggers (unqualified-table-name
                                    :disable-triggers disable-triggers)
-            (log-message :info "pgsql:copy-from-queue[~a]: ~a ~a"
+            (log-message :info "pgsql:copy-rows-from-queue[~a]: ~a ~a"
                          (lp:kernel-worker-index)
                          (format-table-name table)
                          columns)
             (loop
-               :for (mesg batch read oversized?) := (lq:pop-queue queue)
-               :until (eq :end-of-data mesg)
-               :for (rows batch-seconds) :=
-                 (let ((start-time (get-internal-real-time)))
-                   (list (copy-batch table columns batch read
-                                     :on-error-stop on-error-stop)
-                         (elapsed-time-since start-time)))
-               :do (progn
-                     ;; The SBCL implementation needs some Garbage Collection
-                     ;; decision making help... and now is a pretty good time.
-                     #+sbcl (when oversized?
-                              (log-message :debug "Forcing a full GC.")
-                              (sb-ext:gc :full t))
-                     (log-message :debug
-                                  "copy-batch[~a] ~a ~d row~:p in ~6$s~@[ [oversized]~]"
-                                  (lp:kernel-worker-index)
-                                  unqualified-table-name
-                                  rows
-                                  batch-seconds
-                                  oversized?)
-                     (update-stats :data table :rows rows)
-                     (incf seconds batch-seconds))))))
+               :for row := (lq:pop-queue queue)
+               :until (eq :end-of-data row)
+               :do (progn
+                     ;; if current-batch is full, send data to PostgreSQL
+                     ;; and prepare a new batch
+                     (when (batch-full-p current-batch)
+                       (let ((batch-seconds
+                              (send-current-batch unqualified-table-name)))
+                         (incf seconds batch-seconds))
+                       (setf current-batch (make-batch)))
+
+                     (format-row-in-batch copy row current-batch
+                                          preprocessor pre-formatted)))
+
+            ;; the last batch might not be empty
+            (unless (= 0 (batch-count current-batch))
+              (send-current-batch unqualified-table-name))))))
 
     ;; each writer thread sends its own stop timestamp and the monitor keeps
     ;; only the latest entry
     (update-stats :data table :ws seconds :stop (get-internal-real-time))
     (log-message :debug "Writer[~a] for ~a is done in ~6$s"
                  (lp:kernel-worker-index)
-                 (format-table-name table) seconds)
+                 (format-table-name table)
+                 seconds)
     (list :writer table seconds)))
+
+(declaim (inline send-current-batch))
+
+(defun format-row-in-batch (copy row current-batch preprocessor pre-formatted)
+  "Given a row from the queue, prepare it for the next batch."
+  (metabang.bind:bind
+      ((row (if preprocessor (funcall preprocessor row) row))
+
+       ((:values copy-data bytes)
+        (handler-case
+            (format-vector-row row
+                               (pgloader.sources::transforms copy)
+                               pre-formatted)
+          (condition (e)
+            (log-message :error "~a" e)
+            (update-stats :data (pgloader.sources:target copy) :errs 1)
+            (values nil 0)))))
+
+    ;; we might have to debug
+    (when copy-data
+      (log-message :data "> ~s"
+                   (map 'string #'code-char copy-data)))
+
+    ;; now add copy-data to current-batch
+    (push-row current-batch copy-data bytes)))
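Distilled to its core, the new writer loop is the classic pop/fill/flush
pattern, with one last flush for the final partial batch. An illustrative
sketch over the batch API introduced further down in this commit, where
flush-fn stands in for send-current-batch:

    (defun drain-queue (queue flush-fn &aux (current-batch (make-batch)))
      "Pop rows from QUEUE into batches, calling FLUSH-FN on each full batch."
      (loop :for row := (lq:pop-queue queue)
            :until (eq :end-of-data row)
            :do (progn
                  (when (batch-full-p current-batch)
                    (funcall flush-fn current-batch)
                    (setf current-batch (make-batch)))
                  (push-row current-batch row)))
      ;; the last batch might not be empty
      (unless (zerop (batch-count current-batch))
        (funcall flush-fn current-batch)))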


@@ -45,11 +45,10 @@
    CSV user-defined field projections to PostgreSQL columns. This function
    returns the pre-processing function, which must be a funcallable object."))
 
-(defgeneric queue-raw-data (source queue)
+(defgeneric queue-raw-data (source queue concurrency)
   (:documentation "Send raw data from the reader to the worker queue."))
 
-(defgeneric format-data-to-copy (source raw-queue formatted-queue
-                                 &optional pre-formatted)
+(defgeneric data-is-preformatted-p (source)
   (:documentation
    "Process raw data from RAW-QUEUE and prepare batches of formatted text to
    send down to PostgreSQL with the COPY protocol in FORMATTED-QUEUE."))


@@ -373,6 +373,8 @@
                worker-count
                (lp:end-kernel :wait nil))))))
 
+    (log-message :info "Done with COPYing data, waiting for indexes")
+
     (when create-indexes
       (let ((lp:*kernel* idx-kernel))
         ;; wait until the indexes are done being built...


@@ -6,26 +6,25 @@
 ;;;
 ;;; Common API implementation
 ;;;
-(defmethod queue-raw-data ((copy copy) queue-list)
+(defmethod queue-raw-data ((copy copy) rawq concurrency)
   "Stream data as read by the map-queue method on the COPY argument into QUEUE,
    as given."
   (log-message :debug "Reader started for ~a" (format-table-name (target copy)))
   (let* ((start-time (get-internal-real-time))
-         (blist (loop :for queue :in queue-list :collect (make-batch)))
-         (bclist (nconc blist blist))   ; build a circular list
-         (qlist (copy-list queue-list))
-         (qclist (nconc qlist qlist))   ; build a circular list
+         (row-count 0)
         (process-row
-          (lambda (row)
-            (when (or (eq :data *log-min-messages*)
-                      (eq :data *client-min-messages*))
-              (log-message :data "< ~s" row))
-            (prog1
-                (setf (car bclist)    ; batch-row might create a new batch
-                      (batch-row (car bclist) row (target copy) (car qclist)))
-              ;; round robin on batches and queues
-              (setf bclist (cdr bclist)
-                    qclist (cdr qclist))))))
+          (if (or (eq :data *log-min-messages*)
+                  (eq :data *client-min-messages*))
+              ;; when debugging, use a lambda with debug traces
+              (lambda (row)
+                (log-message :data "< ~s" row)
+                (lq:push-queue row rawq)
+                (incf row-count))
+
+              ;; usual non-debug case
+              (lambda (row)
+                (lq:push-queue row rawq)
+                (incf row-count)))))
 
     ;; signal we are starting
     (update-stats :data (target copy) :start start-time)
@@ -35,69 +34,17 @@
-    ;; process last batches and send them to queues
     ;; and mark end of stream
-    (loop :repeat (length queue-list)
-          :for batch :in blist
-          :for queue :in qlist
-          :do (progn
-                (finish-batch batch (target copy) queue)
-                (push-end-of-data-message queue)))
+    (loop :repeat concurrency :do (lq:push-queue :end-of-data rawq))
 
     (let ((seconds (elapsed-time-since start-time)))
       (log-message :debug "Reader for ~a is done in ~6$s"
                    (format-table-name (target copy)) seconds)
+      (update-stats :data (target copy) :read row-count :rs seconds)
       (list :reader (target copy) seconds))))
 
-(defmethod format-data-to-copy ((copy copy) input-queue output-queue
-                                &optional pre-formatted)
-  "Loop over the data in the RAW-QUEUE and prepare it in batches in the
-   FORMATED-QUEUE, ready to be sent down to PostgreSQL using the COPY protocol."
-  (log-message :debug "Transformer ~a in action for ~a!"
-               (lp:kernel-worker-index)
-               (format-table-name (target copy)))
-  (let* ((start-time (get-internal-real-time))
-         (preprocess (preprocess-row copy)))
-    (loop :for (mesg batch count oversized?) := (lq:pop-queue input-queue)
-          :until (eq :end-of-data mesg)
-          :do (let ((batch-start-time (get-internal-real-time)))
-                ;; transform each row of the batch into a copy-string
-                (loop :for i :below count
-                      :do (let* ((row (if preprocess
-                                          (funcall preprocess (aref batch i))
-                                          (aref batch i)))
-                                 (copy-data
-                                  (handler-case
-                                      (format-vector-row row
-                                                         (transforms copy)
-                                                         pre-formatted)
-                                    (condition (e)
-                                      (log-message :error "~a" e)
-                                      (update-stats :data (target copy) :errs 1)
-                                      nil))))
-                            (when (or (eq :data *log-min-messages*)
-                                      (eq :data *client-min-messages*))
-                              (log-message :data "> ~s" copy-data))
-                            (setf (aref batch i) copy-data)))
-                ;; the batch is ready, log about it
-                (log-message :debug "format-data-to-copy[~a] ~d row in ~6$s"
-                             (lp:kernel-worker-index)
-                             count
-                             (elapsed-time-since batch-start-time))
-                ;; and send the formatted batch of copy-strings down to PostgreSQL
-                (lq:push-queue (list mesg batch count oversized?) output-queue)))
-
-    ;; mark end of stream
-    (push-end-of-data-message output-queue)
-
-    ;; and return
-    (let ((seconds (elapsed-time-since start-time)))
-      (log-message :info "Transformer[~a] for ~a is done in ~6$s"
-                   (lp:kernel-worker-index)
-                   (format-table-name (target copy)) seconds)
-      (list :worker (target copy) seconds))))
+(defmethod data-is-preformatted-p ((copy copy))
+  "By default, data is not preformatted."
+  nil)
 
 (defmethod preprocess-row ((copy copy))
   "The default preprocessing of raw data is to do nothing."
@@ -119,7 +66,8 @@
 (defun task-count (concurrency)
   "Return how many threads we are going to start given a number of WORKERS."
-  (+ 1 concurrency concurrency))
+  ;; (+ 1 concurrency concurrency)
+  (+ 1 concurrency))
 
 (defmethod copy-from ((copy copy)
                       &key
@@ -129,29 +77,11 @@
                         (concurrency 2)
                         (on-error-stop *on-error-stop*)
                         disable-triggers)
-  "Copy data from COPY source into PostgreSQL.
-
-   We allow WORKER-COUNT simultaneous workers to be active at the same time
-   in the context of this COPY object. A single unit of work consist of
-   several kinds of workers:
-
-     - a reader getting raw data from the COPY source with `map-rows',
-     - N transformers preparing raw data for PostgreSQL COPY protocol,
-     - N writers sending the data down to PostgreSQL.
-
-   The N here is setup to the CONCURRENCY parameter: with a CONCURRENCY of
-   2, we start (+ 1 2 2) = 5 concurrent tasks, with a CONCURRENCY of 4 we
-   start (+ 1 4 4) = 9 concurrent tasks, of which only WORKER-COUNT may be
-   active simultaneously."
+  "Copy data from COPY source into PostgreSQL."
   (let* ((table-name (format-table-name (target copy)))
          (lp:*kernel* (or kernel (make-kernel worker-count)))
          (channel (or channel (lp:make-channel)))
-         ;; Now, prepare data queues for workers:
-         ;;        reader -> transformers -> writers
-         (rawqs (loop :repeat concurrency :collect
-                   (lq:make-queue :fixed-capacity *concurrent-batches*)))
-         (fmtqs (loop :repeat concurrency :collect
-                   (lq:make-queue :fixed-capacity *concurrent-batches*))))
+         (rawq (lq:make-queue :fixed-capacity *prefetch-rows*)))
 
     (lp:task-handler-bind
         ((on-error-stop
@@ -172,21 +102,13 @@
       (log-message :notice "COPY ~a" table-name)
 
       ;; start a task to read data from the source into the queue
-      (lp:submit-task channel #'queue-raw-data copy rawqs)
+      (lp:submit-task channel #'queue-raw-data copy rawq concurrency)
 
-      ;; now start transformer threads to process raw vectors from our
-      ;; source into preprocessed batches to send down to PostgreSQL
-      (loop :for rawq :in rawqs
-            :for fmtq :in fmtqs
-            :do (lp:submit-task channel #'format-data-to-copy copy rawq fmtq))
-
-      (loop :for fmtq :in fmtqs
-            :do (lp:submit-task channel
-                                #'pgloader.pgsql:copy-from-queue
-                                (clone-connection (target-db copy))
-                                (target copy)
-                                fmtq
-                                :columns (copy-column-list copy)
+      ;; start a task to transform the raw data in the copy format
+      ;; and send that data down to PostgreSQL
+      (loop :repeat concurrency
+            :do (lp:submit-task channel #'pgloader.pgsql::copy-rows-from-queue
+                                copy rawq
                                 :on-error-stop on-error-stop
                                 :disable-triggers disable-triggers))
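Put together, copy-from now wires the tasks as in this sketch (free
variables; copy stands for any COPY source object, and the keyword
arguments are elided):

    (let* ((lp:*kernel* (make-kernel worker-count))  ; the WORKERS setting
           (channel     (lp:make-channel))
           (rawq        (lq:make-queue :fixed-capacity *prefetch-rows*)))
      ;; 1 reader + CONCURRENCY writers = (task-count concurrency) tasks,
      ;; of which at most WORKER-COUNT run at any given time
      (lp:submit-task channel #'queue-raw-data copy rawq concurrency)
      (loop :repeat concurrency
            :do (lp:submit-task channel
                                #'pgloader.pgsql::copy-rows-from-queue
                                copy rawq))
      (loop :repeat (task-count concurrency)
            :collect (lp:receive-result channel)))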


@@ -67,9 +67,5 @@
               (log-message :error "~a" e)
               (update-stats :data (target copy) :errs 1))))))
 
-(defmethod format-data-to-copy ((copy copy-copy) raw-queue formatted-queue
-                                &optional pre-formatted)
-  "Copy data from given COPY definition into lparallel.queue DATAQ"
-  (call-next-method copy raw-queue formatted-queue t))
+(defmethod data-is-preformatted-p ((copy copy-copy)) t)


@@ -9,11 +9,16 @@
 ;;; pushes the data down to PostgreSQL using the COPY protocol.
 ;;;
 (defstruct (batch
-             ;; we use &key as a trick for &aux to see the max-count, think let*
-             (:constructor make-batch (&key (max-count (init-batch-max-count))
-                                       &aux (data (make-array max-count)))))
+             (:constructor
+              make-batch (&key
+                          (max-count (init-batch-max-count))
+                          &aux
+                          (data
+                           (make-array max-count
+                                       :element-type '(simple-array
+                                                       (unsigned-byte 8)))))))
   (start (get-internal-real-time) :type fixnum)
-  (data nil :type simple-array)
+  (data nil :type array)
   (count 0 :type fixnum)
   (max-count 0 :type fixnum)
   (bytes 0 :type fixnum))
@@ -28,47 +33,19 @@
   ;; 0.7 < 0.7 + (random 0.6) < 1.3
   (truncate (* batch-rows (+ 0.7 (random 0.6)))))
 
-(defun finish-batch (batch target queue &optional oversized?)
-  (with-slots (start data count) batch
-    (when (< 0 count)
-      (log-message :debug "finish-batch[~a] ~d row~:p in ~6$s"
-                   (lp:kernel-worker-index) count
-                   (elapsed-time-since start))
-      (update-stats :data target
-                    :read count
-                    :rs (elapsed-time-since start))
-      (lq:push-queue (list :batch data count oversized?) queue))))
-
-(defun push-end-of-data-message (queue)
-  "Push a fake batch marker to signal end-of-data in QUEUE."
-  ;; the message must look like the finish-batch message overall
-  (lq:push-queue (list :end-of-data nil nil nil) queue))
-
-(declaim (inline oversized?))
-(defun oversized? (batch)
+(defun batch-oversized-p (batch)
   "Return a generalized boolean that is true only when BATCH is considered
    over-sized when its size in BYTES is compared to *copy-batch-size*."
   (and *copy-batch-size*              ; defaults to nil
        (<= *copy-batch-size* (batch-bytes batch))))
 
-(defun batch-row (batch row target queue)
-  "Add ROW to the reader batch. When the batch is full, provide it to the
-   writer."
-  (let ((maybe-new-batch
-         (let ((oversized? (oversized? batch)))
-           (if (or (= (batch-count batch) (batch-max-count batch))
-                   oversized?)
-               (progn
-                 ;; close current batch, prepare next one
-                 (finish-batch batch target queue oversized?)
-                 (make-batch))
-               ;; return given batch, it's still current
-               batch))))
-    (with-slots (data count bytes) maybe-new-batch
-      (setf (aref data count) row)
-      (incf count))
-    maybe-new-batch))
+(defun batch-full-p (batch)
+  (or (= (batch-count batch) (batch-max-count batch))
+      (batch-oversized-p batch)))
+
+(defun push-row (batch row &optional row-bytes)
+  (with-slots (data count bytes) batch
+    (setf (aref data count) row)
+    (incf count)
+    (when row-bytes
+      (incf bytes row-bytes))))
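A quick interactive check of the reworked batch API (hypothetical REPL
session; the byte counts are made up):

    (let ((b (make-batch :max-count 3)))
      (push-row b #("1" "foo") 16)      ; a row and its size in bytes
      (push-row b #("2" "bar") 16)
      (values (batch-count b)           ; => 2
              (batch-bytes b)           ; => 32
              (batch-full-p b)          ; => NIL, neither at max-count
              (batch-oversized-p b)))   ;    nor past *copy-batch-size*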


@@ -10,7 +10,7 @@
   `((*monitoring-queue* . ,*monitoring-queue*)
     (*copy-batch-rows* . ,*copy-batch-rows*)
     (*copy-batch-size* . ,*copy-batch-size*)
-    (*concurrent-batches* . ,*concurrent-batches*)
+    (*prefetch-rows* . ,*prefetch-rows*)
     (*pg-settings* . ',*pg-settings*)
     (*mysql-settings* . ',*mysql-settings*)
     (*root-dir* . ,*root-dir*)


@@ -62,3 +62,17 @@
       (t
        (uiop:parse-unix-namestring namestring))))))
 
+;;;
+;;; For log messages
+;;;
+(defun pretty-print-bytes (bytes &key (unit "B"))
+  "Return a string to represent bytes in human readable format, with units"
+  (let ((bytes (or bytes 0)))
+    (loop
+       :for multiple :in '("T" "G" "M" "k")
+       :for power :in '(40 30 20 10 1)
+       :for limit := (expt 2 power)
+       :until (<= limit bytes)
+       :finally (return
+                  (format nil "~5,1f ~a~a" (/ bytes limit) multiple unit)))))