Implement multiple readers per table for MySQL.

Experiment with the idea of splitting the read work across several concurrent threads, where each reader reads portions of the source table using a WHERE id <= x AND id > y clause in its SELECT query. For this to kick in, a number of conditions need to be met, as described in the documentation. The main benefit might not be faster queries to fetch the same overall data set, but better concurrency, with as many readers as writers, each couple sharing its own dedicated queue.
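For illustration, a minimal load command that would exercise the new option (connection strings are placeholders; the exact defaults and conditions are detailed in the documentation changes below):

    LOAD DATABASE
         FROM mysql://user@localhost/dbname
         INTO postgresql:///dbname

         WITH workers = 8, concurrency = 4,
              multiple readers per thread,
              rows per range = 50000;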
This commit is contained in:
parent 6d66280fa5
commit 0549e74f6d

pgloader.1: 31 lines changed
pgloader.1
@@ -1774,7 +1774,7 @@ This command instructs pgloader to load data from a database connection\. The on
 A default set of casting rules are provided and might be overloaded and appended to by the command\.
 .
 .P
-Here\'s an example:
+Here\'s an example using as many options as possible, some of them even being defaults\. Chances are you don\'t need that complex a setup, don\'t copy and paste it, use it only as a reference!
 .
 .IP "" 4
 .
@@ -1785,7 +1785,8 @@ LOAD DATABASE
      INTO postgresql://localhost:54393/sakila

      WITH include drop, create tables, create indexes, reset sequences,
-          workers = 8, concurrency = 1
+          workers = 8, concurrency = 1,
+          multiple readers per thread, rows per range = 50000

      SET PostgreSQL PARAMETERS
      maintenance_work_mem to \'128MB\',
@@ -1796,7 +1797,7 @@ LOAD DATABASE
      net_read_timeout = \'120\',
      net_write_timeout = \'120\'

-     CAST type datetime to timestamptz drop default drop not null using zero\-dates\-to\-null,
+     CAST type bigint when (= precision 20) to bigserial drop typemod,
           type date drop not null drop default using zero\-dates\-to\-null,
           \-\- type tinyint to boolean using tinyint\-to\-boolean,
           type year to integer
@@ -1985,6 +1986,30 @@ When this option is listed pgloader refrains from migrating the data over\. Note
 .IP
 When this option is listed pgloader only issues the \fBCOPY\fR statements, without doing any other processing\.
 .
+.IP "\(bu" 4
+\fIsingle reader per thread\fR, \fImultiple readers per thread\fR
+.
+.IP
+The default is \fIsingle reader per thread\fR and it means that each MySQL table is read by a single thread as a whole, with a single \fBSELECT\fR statement using no \fBWHERE\fR clause\.
+.
+.IP
+When using \fImultiple readers per thread\fR pgloader may be able to divide the reading work into several threads, as many as the \fIconcurrency\fR setting, which needs to be greater than 1 for this option to be activated\.
+.
+.IP
+For each source table, pgloader searches for a primary key over a single numeric column, or a multiple\-column primary key index for which the first column is of a numeric data type (one of \fBinteger\fR or \fBbigint\fR)\. When such an index exists, pgloader runs a query to find the \fImin\fR and \fImax\fR values on this column, and then splits that range into many ranges containing a maximum of \fIrows per range\fR rows\.
+.
+.IP
+When the range list thus obtained contains at least as many ranges as our concurrency setting, we distribute those ranges to the reader threads\.
+.
+.IP
+So when all the conditions are met, pgloader starts as many reader threads as the \fIconcurrency\fR setting, and each reader thread issues several queries with a \fBWHERE id >= x AND id < y\fR clause, where \fBy \- x = rows per range\fR or less (for the last range, depending on the max value just obtained)\.
+.
+.IP "\(bu" 4
+\fIrows per range\fR
+.
+.IP
+How many rows are fetched per \fBSELECT\fR query when using \fImultiple readers per thread\fR; see above for details\.
+.
+.IP "" 0

 .

@@ -1508,14 +1508,17 @@ building.
 A default set of casting rules are provided and might be overloaded and
 appended to by the command.

-Here's an example:
+Here's an example using as many options as possible, some of them even being
+defaults. Chances are you don't need that complex a setup, don't copy and
+paste it, use it only as a reference!

     LOAD DATABASE
          FROM mysql://root@localhost/sakila
          INTO postgresql://localhost:54393/sakila

     WITH include drop, create tables, create indexes, reset sequences,
-         workers = 8, concurrency = 1
+         workers = 8, concurrency = 1,
+         multiple readers per thread, rows per range = 50000

     SET PostgreSQL PARAMETERS
     maintenance_work_mem to '128MB',
@@ -1526,7 +1529,7 @@ Here's an example:
     net_read_timeout = '120',
     net_write_timeout = '120'

-    CAST type datetime to timestamptz drop default drop not null using zero-dates-to-null,
+    CAST type bigint when (= precision 20) to bigserial drop typemod,
         type date drop not null drop default using zero-dates-to-null,
         -- type tinyint to boolean using tinyint-to-boolean,
         type year to integer
@@ -1723,6 +1726,39 @@ The `database` command accepts the following clauses and options:
     When this option is listed pgloader only issues the `COPY`
     statements, without doing any other processing.

+  - *single reader per thread*, *multiple readers per thread*
+
+    The default is *single reader per thread* and it means that each
+    MySQL table is read by a single thread as a whole, with a single
+    `SELECT` statement using no `WHERE` clause.
+
+    When using *multiple readers per thread* pgloader may be able to
+    divide the reading work into several threads, as many as the
+    *concurrency* setting, which needs to be greater than 1 for this
+    option to be activated.
+
+    For each source table, pgloader searches for a primary key over a
+    single numeric column, or a multiple-column primary key index for
+    which the first column is of a numeric data type (one of `integer`
+    or `bigint`). When such an index exists, pgloader runs a query to
+    find the *min* and *max* values on this column, and then splits
+    that range into many ranges containing a maximum of *rows per
+    range* rows.
+
+    When the range list thus obtained contains at least as many ranges
+    as our concurrency setting, we distribute those ranges to the
+    reader threads.
+
+    So when all the conditions are met, pgloader starts as many reader
+    threads as the *concurrency* setting, and each reader thread
+    issues several queries with a `WHERE id >= x AND id < y` clause,
+    where `y - x = rows per range` or less (for the last range,
+    depending on the max value just obtained).
+
+  - *rows per range*
+
+    How many rows are fetched per `SELECT` query when using *multiple
+    readers per thread*; see above for details.
+
   - *SET MySQL PARAMETERS*

     The *SET MySQL PARAMETERS* allows setting MySQL parameters using the
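As an illustration of the ranges described above (hypothetical table `foo` with integer primary key `id`, *min* 1, *max* 100000, and *rows per range* = 50000, giving two ranges), the reader threads end up issuing queries of this shape:

    SELECT `id`, `f1` FROM `foo` WHERE `id` >= 1 AND `id` < 50001
    SELECT `id`, `f1` FROM `foo` WHERE `id` >= 50001 AND `id` < 100000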
@@ -269,6 +269,8 @@
    #:unquote
    #:expand-user-homedir-pathname
    #:pretty-print-bytes
+   #:split-range
+   #:distribute

    ;; threads
    #:make-kernel
@@ -440,6 +442,7 @@
    #:header

    ;; main protocol/api
+   #:concurrency-support
    #:map-rows
    #:copy-column-list
    #:queue-raw-data
@@ -22,6 +22,7 @@
    #:*preserve-index-names*
    #:*copy-batch-rows*
    #:*copy-batch-size*
+   #:*rows-per-range*
    #:*prefetch-rows*
    #:*pg-settings*
    #:*mysql-settings*
@@ -136,6 +137,9 @@
 (defparameter *prefetch-rows* 100000
   "How many rows to read in advance in the reader queue.")

+(defparameter *rows-per-range* 10000
+  "How many rows to read in each reader's thread, per SQL query.")
+
 (defparameter *pg-settings* nil "An alist of GUC names and values.")
 (defparameter *mysql-settings* nil "An alist of GUC names and values.")

@@ -130,6 +130,13 @@
 (def-keyword-rule "including")
 (def-keyword-rule "excluding")
 (def-keyword-rule "like")
+(def-keyword-rule "multiple")
+(def-keyword-rule "single")
+(def-keyword-rule "reader")
+(def-keyword-rule "readers")
+(def-keyword-rule "per")
+(def-keyword-rule "thread")
+(def-keyword-rule "range")
 ;; option for loading from an archive
 (def-keyword-rule "archive")
 (def-keyword-rule "before")
@@ -14,6 +14,9 @@
    option-batch-size
    option-prefetch-rows
    option-max-parallel-create-index
+   option-single-reader
+   option-multiple-readers
+   option-rows-per-range
    option-truncate
    option-disable-triggers
    option-data-only
@@ -82,11 +82,18 @@
   (bind (((_ _ nb) prefetch-rows))
     (cons :prefetch-rows (parse-integer (text nb))))))

+(defrule option-rows-per-range (and kw-rows kw-per kw-range
+                                    equal-sign
+                                    (+ (digit-char-p character)))
+  (:lambda (rows-per-range)
+    (cons :rows-per-range (parse-integer (text (fifth rows-per-range))))))
+
 (defun batch-control-bindings (options)
   "Generate the code needed to add batch-control"
-  `((*copy-batch-rows* (or ,(getf options :batch-rows) *copy-batch-rows*))
-    (*copy-batch-size* (or ,(getf options :batch-size) *copy-batch-size*))
-    (*prefetch-rows*   (or ,(getf options :prefetch-rows) *prefetch-rows*))))
+  `((*copy-batch-rows* (or ,(getf options :batch-rows) *copy-batch-rows*))
+    (*copy-batch-size* (or ,(getf options :batch-size) *copy-batch-size*))
+    (*prefetch-rows*   (or ,(getf options :prefetch-rows) *prefetch-rows*))
+    (*rows-per-range*  (or ,(getf options :rows-per-range) *rows-per-range*))))

 (defun identifier-case-binding (options)
   "Generate the code needed to bind *identifier-case* to the proper value."
@@ -97,6 +104,7 @@
                          (option-list '(:batch-rows
                                         :batch-size
                                         :prefetch-rows
+                                        :rows-per-range
                                         :identifier-case))
                          extras)
   "Given a list of options, remove the generic ones that should already have
@@ -129,6 +137,14 @@
 (make-option-rule reset-sequences (and kw-reset (? kw-no) kw-sequences))
 (make-option-rule foreign-keys (and (? kw-no) kw-foreign kw-keys))

+(defrule option-single-reader (and kw-single kw-reader kw-per kw-thread)
+  (:constant (cons :multiple-readers nil)))
+
+(defrule option-multiple-readers (and kw-multiple
+                                      (or kw-readers kw-reader)
+                                      kw-per kw-thread)
+  (:constant (cons :multiple-readers t)))
+
 (defrule option-schema-only (and kw-schema kw-only)
   (:constant (cons :schema-only t)))

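As a quick check of these grammar additions, here is what the new WITH clauses parse into (a sketch; 50000 is an arbitrary value):

    ;; "single reader per thread"    => (:multiple-readers . nil)
    ;; "multiple readers per thread" => (:multiple-readers . t)
    ;; "rows per range = 50000"      => (:rows-per-range . 50000)
    ;;
    ;; batch-control-bindings then emits, among its other bindings:
    ;; (*rows-per-range* (or 50000 *rows-per-range*))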
@@ -34,6 +34,11 @@
     (setf (slot-value source 'transforms)
           (make-list (length (slot-value source 'columns))))))))

+(defgeneric concurrency-support (copy concurrency)
+  (:documentation
+   "Returns nil when no concurrency is supported, or a list of copy objects
+    prepared to run concurrently."))
+
 (defgeneric map-rows (source &key process-row-fn)
   (:documentation
    "Load data from SOURCE and funcall PROCESS-ROW-FN for each row found in
@@ -212,6 +212,7 @@
                           (on-error-stop *on-error-stop*)
                           (worker-count 4)
                           (concurrency 1)
+                          (multiple-readers nil)
                           max-parallel-create-index
                           (truncate nil)
                           (disable-triggers nil)
@@ -271,7 +272,9 @@
                           max-indexes))))
          (idx-channel   (when idx-kernel
                           (let ((lp:*kernel* idx-kernel))
-                            (lp:make-channel)))))
+                            (lp:make-channel))))
+
+         (task-count    0))

     ;; apply catalog level transformations to support the database migration
     ;; that's CAST rules, index WHERE clause rewriting and ALTER commands
@@ -327,12 +330,15 @@
               ;; copy-from, we have concurrency tasks writing.
               (progn                    ; when copy-data
                 (setf (gethash table writers-count) concurrency)
-                (copy-from table-source
-                           :concurrency concurrency
-                           :kernel copy-kernel
-                           :channel copy-channel
-                           :on-error-stop on-error-stop
-                           :disable-triggers disable-triggers)))))
+
+                (incf task-count
+                      (copy-from table-source
+                                 :concurrency concurrency
+                                 :multiple-readers multiple-readers
+                                 :kernel copy-kernel
+                                 :channel copy-channel
+                                 :on-error-stop on-error-stop
+                                 :disable-triggers disable-triggers))))))

     ;; now end the kernels
     ;; and each time a table is done, launch its indexing
@@ -341,37 +347,35 @@
       (with-stats-collection ("COPY Threads Completion" :section :post
                                                         :use-result-as-read t
                                                         :use-result-as-rows t)
-          (let ((worker-count (* (hash-table-count writers-count)
-                                 (task-count concurrency))))
-            (loop :for tasks :below worker-count
-                  :do (destructuring-bind (task table seconds)
-                          (lp:receive-result copy-channel)
-                        (log-message :debug
-                                     "Finished processing ~a for ~s ~50T~6$s"
-                                     task (format-table-name table) seconds)
-                        (when (eq :writer task)
-                          ;;
-                          ;; Start the CREATE INDEX parallel tasks only when
-                          ;; the data has been fully copied over to the
-                          ;; corresponding table, that's when the writers
-                          ;; count is down to zero.
-                          ;;
-                          (decf (gethash table writers-count))
-                          (log-message :debug "writers-counts[~a] = ~a"
-                                       (format-table-name table)
-                                       (gethash table writers-count))
-
-                          (when (and create-indexes
-                                     (zerop (gethash table writers-count)))
-                            (alexandria:appendf
-                             pkeys
-                             (create-indexes-in-kernel (target-db copy)
-                                                       table
-                                                       idx-kernel
-                                                       idx-channel))))))
-            (prog1
-                worker-count
-              (lp:end-kernel :wait nil))))))
+          (loop :repeat task-count
+                :do (destructuring-bind (task table seconds)
+                        (lp:receive-result copy-channel)
+                      (log-message :debug
+                                   "Finished processing ~a for ~s ~50T~6$s"
+                                   task (format-table-name table) seconds)
+                      (when (eq :writer task)
+                        ;;
+                        ;; Start the CREATE INDEX parallel tasks only when
+                        ;; the data has been fully copied over to the
+                        ;; corresponding table, that's when the writers
+                        ;; count is down to zero.
+                        ;;
+                        (decf (gethash table writers-count))
+                        (log-message :debug "writers-counts[~a] = ~a"
+                                     (format-table-name table)
+                                     (gethash table writers-count))
+
+                        (when (and create-indexes
+                                   (zerop (gethash table writers-count)))
+                          (alexandria:appendf
+                           pkeys
+                           (create-indexes-in-kernel (target-db copy)
+                                                     table
+                                                     idx-kernel
+                                                     idx-channel)))))
+                :finally (progn
+                           (lp:end-kernel :wait nil)
+                           (return worker-count))))))

       (log-message :info "Done with COPYing data, waiting for indexes")

@@ -140,38 +140,38 @@
   ;; files actually.
   (let* ((lp:*kernel* (make-kernel worker-count))
          (channel     (lp:make-channel))
-         (path-list   (expand-spec (source copy))))
+         (path-list   (expand-spec (source copy)))
+         (task-count  0))
     (with-stats-collection ("Files Processed" :section :post
                                               :use-result-as-read t
                                               :use-result-as-rows t)
       (loop :for path-spec :in path-list
             :count t
             :do (let ((table-source (clone-copy-for copy path-spec)))
-                  (copy-from table-source
-                             :concurrency concurrency
-                             :kernel lp:*kernel*
-                             :channel channel
-                             :on-error-stop on-error-stop
-                             :disable-triggers disable-triggers))))
+                  (incf task-count
+                        (copy-from table-source
+                                   :concurrency concurrency
+                                   :kernel lp:*kernel*
+                                   :channel channel
+                                   :on-error-stop on-error-stop
+                                   :disable-triggers disable-triggers)))))

     ;; end kernel
     (with-stats-collection ("COPY Threads Completion" :section :post
                                                       :use-result-as-read t
                                                       :use-result-as-rows t)
-      (let ((worker-count (* (length path-list)
-                             (task-count concurrency))))
-        (loop :for tasks :below worker-count
-              :do (handler-case
-                      (destructuring-bind (task table seconds)
-                          (lp:receive-result channel)
-                        (log-message :debug
-                                     "Finished processing ~a for ~s ~50T~6$s"
-                                     task (format-table-name table) seconds))
-                    (condition (e)
-                      (log-message :fatal "~a" e))))
-        (prog1
-            worker-count
-          (lp:end-kernel :wait nil))))
+      (loop :repeat task-count
+            :do (handler-case
+                    (destructuring-bind (task table seconds)
+                        (lp:receive-result channel)
+                      (log-message :debug
+                                   "Finished processing ~a for ~s ~50T~6$s"
+                                   task (format-table-name table) seconds))
+                  (condition (e)
+                    (log-message :fatal "~a" e)))
+            :finally (progn
+                       (lp:end-kernel :wait nil)
+                       (return task-count))))
     (lp:end-kernel :wait t))

   ;; re-create the indexes from the target table entry
@@ -64,58 +64,96 @@
                      (format-vector-row text-file row (transforms copy)))))
     (map-rows copy :process-row-fn row-fn))))

-(defun task-count (concurrency)
-  "Return how many threads we are going to start given a number of WORKERS."
-  ;; (+ 1 concurrency concurrency)
-  (+ 1 concurrency))
-
 (defmethod copy-from ((copy copy)
                       &key
                         (kernel nil k-s-p)
                         (channel nil c-s-p)
                         (worker-count 8)
                         (concurrency 2)
+                        (multiple-readers nil)
                         (on-error-stop *on-error-stop*)
                         disable-triggers)
   "Copy data from COPY source into PostgreSQL."
   (let* ((table-name  (format-table-name (target copy)))
          (lp:*kernel* (or kernel (make-kernel worker-count)))
          (channel     (or channel (lp:make-channel)))
-         (rawq        (lq:make-queue :fixed-capacity *concurrent-batches*)))
+         (readers     nil)
+         (task-count  0))

-    (lp:task-handler-bind
-        ((on-error-stop
-          #'(lambda (condition)
-              ;; everything has been handled already
-              (lp:invoke-transfer-error condition)))
-         (error
-          #'(lambda (condition)
-              (log-message :error "A thread failed with error: ~a" condition)
-              (if (member *client-min-messages* (list :debug :data))
-                  #-pgloader-image
-                  (log-message :error "~a"
-                               (trivial-backtrace:print-backtrace condition
-                                                                  :output nil))
-                  #+pgloader-image
-                  (lp::invoke-debugger condition))
-              (lp::invoke-transfer-error condition))))
-      (log-message :notice "COPY ~a" table-name)
-
-      ;; start a task to read data from the source into the queue
-      (lp:submit-task channel #'queue-raw-data copy rawq concurrency)
-
-      ;; start a task to transform the raw data in the copy format
-      ;; and send that data down to PostgreSQL
-      (loop :repeat concurrency
-            :do (lp:submit-task channel #'pgloader.pgsql::copy-rows-from-queue
-                                copy rawq
-                                :on-error-stop on-error-stop
-                                :disable-triggers disable-triggers))
-
-      ;; now wait until both the tasks are over, and kill the kernel
-      (unless c-s-p
-        (log-message :debug "waiting for ~d tasks" (task-count concurrency))
-        (loop :repeat (task-count concurrency)
-              :do (lp:receive-result channel))
-        (log-message :notice "COPY ~s done." table-name)
-        (unless k-s-p (lp:end-kernel :wait t))))))
+    (flet ((submit-task (channel function &rest args)
+             (apply #'lp:submit-task channel function args)
+             (incf task-count)))
+
+      (lp:task-handler-bind
+          ((on-error-stop
+            #'(lambda (condition)
+                ;; everything has been handled already
+                (lp:invoke-transfer-error condition)))
+           (error
+            #'(lambda (condition)
+                (log-message :error "A thread failed with error: ~a" condition)
+                (if (member *client-min-messages* (list :debug :data))
+                    #-pgloader-image
+                    (log-message :error "~a"
+                                 (trivial-backtrace:print-backtrace condition
+                                                                    :output nil))
+                    #+pgloader-image
+                    (lp::invoke-debugger condition))
+                (lp::invoke-transfer-error condition))))
+        (log-message :notice "COPY ~a" table-name)
+
+        ;; Check for Read Concurrency Support from our source
+        (when (and multiple-readers (< 1 concurrency))
+          (let ((label "Check Concurrency Support"))
+            (with-stats-collection (label :section :pre)
+              (setf readers (concurrency-support copy concurrency))
+              (update-stats :pre label :read 1 :rows (if readers 1 0))
+              (when readers
+                (log-message :notice "Multiple Readers Enabled for ~a"
+                             (format-table-name (target copy)))))))
+
+        ;; when readers is non-nil, we have reader concurrency support!
+        (if readers
+            ;; here we have detected Concurrency Support: we create as many
+            ;; readers as writers and create associated couples, each couple
+            ;; shares its own queue
+            (let ((rawqs (loop :repeat concurrency :collect
+                            (lq:make-queue :fixed-capacity *prefetch-rows*))))
+              (log-message :info "Read Concurrency Enabled for ~s"
+                           (format-table-name (target copy)))
+
+              (loop :for rawq :in rawqs :for reader :in readers :do
+                 ;; each reader pretends to be alone, pass 1 as concurrency
+                 (submit-task channel #'queue-raw-data reader rawq 1)
+
+                 (submit-task channel #'pgloader.pgsql::copy-rows-from-queue
+                              copy rawq
+                              :on-error-stop on-error-stop
+                              :disable-triggers disable-triggers)))
+
+            ;; no Read Concurrency Support detected, start a single reader
+            ;; task, using a single data queue that is read by multiple
+            ;; writers.
+            (let ((rawq (lq:make-queue :fixed-capacity *prefetch-rows*)))
+              (submit-task channel #'queue-raw-data copy rawq concurrency)
+
+              ;; start a task to transform the raw data in the copy format
+              ;; and send that data down to PostgreSQL
+              (loop :repeat concurrency :do
+                 (submit-task channel #'pgloader.pgsql::copy-rows-from-queue
+                              copy rawq
+                              :on-error-stop on-error-stop
+                              :disable-triggers disable-triggers))))
+
+        ;; now wait until all the tasks are over, and kill the kernel
+        (unless c-s-p
+          (log-message :debug "waiting for ~d tasks" task-count)
+          (loop :repeat task-count :do (lp:receive-result channel))
+          (log-message :notice "COPY ~s done." table-name)
+          (unless k-s-p (lp:end-kernel :wait t)))
+
+        ;; return task-count, which is how many tasks we submitted to our
+        ;; lparallel kernel.
+        task-count))))
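A sketch of the task accounting this rewrite enables (numbers hypothetical): copy-from now returns how many tasks it submitted, so callers can wait for exactly that many results instead of computing a fixed formula.

    ;; concurrency = 4, readers detected:  4 readers + 4 writers => returns 8
    ;; concurrency = 4, no reader support: 1 reader  + 4 writers => returns 5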
@@ -7,7 +7,10 @@
 (defclass copy-mysql (db-copy)
   ((encoding   :accessor encoding       ; allows forcing encoding
                :initarg :encoding
-               :initform nil))
+               :initform nil)
+   (range-list :accessor range-list
+               :initarg :range-list
+               :initform nil))
   (:documentation "pgloader MySQL Data Source"))

 (defmethod initialize-instance :after ((source copy-mysql) &key)
@@ -29,6 +32,74 @@
 ;;;
 ;;; Implement the specific methods
 ;;;
+(defmethod concurrency-support ((mysql copy-mysql) concurrency)
+  "Splits the read work thanks to WHERE clauses when possible and relevant,
+   returns nil if we decide to read all in a single thread, and a list of as
+   many copy-mysql instances as CONCURRENCY otherwise. Each copy-mysql
+   instance in the returned list embeds specifications about how to read
+   only its partition of the source data."
+  (unless (= 1 concurrency)
+    (let* ((indexes (table-index-list (target mysql)))
+           (pkey    (first (remove-if-not #'index-primary indexes)))
+           (pcol    (when pkey (first (index-columns pkey))))
+           (coldef  (when pcol
+                      (find pcol
+                            (table-column-list (target mysql))
+                            :key #'column-name
+                            :test #'string=)))
+           (ptype   (when (and coldef (stringp (column-type-name coldef)))
+                      (column-type-name coldef))))
+      (when (member ptype (list "integer" "bigint" "serial" "bigserial")
+                    :test #'string=)
+        ;; the table has a primary key over an integer data type: we are able
+        ;; to generate WHERE clauses and range index scans.
+        (with-connection (*connection* (source-db mysql))
+          (let* ((col pcol)
+                 (sql (format nil "select min(`~a`), max(`~a`) from `~a`"
+                              col col (table-source-name (source mysql)))))
+            (destructuring-bind (min max)
+                (mapcar #'parse-integer (first (mysql-query sql)))
+              ;; generate a list of ranges from min to max
+              (let ((range-list (split-range min max *rows-per-range*)))
+                (unless (< (length range-list) concurrency)
+                  ;; assign those ranges to each reader, we have CONCURRENCY
+                  ;; of them
+                  (let ((partitions (distribute range-list concurrency)))
+                    (loop :for part :in partitions :collect
+                       (make-instance 'copy-mysql
+                                      :source-db  (clone-connection
+                                                   (source-db mysql))
+                                      :target-db  (target-db mysql)
+                                      :source     (source mysql)
+                                      :target     (target mysql)
+                                      :fields     (fields mysql)
+                                      :columns    (columns mysql)
+                                      :transforms (transforms mysql)
+                                      :encoding   (encoding mysql)
+                                      :range-list (cons col part)))))))))))))
+
+(defmacro with-encoding-handler (&body forms)
+  `(handler-bind
+       ;; avoid trying to fetch the character at end-of-input position...
+       ((babel-encodings:end-of-input-in-character
+         #'(lambda (c)
+             (update-stats :data (target mysql) :errs 1)
+             (log-message :error "~a" c)
+             (invoke-restart 'qmynd-impl::use-nil)))
+        (babel-encodings:character-decoding-error
+         #'(lambda (c)
+             (update-stats :data (target mysql) :errs 1)
+             (let ((encoding (babel-encodings:character-coding-error-encoding c))
+                   (position (babel-encodings:character-coding-error-position c))
+                   (character
+                    (aref (babel-encodings:character-coding-error-buffer c)
+                          (babel-encodings:character-coding-error-position c))))
+               (log-message :error
+                            "~a: Illegal ~a character starting at position ~a: ~a."
+                            table-name encoding position character))
+             (invoke-restart 'qmynd-impl::use-nil))))
+     (progn ,@forms)))
+
 (defmethod map-rows ((mysql copy-mysql) &key process-row-fn)
   "Extract MySQL data and call PROCESS-ROW-FN function with a single
    argument (a list of column values) for each row."
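To illustrate concurrency-support (all numbers hypothetical): a table with an integer primary key `id`, min 1, max 40000, *rows-per-range* 10000, and concurrency 4 yields four ranges, so four copy-mysql clones come back, each carrying its own range-list:

    ;; ("id" . ((1 10001)))       ("id" . ((10001 20001)))
    ;; ("id" . ((20001 30001)))   ("id" . ((30001 40000)))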
@@ -43,27 +114,24 @@
       (log-message :notice "Force encoding to ~a for ~a"
                    qmynd:*mysql-encoding* table-name))
     (let* ((cols (get-column-list (db-name (source-db mysql)) table-name))
-           (sql  (format nil "SELECT ~{~a~^, ~} FROM `~a`;" cols table-name)))
-      (handler-bind
-          ;; avoid trying to fetch the character at end-of-input position...
-          ((babel-encodings:end-of-input-in-character
-            #'(lambda (c)
-                (update-stats :data (target mysql) :errs 1)
-                (log-message :error "~a" c)
-                (invoke-restart 'qmynd-impl::use-nil)))
-           (babel-encodings:character-decoding-error
-            #'(lambda (c)
-                (update-stats :data (target mysql) :errs 1)
-                (let ((encoding (babel-encodings:character-coding-error-encoding c))
-                      (position (babel-encodings:character-coding-error-position c))
-                      (character
-                       (aref (babel-encodings:character-coding-error-buffer c)
-                             (babel-encodings:character-coding-error-position c))))
-                  (log-message :error
-                               "~a: Illegal ~a character starting at position ~a: ~a."
-                               table-name encoding position character))
-                (invoke-restart 'qmynd-impl::use-nil))))
-        (mysql-query sql :row-fn process-row-fn :result-type 'vector))))))
+           (sql  (format nil "SELECT ~{~a~^, ~} FROM `~a`" cols table-name)))
+
+      (if (range-list mysql)
+          ;; read a range at a time, in a loop
+          (destructuring-bind (colname . ranges) (range-list mysql)
+            (loop :for (min max) :in ranges :do
+               (let ((sql (format nil "~a WHERE `~a` >= ~a AND `~a` < ~a"
+                                  sql colname min colname max)))
+                 (with-encoding-handler
+                   (mysql-query sql
+                                :row-fn process-row-fn
+                                :result-type 'vector)))))
+
+          ;; read it all, no WHERE clause
+          (with-encoding-handler
+            (mysql-query sql
+                         :row-fn process-row-fn
+                         :result-type 'vector)))))))
@@ -10,6 +10,7 @@
   `((*monitoring-queue*   . ,*monitoring-queue*)
     (*copy-batch-rows*    . ,*copy-batch-rows*)
     (*copy-batch-size*    . ,*copy-batch-size*)
+    (*rows-per-range*     . ,*rows-per-range*)
     (*prefetch-rows*      . ,*prefetch-rows*)
     (*pg-settings*        . ',*pg-settings*)
     (*mysql-settings*     . ',*mysql-settings*)
@@ -76,3 +76,21 @@
         :until (<= limit bytes)
         :finally (return
                    (format nil "~5,1f ~a~a" (/ bytes limit) multiple unit)))))
+
+;;;
+;;; Defining ranges and partitions.
+;;;
+(defun split-range (min max &optional (count *rows-per-range*))
+  "Split the range from MIN to MAX into sub-ranges of COUNT elements."
+  (loop :for i := min :then j
+        :for j := (+ i count)
+        :while (< i max)
+        :collect (list i (min j max))))
+
+(defun distribute (list-of-ranges count)
+  "Split a list of ranges into COUNT sublists."
+  (let ((result (make-array count :element-type 'list :initial-element nil)))
+    (loop :for i :from 0
+          :for range :in list-of-ranges
+          :do (push range (aref result (mod i count))))
+    (map 'list #'reverse result)))
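A hypothetical REPL session showing the two helpers at work (the pgloader.utils package name is assumed from the export list above): split-range cuts a 1..100000 key space into 25000-row ranges, then distribute deals ranges to readers round-robin.

    CL-USER> (pgloader.utils:split-range 1 100000 25000)
    ((1 25001) (25001 50001) (50001 75001) (75001 100000))
    CL-USER> (pgloader.utils:distribute '((1 2) (2 3) (3 4) (4 5)) 2)
    (((1 2) (3 4)) ((2 3) (4 5)))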
@@ -7,7 +7,9 @@ load database

    -- WITH batch rows = 10000

-   WITH on error stop, concurrency = 1, workers = 6,
+   WITH on error stop, concurrency = 2, workers = 6,
+        prefetch rows = 25000,
+        -- multiple readers per thread, rows per range = 50000,
         max parallel create index = 4-- ,
         -- quote identifiers
