From efd11ab7599ef689f7df2109ec0827a1499d9c26 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Sun, 27 Apr 2014 22:37:17 +0200 Subject: [PATCH] Add user options to control pgloader batch behaviour. The new WITH options allow the user to set values for the dynamic variables *copy-batch-rows*, *copy-batch-size* and *concurrent-batches*. That's needed in cases like issue #16 even with the batch size defaulting to what looks like a proper setup. In the longer term a review of the pgloader memory usage should be done seriously, the numbers being way higher than the batch sizes we do set up here. --- pgloader.1.md | 38 +++++++++++++ src/parser.lisp | 119 +++++++++++++++++++++++++++++++++------- test/csv-districts.load | 3 + 3 files changed, 141 insertions(+), 19 deletions(-) diff --git a/pgloader.1.md b/pgloader.1.md index e83c361..46fc98d 100644 --- a/pgloader.1.md +++ b/pgloader.1.md @@ -236,6 +236,44 @@ Any command may contain comments, following those input rules: Any place where you could enter a *whitespace* will accept a comment too. +### Batch behaviour options + +All pgloader commands have support for a *WITH* clause that allows for +specifying options. Some options are generic and accepted by all commands, +such as the *batch behaviour options*, and some options are specific to a +data source kind, such as the CSV *skip header* options. + +The global batch behaviour options are: + + - *batch rows* + + Takes a numeric value as argument, used as the maximum number of rows + allowed in a batch. The default is `25 000` and can be changed to try + to get better performance characteristics or to control pgloader memory + usage; + + - *batch size* + + Takes a memory unit as argument, such as *20 MB*, its default value. + Accepted multipliers are *kB*, *MB*, *GB*, *TB* and *PB*. The case is + important so as not to be confused about bits versus bytes, we're only + talking bytes here.
+ + - *batch concurrency* + + Takes a numeric value as argument, defaults to `10`. That's the number + of batches that pgloader is allowed to build in memory, even when only a + single batch at a time might be sent to PostgreSQL. + + Supporting more than a single batch being sent at a time is on the TODO + list of pgloader, but is not implemented yet. This option is about + controlling the memory needs of pgloader as a trade-off to the + performance characteristics, and not about parallel activity of + pgloader. + +Other options are specific to each input source; please refer to specific +parts of the documentation for their listing and covering. + ### LOAD CSV This command instructs pgloader to load data from a `CSV` file. Here's an diff --git a/src/parser.lisp b/src/parser.lisp index d672854..2dfb620 100644 --- a/src/parser.lisp +++ b/src/parser.lisp @@ -87,7 +87,9 @@ ;; option for loading from a file (def-keyword-rule "workers") (def-keyword-rule "batch") + (def-keyword-rule "rows") (def-keyword-rule "size") + (def-keyword-rule "concurrency") (def-keyword-rule "reject") (def-keyword-rule "file") (def-keyword-rule "log") @@ -440,6 +442,64 @@ (declare (ignore w e)) (cons :workers (parse-integer (text nb)))))) +(defrule option-batch-rows (and kw-batch kw-rows equal-sign + (+ (digit-char-p character))) + (:lambda (batch-rows) + (destructuring-bind (b r e nb) batch-rows + (declare (ignore b r e)) + (cons :batch-rows (parse-integer (text nb)))))) + +(defrule byte-size-multiplier (or #\k #\M #\G #\T #\P) + (:lambda (multiplier) + (case (aref multiplier 0) + (#\k 10) + (#\M 20) + (#\G 30) + (#\T 40) + (#\P 50)))) + +(defrule byte-size-unit (and ignore-whitespace (?
byte-size-multiplier) #\B) + (:lambda (unit) + (destructuring-bind (ws multiplier byte) unit #| MULTIPLIER is NIL for a bare "B" unit: treat it as 2^0, plain bytes |# + (declare (ignore ws byte)) + (expt 2 (or multiplier 0))))) + +(defrule batch-size (and (+ (digit-char-p character)) byte-size-unit) + (:lambda (batch-size) + (destructuring-bind (nb unit) batch-size + (* (parse-integer (text nb)) unit)))) + +(defrule option-batch-size (and kw-batch kw-size equal-sign batch-size) + (:lambda (batch-size) + (destructuring-bind (b s e val) batch-size + (declare (ignore b s e)) + (cons :batch-size val)))) + +(defrule option-batch-concurrency (and kw-batch kw-concurrency equal-sign + (+ (digit-char-p character))) + (:lambda (batch-concurrency) + (destructuring-bind (b c e nb) batch-concurrency + (declare (ignore b c e)) + (cons :batch-concurrency (parse-integer (text nb)))))) + +(defun batch-control-bindings (options) + "Generate the code needed to add batch-control" + `((*copy-batch-rows* (or ,(getf options :batch-rows) *copy-batch-rows*)) + (*copy-batch-size* (or ,(getf options :batch-size) *copy-batch-size*)) + (*concurrent-batches* (or ,(getf options :batch-concurrency) *concurrent-batches*)))) + +(defun remove-batch-control-option (options + &key + (option-list '(:batch-rows + :batch-size + :batch-concurrency)) + extras) + "Given a list of options, remove the generic ones that should already have + been processed." + (loop :for (k v) :on options :by #'cddr + :unless (member k (append option-list extras)) + :append (list k v))) + (defmacro make-option-rule (name rule &optional option) "Generates a rule named NAME to parse RULE and return OPTION."
(let* ((bindings @@ -475,6 +535,9 @@ (cons :identifier-case action)))) (defrule mysql-option (or option-workers + option-batch-rows + option-batch-size + option-batch-concurrency option-truncate option-data-only option-schema-only @@ -869,6 +932,7 @@ (*pgconn-pass* ,pgpass) (*pg-dbname* ,pgdb) (*pg-settings* ',gucs) + ,@(batch-control-bindings options) (pgloader.pgsql::*pgsql-reserved-keywords* (pgloader.pgsql:list-reserved-keywords ,pgdb)) (source @@ -888,7 +952,7 @@ :state-before state-before :state-after state-after :state-indexes state-idx - ,@options) + ,@(remove-batch-control-option options)) ,(sql-code-block pgdb 'state-after after "after load") @@ -910,7 +974,10 @@ load database set work_mem to '16MB', maintenance_work_mem to '512 MB'; |# -(defrule sqlite-option (or option-truncate +(defrule sqlite-option (or option-batch-rows + option-batch-size + option-batch-concurrency + option-truncate option-data-only option-schema-only option-include-drop @@ -968,6 +1035,7 @@ load database (*pgconn-pass* ,password) (*pg-dbname* ,dbname) (*pg-settings* ',gucs) + ,@(batch-control-bindings options) (pgloader.pgsql::*pgsql-reserved-keywords* (pgloader.pgsql:list-reserved-keywords ,dbname)) (db @@ -998,7 +1066,7 @@ load database `(:only-tables ',(list table-name))) :including ',incl :excluding ',excl - ,@options))))))) + ,@(remove-batch-control-option options)))))))) @@ -1153,7 +1221,12 @@ load database (declare (ignore table name e)) (cons :table-name (text table-name))))) -(defrule dbf-option (or option-truncate option-create-table option-table-name)) +(defrule dbf-option (or option-batch-rows + option-batch-size + option-batch-concurrency + option-truncate + option-create-table + option-table-name)) (defrule another-dbf-option (and comma dbf-option) (:lambda (source) @@ -1193,6 +1266,7 @@ load database (*pgconn-pass* ,password) (*pg-dbname* ,dbname) (*pg-settings* ',gucs) + ,@(batch-control-bindings options) (source ,(destructuring-bind (kind url) source (ecase kind @@ 
-1218,7 +1292,7 @@ load database (pgloader.sources:copy-from source :state-before state-before - ,@options) + ,@(remove-batch-control-option options)) (report-full-summary "Total import time" *state* :before state-before))))))) @@ -1319,15 +1393,18 @@ load database (defrule option-trim-unquoted-blanks (and kw-trim kw-unquoted kw-blanks) (:constant (cons :trim-blanks t))) -(defrule csv-option (or option-truncate - option-skip-header +(defrule csv-option (or option-batch-rows + option-batch-size + option-batch-concurrency + option-truncate + option-skip-header option-lines-terminated-by - option-fields-not-enclosed - option-fields-enclosed-by - option-fields-escaped-by - option-fields-terminated-by - option-trim-unquoted-blanks - option-keep-unquoted-blanks)) + option-fields-not-enclosed + option-fields-enclosed-by + option-fields-escaped-by + option-fields-terminated-by + option-trim-unquoted-blanks + option-keep-unquoted-blanks)) (defrule another-csv-option (and comma csv-option) (:lambda (source) @@ -1592,7 +1669,8 @@ load database (*pgconn-user* ,user) (*pgconn-pass* ,password) (*pg-dbname* ,dbname) - (*pg-settings* ',gucs)) + (*pg-settings* ',gucs) + ,@(batch-control-bindings options)) (progn ,(sql-code-block dbname 'state-before before "before load") @@ -1606,9 +1684,8 @@ load database :encoding ,encoding :fields ',fields :columns ',columns - ,@(loop for (k v) on options by #'cddr - unless (eq k :truncate) - append (list k v))))) + ,@(remove-batch-control-option + options :extras '(:truncate))))) (pgloader.sources:copy-from source :truncate truncate)) ,(sql-code-block dbname 'state-after after "after load") @@ -1667,7 +1744,10 @@ load database (declare (ignore open close)) field-defs))) -(defrule fixed-option (or option-truncate +(defrule fixed-option (or option-batch-rows + option-batch-size + option-batch-concurrency + option-truncate option-skip-header)) (defrule another-fixed-option (and comma fixed-option) @@ -1730,7 +1810,8 @@ load database (*pgconn-user* 
,user) (*pgconn-pass* ,password) (*pg-dbname* ,dbname) - (*pg-settings* ',gucs)) + (*pg-settings* ',gucs) + ,@(batch-control-bindings options)) (progn ,(sql-code-block dbname 'state-before before "before load") diff --git a/test/csv-districts.load b/test/csv-districts.load index 519bb83..8f6cb9b 100644 --- a/test/csv-districts.load +++ b/test/csv-districts.load @@ -28,6 +28,9 @@ LOAD CSV WITH truncate, skip header = 1, + batch rows = 200, + batch size = 1024 kB, + batch concurrency = 3, fields terminated by '\t' BEFORE LOAD DO