Add user options to control pgloader batch behaviour.

The new WITH options allows the user to set values for the dynamic
variables *copy-batch-rows*, *copy-batch-size* and *concurrent-batches*.
That's needed in case like in issue #16 even with the batch size
defaulting to what looks like a proper setup.

In a longer term a review of the pgloader memory usage should be done
seriously, the numbers being way higher than the batch sizes we do setup
here.
This commit is contained in:
Dimitri Fontaine 2014-04-27 22:37:17 +02:00
parent 78a988eb47
commit efd11ab759
3 changed files with 141 additions and 19 deletions

View File

@ -236,6 +236,44 @@ Any command may contain comments, following those input rules:
Any place where you could enter a *whitespace* will accept a comment too.
### Batch behaviour options
All pgloader commands have support for a *WITH* clause that allows for
specifying options. Some options are generic and accepted by all commands,
such as the *batch behaviour options*, and some options are specific to a
data source kind, such as the CSV *skip header* options.
The global batch behaviour options are:
- *batch rows*
Takes a numeric value as argument, used as the maximum number of rows
allowed in a batch. The default is `25 000` and can be changed to try
having better performances characteristics or to control pgloader memory
usage;
- *batch size*
Takes a memory unit as argument, such as *20 MB*, its default value.
Accepted multipliers are *kB*, *MB*, *GB*, *TB* and *PB*. The case is
important so as not to be confused about bits versus bytes, we're only
talking bytes here.
- *batch concurrency*
Takes a numeric value as argument, defaults to `10`. That's the number
of batches that pgloader is allows to build in memory, even when only a
single batch at a time might be sent to PostgreSQL.
Supporting more than a single batch being sent at a time is on the TODO
list of pgloader, but is not implemented yet. This option is about
controling the memory needs of pgloader as a trade-off to the
performances characteristics, and not about parallel activity of
pgloader.
Other options are specific to each input source, please refer to specific
parts of the documentation for their listing and covering.
### LOAD CSV
This command instructs pgloader to load data from a `CSV` file. Here's an

View File

@ -87,7 +87,9 @@
;; option for loading from a file
(def-keyword-rule "workers")
(def-keyword-rule "batch")
(def-keyword-rule "rows")
(def-keyword-rule "size")
(def-keyword-rule "concurrency")
(def-keyword-rule "reject")
(def-keyword-rule "file")
(def-keyword-rule "log")
@ -440,6 +442,64 @@
(declare (ignore w e))
(cons :workers (parse-integer (text nb))))))
(defrule option-batch-rows (and kw-batch kw-rows equal-sign
(+ (digit-char-p character)))
(:lambda (batch-rows)
(destructuring-bind (b r e nb) batch-rows
(declare (ignore b r e))
(cons :batch-rows (parse-integer (text nb))))))
(defrule byte-size-multiplier (or #\k #\M #\G #\T #\P)
(:lambda (multiplier)
(case (aref multiplier 0)
(#\k 10)
(#\M 20)
(#\G 30)
(#\T 40)
(#\P 50))))
(defrule byte-size-unit (and ignore-whitespace (? byte-size-multiplier) #\B)
(:lambda (unit)
(destructuring-bind (ws &optional (multiplier 1) byte) unit
(declare (ignore ws byte))
(expt 2 multiplier))))
(defrule batch-size (and (+ (digit-char-p character)) byte-size-unit)
(:lambda (batch-size)
(destructuring-bind (nb unit) batch-size
(* (parse-integer (text nb)) unit))))
(defrule option-batch-size (and kw-batch kw-size equal-sign batch-size)
(:lambda (batch-size)
(destructuring-bind (b s e val) batch-size
(declare (ignore b s e))
(cons :batch-size val))))
(defrule option-batch-concurrency (and kw-batch kw-concurrency equal-sign
(+ (digit-char-p character)))
(:lambda (batch-concurrency)
(destructuring-bind (b c e nb) batch-concurrency
(declare (ignore b c e))
(cons :batch-concurrency (parse-integer (text nb))))))
(defun batch-control-bindings (options)
"Generate the code needed to add batch-control"
`((*copy-batch-rows* (or ,(getf options :batch-rows) *copy-batch-rows*))
(*copy-batch-size* (or ,(getf options :batch-size) *copy-batch-size*))
(*concurrent-batches* (or ,(getf options :batch-concurrency) *concurrent-batches*))))
(defun remove-batch-control-option (options
&key
(option-list '(:batch-rows
:batch-size
:batch-concurrency))
extras)
"Given a list of options, remove the generic ones that should already have
been processed."
(loop :for (k v) :on options :by #'cddr
:unless (member k (append option-list extras))
:append (list k v)))
(defmacro make-option-rule (name rule &optional option)
"Generates a rule named NAME to parse RULE and return OPTION."
(let* ((bindings
@ -475,6 +535,9 @@
(cons :identifier-case action))))
(defrule mysql-option (or option-workers
option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate
option-data-only
option-schema-only
@ -869,6 +932,7 @@
(*pgconn-pass* ,pgpass)
(*pg-dbname* ,pgdb)
(*pg-settings* ',gucs)
,@(batch-control-bindings options)
(pgloader.pgsql::*pgsql-reserved-keywords*
(pgloader.pgsql:list-reserved-keywords ,pgdb))
(source
@ -888,7 +952,7 @@
:state-before state-before
:state-after state-after
:state-indexes state-idx
,@options)
,@(remove-batch-control-option options))
,(sql-code-block pgdb 'state-after after "after load")
@ -910,7 +974,10 @@ load database
set work_mem to '16MB', maintenance_work_mem to '512 MB';
|#
(defrule sqlite-option (or option-truncate
(defrule sqlite-option (or option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate
option-data-only
option-schema-only
option-include-drop
@ -968,6 +1035,7 @@ load database
(*pgconn-pass* ,password)
(*pg-dbname* ,dbname)
(*pg-settings* ',gucs)
,@(batch-control-bindings options)
(pgloader.pgsql::*pgsql-reserved-keywords*
(pgloader.pgsql:list-reserved-keywords ,dbname))
(db
@ -998,7 +1066,7 @@ load database
`(:only-tables ',(list table-name)))
:including ',incl
:excluding ',excl
,@options)))))))
,@(remove-batch-control-option options))))))))
@ -1153,7 +1221,12 @@ load database
(declare (ignore table name e))
(cons :table-name (text table-name)))))
(defrule dbf-option (or option-truncate option-create-table option-table-name))
(defrule dbf-option (or option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate
option-create-table
option-table-name))
(defrule another-dbf-option (and comma dbf-option)
(:lambda (source)
@ -1193,6 +1266,7 @@ load database
(*pgconn-pass* ,password)
(*pg-dbname* ,dbname)
(*pg-settings* ',gucs)
,@(batch-control-bindings options)
(source
,(destructuring-bind (kind url) source
(ecase kind
@ -1218,7 +1292,7 @@ load database
(pgloader.sources:copy-from source
:state-before state-before
,@options)
,@(remove-batch-control-option options))
(report-full-summary "Total import time" *state*
:before state-before)))))))
@ -1319,15 +1393,18 @@ load database
(defrule option-trim-unquoted-blanks (and kw-trim kw-unquoted kw-blanks)
(:constant (cons :trim-blanks t)))
(defrule csv-option (or option-truncate
option-skip-header
(defrule csv-option (or option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate
option-skip-header
option-lines-terminated-by
option-fields-not-enclosed
option-fields-enclosed-by
option-fields-escaped-by
option-fields-terminated-by
option-trim-unquoted-blanks
option-keep-unquoted-blanks))
option-fields-not-enclosed
option-fields-enclosed-by
option-fields-escaped-by
option-fields-terminated-by
option-trim-unquoted-blanks
option-keep-unquoted-blanks))
(defrule another-csv-option (and comma csv-option)
(:lambda (source)
@ -1592,7 +1669,8 @@ load database
(*pgconn-user* ,user)
(*pgconn-pass* ,password)
(*pg-dbname* ,dbname)
(*pg-settings* ',gucs))
(*pg-settings* ',gucs)
,@(batch-control-bindings options))
(progn
,(sql-code-block dbname 'state-before before "before load")
@ -1606,9 +1684,8 @@ load database
:encoding ,encoding
:fields ',fields
:columns ',columns
,@(loop for (k v) on options by #'cddr
unless (eq k :truncate)
append (list k v)))))
,@(remove-batch-control-option
options :extras '(:truncate)))))
(pgloader.sources:copy-from source :truncate truncate))
,(sql-code-block dbname 'state-after after "after load")
@ -1667,7 +1744,10 @@ load database
(declare (ignore open close))
field-defs)))
(defrule fixed-option (or option-truncate
(defrule fixed-option (or option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate
option-skip-header))
(defrule another-fixed-option (and comma fixed-option)
@ -1730,7 +1810,8 @@ load database
(*pgconn-user* ,user)
(*pgconn-pass* ,password)
(*pg-dbname* ,dbname)
(*pg-settings* ',gucs))
(*pg-settings* ',gucs)
,@(batch-control-bindings options))
(progn
,(sql-code-block dbname 'state-before before "before load")

View File

@ -28,6 +28,9 @@ LOAD CSV
WITH truncate,
skip header = 1,
batch rows = 200,
batch size = 1024 kB,
batch concurrency = 3,
fields terminated by '\t'
BEFORE LOAD DO