Expose concurrency settings to end users.

Add the workers and concurrency settings to the LOAD commands for
database sources so that users can tune them, and mention them in the
documentation as well.

From the documentation string of the copy-from method as found in
src/sources/common/methods.lisp:

   We allow up to WORKER-COUNT workers to be active at the same time
   in the context of this COPY object. A single unit of work consists of
   several kinds of workers:

     - a reader getting raw data from the COPY source with `map-rows',
     - N transformers preparing raw data for the PostgreSQL COPY protocol,
     - N writers sending the data down to PostgreSQL.

   The N here is set by the CONCURRENCY parameter: with a CONCURRENCY of
   2, we start (+ 1 2 2) = 5 concurrent tasks; with a CONCURRENCY of 4 we
   start (+ 1 4 4) = 9 concurrent tasks, of which only WORKER-COUNT may be
   active simultaneously.
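
As a quick illustration of that arithmetic, here is a minimal Common
Lisp sketch; the TASK-COUNT helper is illustrative only and not part of
this patch:

    ;; one reader, plus CONCURRENCY transformers, plus CONCURRENCY writers
    (defun task-count (concurrency)
      "Total number of concurrent tasks started for one unit of work."
      (+ 1 concurrency concurrency))

    (task-count 2)   ; => 5
    (task-count 4)   ; => 9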

Those options should find their way into the remaining sources; that's
for a follow-up patch, though.
Dimitri Fontaine 2016-01-15 23:17:55 +01:00
parent fb40a472ab
commit eb45bf0338
13 changed files with 193 additions and 38 deletions


@@ -1,7 +1,7 @@
.\" generated with Ronn/v0.7.3
.\" http://github.com/rtomayko/ronn/tree/0.7.3
.
.TH "PGLOADER" "1" "December 2015" "ff" ""
.TH "PGLOADER" "1" "January 2016" "ff" ""
.
.SH "NAME"
\fBpgloader\fR \- PostgreSQL data loader
@@ -458,6 +458,41 @@ When given a file that the PostgreSQL \fBCOPY\fR command knows how to parse, and
.P
Note that while the \fBCOPY\fR command is restricted to read either from its standard input or from a local file on the server\'s file system, the command line tool \fBpsql\fR implements a \fB\ecopy\fR command that knows how to stream a file local to the client over the network and into the PostgreSQL server, using the same protocol as pgloader uses\.
.
.SH "A NOTE ABOUT PARALLELISM"
pgloader uses several concurrent tasks to process the data being loaded:
.
.IP "\(bu" 4
a reader task reads the data in,
.
.IP "\(bu" 4
at least one transformer task is responsible for applying the needed transformations to given data so that it fits PostgreSQL expectations; those transformations include CSV\-like user\-defined \fIprojections\fR, database \fIcasting\fR (default and user\-given), and PostgreSQL\-specific \fIformatting\fR of the data for the COPY protocol and in Unicode,
.
.IP "\(bu" 4
at least one writer task is responsible for sending the data down to PostgreSQL using the COPY protocol\.
.
.IP "" 0
.
.P
The idea behind having the transformer task do the \fIformatting\fR is that, in the event of bad rows being rejected by PostgreSQL, the retry process doesn\'t have to do that step again\.
.
.P
At the moment, the numbers of transformer and writer tasks are forced to be the same, which allows for a very simple \fIqueueing\fR model to be implemented: the reader task fills in one queue per transformer task, which then pops from that queue and pushes to a writer queue per COPY task\.
.
.P
The \fIworkers\fR parameter controls how many worker threads may be active at any time (that\'s the parallelism level); the \fIconcurrency\fR parameter controls how many tasks are started to handle the data (they may not all run at the same time, depending on the \fIworkers\fR setting)\.
.
.P
With a \fIconcurrency\fR of 2, we start 1 reader thread, 2 transformer threads and 2 writer threads: that\'s 5 concurrent tasks to schedule into \fIworkers\fR threads\.
.
.P
So with \fBworkers = 4, concurrency = 2\fR, the parallel scheduler will keep only 4 of the 5 started tasks active at any given time\.
.
.P
With \fBworkers = 8, concurrency = 1\fR, we are then able to process several units of work at the same time\. In the database sources, a unit of work is a table, so those settings allow pgloader to be active on as many as 3 tables at any time in the load process\.
.
.P
As the \fBCREATE INDEX\fR threads started by pgloader only wait until PostgreSQL is done with the real work, those threads are \fINOT\fR counted in the concurrency levels detailed here\.
.
.SH "SOURCE FORMATS"
pgloader supports the following input formats:
.
@@ -569,6 +604,37 @@ Set of options to apply to the command, using a global syntax of either:
.IP
See each specific command for details\.
.
.IP
All data\-source\-specific commands support the following options:
.
.IP "\(bu" 4
\fIbatch rows = R\fR
.
.IP "\(bu" 4
\fIbatch size = \.\.\. MB\fR
.
.IP "\(bu" 4
\fIbatch concurrency = \.\.\.\fR
.
.IP "" 0
.
.IP
See the section BATCH BEHAVIOUR OPTIONS for more details\.
.
.IP
In addition, the data sources \fImysql\fR, \fIsqlite\fR, \fImssql\fR, \fIixf\fR and \fIdbf\fR all support the following settings:
.
.IP "\(bu" 4
\fIworkers = W\fR
.
.IP "\(bu" 4
\fIconcurrency = C\fR
.
.IP "" 0
.
.IP
See section A NOTE ABOUT PARALLELISM for more details\.
.
.IP "\(bu" 4
\fISET\fR
.
@@ -769,10 +835,7 @@ Takes a memory unit as argument, such as \fI20 MB\fR, its default value\. Accept
\fIbatch concurrency\fR
.
.IP
Takes a numeric value as argument, defaults to \fB10\fR\. That\'s the number of batches that pgloader is allowed to build in memory, even when only a single batch at a time might be sent to PostgreSQL\.
.
.IP
Supporting more than a single batch being sent at a time is on the TODO list of pgloader, but is not implemented yet\. This option is about controlling the memory needs of pgloader as a trade\-off against the performance characteristics, and not about parallel activity of pgloader\.
Takes a numeric value as argument, defaults to \fB10\fR\. That\'s the number of batches that pgloader is allowed to build in memory in each reader thread\. See the \fIworkers\fR setting for how many reader threads are allowed to run at the same time: each of them is allowed as many as \fIbatch concurrency\fR batches\.
.
.IP "" 0
.
@@ -1085,8 +1148,7 @@ LOAD FIXED
WITH truncate
SET client_encoding to \'latin1\',
work_mem to \'14MB\',
SET work_mem to \'14MB\',
standard_conforming_strings to \'on\'
BEFORE LOAD DO
@@ -1295,8 +1357,7 @@ LOAD COPY
WITH truncate
SET client_encoding to \'latin1\',
work_mem to \'14MB\',
SET work_mem to \'14MB\',
standard_conforming_strings to \'on\'
BEFORE LOAD DO
@@ -1647,7 +1708,8 @@ LOAD DATABASE
FROM mysql://root@localhost/sakila
INTO postgresql://localhost:54393/sakila
WITH include drop, create tables, create indexes, reset sequences
WITH include drop, create tables, create indexes, reset sequences,
workers = 8, concurrency = 1
SET maintenance_work_mem to \'128MB\',
work_mem to \'12MB\',


@@ -388,6 +388,53 @@ line tool `psql` implements a `\copy` command that knows how to stream a
file local to the client over the network and into the PostgreSQL server,
using the same protocol as pgloader uses.
## A NOTE ABOUT PARALLELISM
pgloader uses several concurrent tasks to process the data being loaded:
- a reader task reads the data in,
- at least one transformer task is responsible for applying the needed
  transformations to given data so that it fits PostgreSQL expectations;
  those transformations include CSV-like user-defined *projections*,
  database *casting* (default and user-given), and PostgreSQL-specific
  *formatting* of the data for the COPY protocol and in Unicode,
- at least one writer task is responsible for sending the data down to
PostgreSQL using the COPY protocol.
The idea behind having the transformer task do the *formatting* is that,
in the event of bad rows being rejected by PostgreSQL, the retry process
doesn't have to do that step again.
At the moment, the numbers of transformer and writer tasks are forced to
be the same, which allows for a very simple *queueing* model to be
implemented: the reader task fills in one queue per transformer task,
which then pops from that queue and pushes to a writer queue per COPY
task.
The *workers* parameter controls how many worker threads may be active
at any time (that's the parallelism level); the *concurrency* parameter
controls how many tasks are started to handle the data (they may not all
run at the same time, depending on the *workers* setting).
With a *concurrency* of 2, we start 1 reader thread, 2 transformer
threads and 2 writer threads: that's 5 concurrent tasks to schedule into
*workers* threads.
So with `workers = 4, concurrency = 2`, the parallel scheduler will
keep only 4 of the 5 started tasks active at any given time.
With `workers = 8, concurrency = 1`, we are then able to process several
units of work at the same time. In the database sources, a unit of work
is a table, so those settings allow pgloader to be active on as many as
3 tables at any time in the load process.
As the `CREATE INDEX` threads started by pgloader only wait until
PostgreSQL is done with the real work, those threads are *NOT* counted
in the concurrency levels detailed here.
## SOURCE FORMATS
pgloader supports the following input formats:
@@ -495,6 +542,22 @@ Some clauses are common to all commands:
See each specific command for details.
All data-source-specific commands support the following options:
- *batch rows = R*
- *batch size = ... MB*
- *batch concurrency = ...*
See the section BATCH BEHAVIOUR OPTIONS for more details.
In addition, the data sources *mysql*, *sqlite*, *mssql*, *ixf* and
*dbf* all support the following settings:
- *workers = W*
- *concurrency = C*
See section A NOTE ABOUT PARALLELISM for more details.
- *SET*
This clause allows specifying session parameters to be set for all the
@@ -682,14 +745,10 @@ The global batch behaviour options are:
- *batch concurrency*
Takes a numeric value as argument, defaults to `10`. That's the number
of batches that pgloader is allowed to build in memory, even when only a
single batch at a time might be sent to PostgreSQL.
Supporting more than a single batch being sent at a time is on the TODO
list of pgloader, but is not implemented yet. This option is about
controlling the memory needs of pgloader as a trade-off against the
performance characteristics, and not about parallel activity of
pgloader.
of batches that pgloader is allowed to build in memory in each reader
thread. See the *workers* setting for how many reader threads are
allowed to run at the same time: each of them is allowed as many as
*batch concurrency* batches.
Other options are specific to each input source, please refer to specific
parts of the documentation for their listing and covering.
@@ -1399,7 +1458,8 @@ Here's an example:
FROM mysql://root@localhost/sakila
INTO postgresql://localhost:54393/sakila
WITH include drop, create tables, create indexes, reset sequences
WITH include drop, create tables, create indexes, reset sequences,
workers = 8, concurrency = 1
SET maintenance_work_mem to '128MB',
work_mem to '12MB',


@@ -18,7 +18,9 @@
(bind (((_ _ _ table-name) tn))
(cons :table-name (text table-name)))))
(defrule dbf-option (or option-batch-rows
(defrule dbf-option (or option-workers
option-concurrency
option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate


@@ -18,7 +18,9 @@
(:lambda (tzopt)
(bind (((_ tz) tzopt)) (cons :timezone tz))))
(defrule ixf-option (or option-batch-rows
(defrule ixf-option (or option-workers
option-concurrency
option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate
@@ -34,12 +36,6 @@
(defrule ixf-options (and kw-with (and ixf-option (* (and comma ixf-option))))
(:function flatten-option-list))
;;; piggyback on DBF parsing
(defrule ixf-options (and kw-with ixf-option-list)
(:lambda (source)
(bind (((_ opts) source))
(cons :ixf-options opts))))
(defrule ixf-uri (and "ixf://" filename)
(:lambda (source)
(bind (((_ filename) source))


@@ -14,7 +14,9 @@
;;;
(make-option-rule create-schemas (and kw-create (? kw-no) kw-schemas))
(defrule mssql-option (or option-batch-rows
(defrule mssql-option (or option-workers
option-concurrency
option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate


@@ -8,6 +8,7 @@
;;; MySQL options
;;;
(defrule mysql-option (or option-workers
option-concurrency
option-batch-rows
option-batch-size
option-batch-concurrency


@@ -30,6 +30,13 @@
(bind (((_ _ nb) workers))
(cons :workers (parse-integer (text nb))))))
(defrule option-concurrency (and kw-concurrency
equal-sign
(+ (digit-char-p character)))
(:lambda (concurrency)
(bind (((_ _ nb) concurrency))
(cons :concurrency (parse-integer (text nb))))))
(defrule option-batch-rows (and kw-batch kw-rows equal-sign
(+ (digit-char-p character)))
(:lambda (batch-rows)
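
For context, a hedged and self-contained sketch of how such an esrap
rule parses the new option; the ws, kw-concurrency and equal-sign rules
below are simplified stand-ins for pgloader's real helper rules:

    (ql:quickload :esrap)

    (esrap:defrule ws (* #\Space)
      (:constant nil))

    (esrap:defrule kw-concurrency (and "concurrency" ws)
      (:constant :concurrency))

    (esrap:defrule equal-sign (and "=" ws)
      (:constant :equal))

    ;; simplified version of the option-concurrency rule added above
    (esrap:defrule option-concurrency
        (and kw-concurrency equal-sign (+ (digit-char-p character)))
      (:lambda (match)
        (destructuring-bind (kw eq digits) match
          (declare (ignore kw eq))
          (cons :concurrency (parse-integer (esrap:text digits))))))

    (esrap:parse 'option-concurrency "concurrency = 4")
    ;; => (:CONCURRENCY . 4)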


@@ -13,7 +13,9 @@ load database
set work_mem to '16MB', maintenance_work_mem to '512 MB';
|#
(defrule sqlite-option (or option-batch-rows
(defrule sqlite-option (or option-workers
option-concurrency
option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate


@@ -77,13 +77,21 @@
(defgeneric copy-database (source
&key
workers
concurrency
truncate
data-only
schema-only
create-tables
include-drop
foreign-keys
create-indexes
reset-sequences)
reset-sequences
disable-triggers
materialize-views
set-table-oids
including
excluding)
(:documentation
"Auto-discover source schema, convert it to PostgreSQL, migrate the data
from the source definition to PostgreSQL for all the discovered


@@ -120,7 +120,7 @@
;;;
(defmethod copy-database ((copy db-copy)
&key
(worker-count 8)
(workers 4)
(concurrency 1)
(truncate nil)
(disable-triggers nil)
@@ -139,7 +139,7 @@
set-table-oids
materialize-views)
"Export database source data and Import it into PostgreSQL"
(let* ((copy-kernel (make-kernel worker-count))
(let* ((copy-kernel (make-kernel workers))
(copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel)))
(catalog (fetch-metadata
copy
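
For illustration, a hedged sketch of the lparallel pattern at work here
(pgloader nicknames the lparallel package lp); the kernel size, now
taken from the workers setting, is what bounds how many submitted tasks
may run at the same time:

    (ql:quickload :lparallel)

    (let* ((lparallel:*kernel* (lparallel:make-kernel 4)) ; workers = 4
           (channel (lparallel:make-channel)))
      ;; with concurrency = 2, one unit of work submits 5 tasks:
      ;; 1 reader + 2 transformers + 2 writers; only 4 run at once
      (loop repeat 5
            do (lparallel:submit-task channel (lambda () (sleep 1) :done)))
      (loop repeat 5
            collect (lparallel:receive-result channel)))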


@@ -61,12 +61,19 @@
drop-indexes
;; generic API, but ignored here
data-only
workers
concurrency
data-only
schema-only
create-tables
include-drop
foreign-keys
create-indexes
reset-sequences)
reset-sequences
materialize-views
set-table-oids
including
excluding)
"Copy the contents of the COPY formated file to PostgreSQL."
(declare (ignore data-only schema-only
create-tables include-drop


@@ -5,10 +5,15 @@ load database
-- WITH include drop, create tables, no truncate,
-- create indexes, reset sequences, foreign keys
-- WITH batch rows = 10000
WITH concurrency = 1, workers = 6
SET maintenance_work_mem to '128MB', work_mem to '12MB', search_path to 'sakila'
CAST type datetime to timestamptz drop default drop not null using zero-dates-to-null,
type date drop not null drop default using zero-dates-to-null
CAST -- type datetime to timestamptz drop default drop not null using zero-dates-to-null,
type date drop not null drop default using zero-dates-to-null,
type datetime to timestamp drop default drop not null using zero-dates-to-null
-- type tinyint to boolean using tinyint-to-boolean,
-- type year to integer drop typemod -- now a default
@@ -20,5 +25,6 @@ load database
-- EXCLUDING TABLE NAMES MATCHING ~<ory>
BEFORE LOAD DO
$$ create schema if not exists sakila; $$;
$$ create schema if not exists sakila; $$,
$$ alter database sakila set search_path to sakila, public; $$;


@@ -4,6 +4,8 @@ load database
-- including only table names like 'Invoice%'
with include drop, create tables, create indexes, reset sequences
with workers = 4,
concurrency = 1,
include drop, create tables, create indexes, reset sequences
set work_mem to '16MB', maintenance_work_mem to '512 MB';