diff --git a/pgloader.1 b/pgloader.1
index 3403249..b7f1c6c 100644
--- a/pgloader.1
+++ b/pgloader.1
@@ -1,7 +1,7 @@
 .\" generated with Ronn/v0.7.3
 .\" http://github.com/rtomayko/ronn/tree/0.7.3
 .
-.TH "PGLOADER" "1" "December 2015" "ff" ""
+.TH "PGLOADER" "1" "January 2016" "ff" ""
 .
 .SH "NAME"
 \fBpgloader\fR \- PostgreSQL data loader
@@ -458,6 +458,41 @@ When given a file that the PostgreSQL \fBCOPY\fR command knows how to parse, and
 .P
 Note that while the \fBCOPY\fR command is restricted to read either from its standard input or from a local file on the server\'s file system, the command line tool \fBpsql\fR implements a \fB\ecopy\fR command that knows how to stream a file local to the client over the network and into the PostgreSQL server, using the same protocol as pgloader uses\.
 .
+.SH "A NOTE ABOUT PARALLELISM"
+pgloader uses several concurrent tasks to process the data being loaded:
+.
+.IP "\(bu" 4
+a reader task reads the data in,
+.
+.IP "\(bu" 4
+at least one transformer task is responsible for applying the needed transformations to the data so that it fits PostgreSQL expectations; those transformations include CSV\-like user\-defined \fIprojections\fR, database \fIcasting\fR (default and user\-given), and PostgreSQL\-specific \fIformatting\fR of the data for the COPY protocol, in unicode,
+.
+.IP "\(bu" 4
+at least one writer task is responsible for sending the data down to PostgreSQL using the COPY protocol\.
+.
+.IP "" 0
+.
+.P
+The idea behind having the transformer task do the \fIformatting\fR is that, in the event of bad rows being rejected by PostgreSQL, the retry process doesn\'t have to do that step again\.
+.
+.P
+At the moment, the numbers of transformer and writer tasks are forced to be the same, which allows for a very simple \fIqueueing\fR model to be implemented: the reader task fills in one queue per transformer task, which then pops from that queue and pushes to a writer queue per COPY task\.
+.
+.P
+The parameter \fIworkers\fR controls how many worker threads may be active at any time (that\'s the parallelism level), and the parameter \fIconcurrency\fR controls how many tasks are started to handle the data (they may not all run at the same time, depending on the \fIworkers\fR setting)\.
+.
+.P
+With a \fIconcurrency\fR of 2, we start 1 reader task, 2 transformer tasks and 2 writer tasks; that\'s 5 concurrent tasks to schedule into \fIworkers\fR threads\.
+.
+.P
+So with \fBworkers = 4, concurrency = 2\fR, the parallel scheduler will keep only 4 of the 5 started tasks active\.
+.
+.P
+With \fBworkers = 8, concurrency = 1\fR, we are then able to work on several units of work at the same time\. In the database sources, a unit of work is a table, so those settings allow pgloader to be active on as many as 3 tables at any time in the load process\.
+.
+.P
+As the \fBCREATE INDEX\fR threads started by pgloader are only waiting until PostgreSQL is done with the real work, those threads are \fINOT\fR counted into the concurrency levels as detailed here\.
+.
 .SH "SOURCE FORMATS"
 pgloader supports the following input formats:
 .
@@ -569,6 +604,37 @@ Set of options to apply to the command, using a global syntax of either:
 .IP
 See each specific command for details\.
 .
+.IP
+All data source specific commands support the following options:
+.
+.IP "\(bu" 4
+\fIbatch rows = R\fR
+.
+.IP "\(bu" 4
+\fIbatch size = \.\.\. MB\fR
+.
+.IP "\(bu" 4
+\fIbatch concurrency = \.\.\.\fR
+.
+.IP "" 0
+.
+.IP
+See the section BATCH BEHAVIOUR OPTIONS for more details\.
+.
+.IP
+In addition, the data sources \fImysql\fR, \fIsqlite\fR, \fImssql\fR, \fIixf\fR and \fIdbf\fR all support the following settings:
+.
+.IP "\(bu" 4
+\fIworkers = W\fR
+.
+.IP "\(bu" 4
+\fIconcurrency = C\fR
+.
+.IP "" 0
+.
+.IP
+See section A NOTE ABOUT PARALLELISM for more details\.
+.
 .IP "\(bu" 4
 \fISET\fR
 .
@@ -769,10 +835,7 @@ Takes a memory unit as argument, such as \fI20 MB\fR, its default value\. Accept
 \fIbatch concurrency\fR
 .
 .IP
-Takes a numeric value as argument, defaults to \fB10\fR\. That\'s the number of batches that pgloader is allows to build in memory, even when only a single batch at a time might be sent to PostgreSQL\.
-.
-.IP
-Supporting more than a single batch being sent at a time is on the TODO list of pgloader, but is not implemented yet\. This option is about controlling the memory needs of pgloader as a trade\-off to the performances characteristics, and not about parallel activity of pgloader\.
+Takes a numeric value as argument, defaults to \fB10\fR\. That\'s the number of batches that pgloader is allowed to build in memory in each reader thread\. See the \fIworkers\fR setting for how many reader threads are allowed to run at the same time: each of them is allowed as many as \fIbatch concurrency\fR batches\.
 .
 .IP "" 0
 .
@@ -1085,8 +1148,7 @@ LOAD FIXED
 
      WITH truncate
 
-     SET client_encoding to \'latin1\',
-         work_mem to \'14MB\',
+     SET work_mem to \'14MB\',
          standard_conforming_strings to \'on\'
 
    BEFORE LOAD DO
@@ -1295,8 +1357,7 @@ LOAD COPY
 
      WITH truncate
 
-     SET client_encoding to \'latin1\',
-         work_mem to \'14MB\',
+     SET work_mem to \'14MB\',
          standard_conforming_strings to \'on\'
 
    BEFORE LOAD DO
@@ -1647,7 +1708,8 @@ LOAD DATABASE
      FROM mysql://root@localhost/sakila
      INTO postgresql://localhost:54393/sakila
 
-     WITH include drop, create tables, create indexes, reset sequences
+     WITH include drop, create tables, create indexes, reset sequences,
+          workers = 8, concurrency = 1
 
       SET maintenance_work_mem to \'128MB\',
           work_mem to \'12MB\',
diff --git a/pgloader.1.md b/pgloader.1.md
index bbde4e5..b47a1ea 100644
--- a/pgloader.1.md
+++ b/pgloader.1.md
@@ -388,6 +388,53 @@
 line tool `psql` implements a `\copy` command that knows how to stream a
 file local to the client over the network and into the PostgreSQL server,
 using the same protocol as pgloader uses.
 
+## A NOTE ABOUT PARALLELISM
+
+pgloader uses several concurrent tasks to process the data being loaded:
+
+  - a reader task reads the data in,
+
+  - at least one transformer task is responsible for applying the needed
+    transformations to the data so that it fits PostgreSQL expectations;
+    those transformations include CSV-like user-defined *projections*,
+    database *casting* (default and user-given), and PostgreSQL-specific
+    *formatting* of the data for the COPY protocol, in unicode,
+
+  - at least one writer task is responsible for sending the data down to
+    PostgreSQL using the COPY protocol.
+
+The idea behind having the transformer task do the *formatting* is that,
+in the event of bad rows being rejected by PostgreSQL, the retry process
+doesn't have to do that step again.
+
+At the moment, the numbers of transformer and writer tasks are forced to
+be the same, which allows for a very simple *queueing* model to be
+implemented: the reader task fills in one queue per transformer task,
+which then pops from that queue and pushes to a writer queue per COPY
+task.
+
+The parameter *workers* controls how many worker threads may be active
+at any time (that's the parallelism level), and the parameter
+*concurrency* controls how many tasks are started to handle the data
+(they may not all run at the same time, depending on the *workers*
+setting).
+
+With a *concurrency* of 2, we start 1 reader task, 2 transformer tasks
+and 2 writer tasks; that's 5 concurrent tasks to schedule into *workers*
+threads.
+
+So with `workers = 4, concurrency = 2`, the parallel scheduler will keep
+only 4 of the 5 started tasks active.
+
+With `workers = 8, concurrency = 1`, we are then able to work on several
+units of work at the same time. In the database sources, a unit of work
+is a table, so those settings allow pgloader to be active on as many as
+3 tables at any time in the load process.
+
+As the `CREATE INDEX` threads started by pgloader are only waiting until
+PostgreSQL is done with the real work, those threads are *NOT* counted
+into the concurrency levels as detailed here.
+
 ## SOURCE FORMATS
 
 pgloader supports the following input formats:
@@ -495,6 +542,22 @@ Some clauses are common to all commands:
 
     See each specific command for details.
 
+    All data source specific commands support the following options:
+
+      - *batch rows = R*
+      - *batch size = ... MB*
+      - *batch concurrency = ...*
+
+    See the section BATCH BEHAVIOUR OPTIONS for more details.
+
+    In addition, the data sources *mysql*, *sqlite*, *mssql*, *ixf* and
+    *dbf* all support the following settings:
+
+      - *workers = W*
+      - *concurrency = C*
+
+    See section A NOTE ABOUT PARALLELISM for more details.
+
   - *SET*
 
     This clause allows to specify session parameters to be set for all the
@@ -682,14 +745,10 @@ The global batch behaviour options are:
   - *batch concurrency*
 
    Takes a numeric value as argument, defaults to `10`. That's the number
-    of batches that pgloader is allows to build in memory, even when only a
-    single batch at a time might be sent to PostgreSQL.
-
-    Supporting more than a single batch being sent at a time is on the TODO
-    list of pgloader, but is not implemented yet. This option is about
-    controlling the memory needs of pgloader as a trade-off to the
-    performances characteristics, and not about parallel activity of
-    pgloader.
+    of batches that pgloader is allowed to build in memory in each reader
+    thread. See the *workers* setting for how many reader threads are
+    allowed to run at the same time: each of them is allowed as many as
+    *batch concurrency* batches.
 
 Other options are specific to each input source, please refer to specific
 parts of the documentation for their listing and covering.
@@ -1399,7 +1458,8 @@ Here's an example:
      FROM mysql://root@localhost/sakila
      INTO postgresql://localhost:54393/sakila
 
-     WITH include drop, create tables, create indexes, reset sequences
+     WITH include drop, create tables, create indexes, reset sequences,
+          workers = 8, concurrency = 1
 
       SET maintenance_work_mem to '128MB',
           work_mem to '12MB',
diff --git a/src/parsers/command-dbf.lisp b/src/parsers/command-dbf.lisp
index 425b259..6b98eaf 100644
--- a/src/parsers/command-dbf.lisp
+++ b/src/parsers/command-dbf.lisp
@@ -18,7 +18,9 @@
     (bind (((_ _ _ table-name) tn))
       (cons :table-name (text table-name)))))
 
-(defrule dbf-option (or option-batch-rows
+(defrule dbf-option (or option-workers
+                        option-concurrency
+                        option-batch-rows
                         option-batch-size
                         option-batch-concurrency
                         option-truncate
diff --git a/src/parsers/command-ixf.lisp b/src/parsers/command-ixf.lisp
index b41cc1f..c25b061 100644
--- a/src/parsers/command-ixf.lisp
+++ b/src/parsers/command-ixf.lisp
@@ -18,7 +18,9 @@
   (:lambda (tzopt)
     (bind (((_ tz) tzopt))
       (cons :timezone tz))))
 
-(defrule ixf-option (or option-batch-rows
+(defrule ixf-option (or option-workers
+                        option-concurrency
+                        option-batch-rows
                         option-batch-size
                         option-batch-concurrency
                         option-truncate
@@ -34,12 +36,6 @@
 (defrule ixf-options (and kw-with (and ixf-option (* (and comma ixf-option))))
   (:function flatten-option-list))
 
-;;; piggyback on DBF parsing
-(defrule ixf-options (and kw-with ixf-option-list)
-  (:lambda (source)
-    (bind (((_ opts) source))
-      (cons :ixf-options opts))))
-
 (defrule ixf-uri (and "ixf://" filename)
   (:lambda (source)
     (bind (((_ filename) source))
diff --git a/src/parsers/command-mssql.lisp b/src/parsers/command-mssql.lisp
index 49a2cab..edc5b78 100644
--- a/src/parsers/command-mssql.lisp
+++ b/src/parsers/command-mssql.lisp
@@ -14,7 +14,9 @@
 ;;;
 (make-option-rule create-schemas (and kw-create (? kw-no) kw-schemas))
 
-(defrule mssql-option (or option-batch-rows
+(defrule mssql-option (or option-workers
+                          option-concurrency
+                          option-batch-rows
                           option-batch-size
                           option-batch-concurrency
                           option-truncate
diff --git a/src/parsers/command-mysql.lisp b/src/parsers/command-mysql.lisp
index 574a377..e7ade35 100644
--- a/src/parsers/command-mysql.lisp
+++ b/src/parsers/command-mysql.lisp
@@ -8,6 +8,7 @@
 ;;; MySQL options
 ;;;
 (defrule mysql-option (or option-workers
+                          option-concurrency
                           option-batch-rows
                           option-batch-size
                           option-batch-concurrency
diff --git a/src/parsers/command-options.lisp b/src/parsers/command-options.lisp
index de560b3..23e8433 100644
--- a/src/parsers/command-options.lisp
+++ b/src/parsers/command-options.lisp
@@ -30,6 +30,13 @@
     (bind (((_ _ nb) workers))
       (cons :workers (parse-integer (text nb))))))
 
+(defrule option-concurrency (and kw-concurrency
+                                 equal-sign
+                                 (+ (digit-char-p character)))
+  (:lambda (concurrency)
+    (bind (((_ _ nb) concurrency))
+      (cons :concurrency (parse-integer (text nb))))))
+
 (defrule option-batch-rows (and kw-batch kw-rows equal-sign
                                 (+ (digit-char-p character)))
   (:lambda (batch-rows)
diff --git a/src/parsers/command-sqlite.lisp b/src/parsers/command-sqlite.lisp
index 293d595..a682f03 100644
--- a/src/parsers/command-sqlite.lisp
+++ b/src/parsers/command-sqlite.lisp
@@ -13,7 +13,9 @@ load database
      set work_mem to '16MB', maintenance_work_mem to '512 MB';
 |#
 
-(defrule sqlite-option (or option-batch-rows
+(defrule sqlite-option (or option-workers
+                           option-concurrency
+                           option-batch-rows
                            option-batch-size
                            option-batch-concurrency
                            option-truncate
diff --git a/src/sources/common/api.lisp b/src/sources/common/api.lisp
index d59366b..3780b89 100644
--- a/src/sources/common/api.lisp
+++ b/src/sources/common/api.lisp
@@ -77,13 +77,21 @@
 
 (defgeneric copy-database (source
                            &key
+                             workers
+                             concurrency
                              truncate
                              data-only
                              schema-only
                              create-tables
                              include-drop
+                             foreign-keys
                              create-indexes
-                             reset-sequences)
+                             reset-sequences
+                             disable-triggers
+                             materialize-views
+                             set-table-oids
+                             including
+                             excluding)
   (:documentation
    "Auto-discover source schema, convert it to PostgreSQL, migrate the data
     from the source definition to PostgreSQL for all the discovered
diff --git a/src/sources/common/db-methods.lisp b/src/sources/common/db-methods.lisp
index c931eff..fd32088 100644
--- a/src/sources/common/db-methods.lisp
+++ b/src/sources/common/db-methods.lisp
@@ -120,7 +120,7 @@
 ;;;
 (defmethod copy-database ((copy db-copy)
                           &key
-                            (worker-count 8)
+                            (workers 4)
                             (concurrency 1)
                             (truncate nil)
                             (disable-triggers nil)
@@ -139,7 +139,7 @@
                             set-table-oids
                             materialize-views)
   "Export database source data and Import it into PostgreSQL"
-  (let* ((copy-kernel  (make-kernel worker-count))
+  (let* ((copy-kernel  (make-kernel workers))
          (copy-channel (let ((lp:*kernel* copy-kernel))
                          (lp:make-channel)))
          (catalog      (fetch-metadata copy
diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp
index 5943dee..6677a9c 100644
--- a/src/sources/common/md-methods.lisp
+++ b/src/sources/common/md-methods.lisp
@@ -61,12 +61,19 @@
                            drop-indexes
 
                            ;; generic API, but ignored here
-                           data-only
+                           workers
+                           concurrency
+                           data-only
                            schema-only
                            create-tables
                            include-drop
+                           foreign-keys
                            create-indexes
-                           reset-sequences)
+                           reset-sequences
+                           materialize-views
+                           set-table-oids
+                           including
+                           excluding)
   "Copy the contents of the COPY formated file to PostgreSQL."
   (declare (ignore data-only schema-only create-tables include-drop
diff --git a/test/sakila.load b/test/sakila.load
index 241d8e9..c61c6c3 100644
--- a/test/sakila.load
+++ b/test/sakila.load
@@ -5,10 +5,15 @@ load database
 
 --     WITH include drop, create tables, no truncate,
 --          create indexes, reset sequences, foreign keys
+--     WITH batch rows = 10000
+
+     WITH concurrency = 1, workers = 6
+
      SET maintenance_work_mem to '128MB',
          work_mem to '12MB',
          search_path to 'sakila'
 
-     CAST type datetime to timestamptz drop default drop not null using zero-dates-to-null,
-          type date drop not null drop default using zero-dates-to-null
+     CAST -- type datetime to timestamptz drop default drop not null using zero-dates-to-null,
+          type date drop not null drop default using zero-dates-to-null,
+          type datetime to timestamp drop default drop not null using zero-dates-to-null
 
 --    type tinyint to boolean using tinyint-to-boolean,
 --    type year to integer drop typemod -- now a default
@@ -20,5 +25,6 @@ load database
 
 --  EXCLUDING TABLE NAMES MATCHING ~
 
  BEFORE LOAD DO
-    $$ create schema if not exists sakila; $$;
+    $$ create schema if not exists sakila; $$,
+    $$ alter database sakila set search_path to sakila, public; $$;
diff --git a/test/sqlite-chinook.load b/test/sqlite-chinook.load
index febf85b..2b9b36d 100644
--- a/test/sqlite-chinook.load
+++ b/test/sqlite-chinook.load
@@ -4,6 +4,8 @@ load database
 
 --       including only table names like 'Invoice%'
 
-     with include drop, create tables, create indexes, reset sequences
+     with workers = 4,
+          concurrency = 1,
+          include drop, create tables, create indexes, reset sequences
 
      set work_mem to '16MB', maintenance_work_mem to '512 MB';
\ No newline at end of file
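The new `workers`, `concurrency`, and existing batch options introduced above can all appear in a single `WITH` clause. A minimal sketch in the pgloader load-command language, assuming only options shown in this patch; the `mydb` connection strings and the specific numeric values are illustrative, not taken from the patch:

```
LOAD DATABASE
     FROM      mysql://root@localhost/mydb
     INTO postgresql://localhost/mydb

     WITH include drop, create tables, create indexes, reset sequences,
          workers = 8, concurrency = 1,
          batch rows = 25000, batch size = 20 MB, batch concurrency = 3

      SET maintenance_work_mem to '128MB',
          work_mem to '12MB';
```

With these (hypothetical) settings, the scheduler keeps at most 8 worker threads active, one unit of work (table) runs 1 reader, 1 transformer and 1 writer task, and each reader thread may hold up to 3 batches in memory.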