From 0549e74f6dbc368e7213854d85eb4807e207f69f Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Wed, 28 Jun 2017 16:23:18 +0200 Subject: [PATCH] Implement multiple reader per table for MySQL. Experiment with the idea of splitting the read work in several concurrent threads, where each reader is reading portions of the target table, using a WHERE id <= x and id > y clause in its SELECT query. For this to kick-in a number of conditions needs to be met, as described in the documentation. The main interest might not be faster queries to overall fetch the same data set, but better concurrency with as many readers as writters and each couple its own dedicated queue. --- pgloader.1 | 31 +++++++- pgloader.1.md | 42 ++++++++++- src/package.lisp | 3 + src/params.lisp | 4 + src/parsers/command-keywords.lisp | 7 ++ src/parsers/command-mysql.lisp | 3 + src/parsers/command-options.lisp | 22 +++++- src/sources/common/api.lisp | 5 ++ src/sources/common/db-methods.lisp | 78 ++++++++++--------- src/sources/common/md-methods.lisp | 42 +++++------ src/sources/common/methods.lisp | 116 +++++++++++++++++++---------- src/sources/mysql/mysql.lisp | 112 ++++++++++++++++++++++------ src/utils/threads.lisp | 1 + src/utils/utils.lisp | 18 +++++ test/sakila.load | 4 +- 15 files changed, 359 insertions(+), 129 deletions(-) diff --git a/pgloader.1 b/pgloader.1 index 143c18e..d2a13b3 100644 --- a/pgloader.1 +++ b/pgloader.1 @@ -1774,7 +1774,7 @@ This command instructs pgloader to load data from a database connection\. The on A default set of casting rules are provided and might be overloaded and appended to by the command\. . .P -Here\'s an example: +Here\'s an example using as many options as possible, some of them even being defaults\. Chances are you don\'t need that complex a setup, don\'t copy and paste it, use it only as a reference! . .IP "" 4 . @@ -1785,7 +1785,8 @@ LOAD DATABASE INTO postgresql://localhost:54393/sakila WITH include drop, create tables, create indexes, reset sequences, - workers = 8, concurrency = 1 + workers = 8, concurrency = 1, + multiple readers per thread, rows per range = 50000 SET PostgreSQL PARAMETERS maintenance_work_mem to \'128MB\', @@ -1796,7 +1797,7 @@ LOAD DATABASE net_read_timeout = \'120\', net_write_timeout = \'120\' - CAST type datetime to timestamptz drop default drop not null using zero\-dates\-to\-null, + CAST type bigint when (= precision 20) to bigserial drop typemod, type date drop not null drop default using zero\-dates\-to\-null, \-\- type tinyint to boolean using tinyint\-to\-boolean, type year to integer @@ -1985,6 +1986,30 @@ When this option is listed pgloader refrains from migrating the data over\. Note .IP When this option is listed pgloader only issues the \fBCOPY\fR statements, without doing any other processing\. . +.IP "\(bu" 4 +\fIsingle reader per thread\fR, \fImultiple readers per thread\fR +. +.IP +The default is \fIsingle reader per thread\fR and it means that each MySQL table is read by a single thread as a whole, with a single \fBSELECT\fR statement using no \fBWHERE\fR clause\. +. +.IP +When using \fImultiple readers per thread\fR pgloader may be able to divide the reading work into several threads, as many as the \fIconcurrency\fR setting, which needs to be greater than 1 for this option to kick be activated\. +. +.IP +For each source table, pgloader searches for a primary key over a single numeric column, or a multiple\-column primary key index for which the first column is of a numeric data type (one of \fBinteger\fR or \fBbigint\fR)\. When such an index exists, pgloader runs a query to find the \fImin\fR and \fImax\fR values on this column, and then split that range into many ranges containing a maximum of \fIrows per range\fR\. +. +.IP +When the range list we then obtain contains at least as many ranges than our concurrency setting, then we distribute those ranges to each reader thread\. +. +.IP +So when all the conditions are met, pgloader then starts as many reader thread as the \fIconcurrency\fR setting, and each reader thread issues several queries with a \fBWHERE id >= x AND id < y\fR, where \fBy \- x = rows per range\fR or less (for the last range, depending on the max value just obtained\. +. +.IP "\(bu" 4 +\fIrows per range\fR +. +.IP +How many rows are fetched per \fBSELECT\fR query when using \fImultiple readers per thread\fR, see above for details\. +. .IP "" 0 . diff --git a/pgloader.1.md b/pgloader.1.md index 9e0ef9a..a69f48d 100644 --- a/pgloader.1.md +++ b/pgloader.1.md @@ -1508,14 +1508,17 @@ building. A default set of casting rules are provided and might be overloaded and appended to by the command. -Here's an example: +Here's an example using as many options as possible, some of them even being +defaults. Chances are you don't need that complex a setup, don't copy and +paste it, use it only as a reference! LOAD DATABASE FROM mysql://root@localhost/sakila INTO postgresql://localhost:54393/sakila WITH include drop, create tables, create indexes, reset sequences, - workers = 8, concurrency = 1 + workers = 8, concurrency = 1, + multiple readers per thread, rows per range = 50000 SET PostgreSQL PARAMETERS maintenance_work_mem to '128MB', @@ -1526,7 +1529,7 @@ Here's an example: net_read_timeout = '120', net_write_timeout = '120' - CAST type datetime to timestamptz drop default drop not null using zero-dates-to-null, + CAST type bigint when (= precision 20) to bigserial drop typemod, type date drop not null drop default using zero-dates-to-null, -- type tinyint to boolean using tinyint-to-boolean, type year to integer @@ -1723,6 +1726,39 @@ The `database` command accepts the following clauses and options: When this option is listed pgloader only issues the `COPY` statements, without doing any other processing. + - *single reader per thread*, *multiple readers per thread* + + The default is *single reader per thread* and it means that each + MySQL table is read by a single thread as a whole, with a single + `SELECT` statement using no `WHERE` clause. + + When using *multiple readers per thread* pgloader may be able to + divide the reading work into several threads, as many as the + *concurrency* setting, which needs to be greater than 1 for this + option to kick be activated. + + For each source table, pgloader searches for a primary key over a + single numeric column, or a multiple-column primary key index for + which the first column is of a numeric data type (one of `integer` + or `bigint`). When such an index exists, pgloader runs a query to + find the *min* and *max* values on this column, and then split that + range into many ranges containing a maximum of *rows per range*. + + When the range list we then obtain contains at least as many ranges + than our concurrency setting, then we distribute those ranges to + each reader thread. + + So when all the conditions are met, pgloader then starts as many + reader thread as the *concurrency* setting, and each reader thread + issues several queries with a `WHERE id >= x AND id < y`, where `y - + x = rows per range` or less (for the last range, depending on the + max value just obtained. + + - *rows per range* + + How many rows are fetched per `SELECT` query when using *multiple + readers per thread*, see above for details. + - *SET MySQL PARAMETERS* The *SET MySQL PARAMETERS* allows setting MySQL parameters using the diff --git a/src/package.lisp b/src/package.lisp index 7dd261a..7d2e955 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -269,6 +269,8 @@ #:unquote #:expand-user-homedir-pathname #:pretty-print-bytes + #:split-range + #:distribute ;; threads #:make-kernel @@ -440,6 +442,7 @@ #:header ;; main protocol/api + #:concurrency-support #:map-rows #:copy-column-list #:queue-raw-data diff --git a/src/params.lisp b/src/params.lisp index bfc1370..d5b86d5 100644 --- a/src/params.lisp +++ b/src/params.lisp @@ -22,6 +22,7 @@ #:*preserve-index-names* #:*copy-batch-rows* #:*copy-batch-size* + #:*rows-per-range* #:*prefetch-rows* #:*pg-settings* #:*mysql-settings* @@ -136,6 +137,9 @@ (defparameter *prefetch-rows* 100000 "How many rows do read in advance in the reader queue.") +(defparameter *rows-per-range* 10000 + "How many rows to read in each reader's thread, per SQL query.") + (defparameter *pg-settings* nil "An alist of GUC names and values.") (defparameter *mysql-settings* nil "An alist of GUC names and values.") diff --git a/src/parsers/command-keywords.lisp b/src/parsers/command-keywords.lisp index e6821e8..941964d 100644 --- a/src/parsers/command-keywords.lisp +++ b/src/parsers/command-keywords.lisp @@ -130,6 +130,13 @@ (def-keyword-rule "including") (def-keyword-rule "excluding") (def-keyword-rule "like") + (def-keyword-rule "multiple") + (def-keyword-rule "single") + (def-keyword-rule "reader") + (def-keyword-rule "readers") + (def-keyword-rule "per") + (def-keyword-rule "thread") + (def-keyword-rule "range") ;; option for loading from an archive (def-keyword-rule "archive") (def-keyword-rule "before") diff --git a/src/parsers/command-mysql.lisp b/src/parsers/command-mysql.lisp index 99c5c7f..d0c6e40 100644 --- a/src/parsers/command-mysql.lisp +++ b/src/parsers/command-mysql.lisp @@ -14,6 +14,9 @@ option-batch-size option-prefetch-rows option-max-parallel-create-index + option-single-reader + option-multiple-readers + option-rows-per-range option-truncate option-disable-triggers option-data-only diff --git a/src/parsers/command-options.lisp b/src/parsers/command-options.lisp index 9c766c1..d024594 100644 --- a/src/parsers/command-options.lisp +++ b/src/parsers/command-options.lisp @@ -82,11 +82,18 @@ (bind (((_ _ nb) prefetch-rows)) (cons :prefetch-rows (parse-integer (text nb)))))) +(defrule option-rows-per-range (and kw-rows kw-per kw-range + equal-sign + (+ (digit-char-p character))) + (:lambda (rows-per-range) + (cons :rows-per-range (parse-integer (text (fifth rows-per-range)))))) + (defun batch-control-bindings (options) "Generate the code needed to add batch-control" - `((*copy-batch-rows* (or ,(getf options :batch-rows) *copy-batch-rows*)) - (*copy-batch-size* (or ,(getf options :batch-size) *copy-batch-size*)) - (*prefetch-rows* (or ,(getf options :prefetch-rows) *prefetch-rows*)))) + `((*copy-batch-rows* (or ,(getf options :batch-rows) *copy-batch-rows*)) + (*copy-batch-size* (or ,(getf options :batch-size) *copy-batch-size*)) + (*prefetch-rows* (or ,(getf options :prefetch-rows) *prefetch-rows*)) + (*rows-per-range* (or ,(getf options :rows-per-range) *rows-per-range*)))) (defun identifier-case-binding (options) "Generate the code needed to bind *identifer-case* to the proper value." @@ -97,6 +104,7 @@ (option-list '(:batch-rows :batch-size :prefetch-rows + :rows-per-range :identifier-case)) extras) "Given a list of options, remove the generic ones that should already have @@ -129,6 +137,14 @@ (make-option-rule reset-sequences (and kw-reset (? kw-no) kw-sequences)) (make-option-rule foreign-keys (and (? kw-no) kw-foreign kw-keys)) +(defrule option-single-reader (and kw-single kw-reader kw-per kw-thread) + (:constant (cons :multiple-readers nil))) + +(defrule option-multiple-readers (and kw-multiple + (or kw-readers kw-reader) + kw-per kw-thread) + (:constant (cons :multiple-readers t))) + (defrule option-schema-only (and kw-schema kw-only) (:constant (cons :schema-only t))) diff --git a/src/sources/common/api.lisp b/src/sources/common/api.lisp index aa4dcc2..4a216df 100644 --- a/src/sources/common/api.lisp +++ b/src/sources/common/api.lisp @@ -34,6 +34,11 @@ (setf (slot-value source 'transforms) (make-list (length (slot-value source 'columns)))))))) +(defgeneric concurrency-support (copy concurrency) + (:documentation + "Returns nil when no concurrency is supported, or a list of copy ojbects + prepared to run concurrently.")) + (defgeneric map-rows (source &key process-row-fn) (:documentation "Load data from SOURCE and funcall PROCESS-ROW-FUN for each row found in diff --git a/src/sources/common/db-methods.lisp b/src/sources/common/db-methods.lisp index 5de5a87..619b68a 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/sources/common/db-methods.lisp @@ -212,6 +212,7 @@ (on-error-stop *on-error-stop*) (worker-count 4) (concurrency 1) + (multiple-readers nil) max-parallel-create-index (truncate nil) (disable-triggers nil) @@ -271,7 +272,9 @@ max-indexes)))) (idx-channel (when idx-kernel (let ((lp:*kernel* idx-kernel)) - (lp:make-channel))))) + (lp:make-channel)))) + + (task-count 0)) ;; apply catalog level transformations to support the database migration ;; that's CAST rules, index WHERE clause rewriting and ALTER commands @@ -327,12 +330,15 @@ ;; copy-from, we have concurrency tasks writing. (progn ; when copy-data (setf (gethash table writers-count) concurrency) - (copy-from table-source - :concurrency concurrency - :kernel copy-kernel - :channel copy-channel - :on-error-stop on-error-stop - :disable-triggers disable-triggers))))) + + (incf task-count + (copy-from table-source + :concurrency concurrency + :multiple-readers multiple-readers + :kernel copy-kernel + :channel copy-channel + :on-error-stop on-error-stop + :disable-triggers disable-triggers)))))) ;; now end the kernels ;; and each time a table is done, launch its indexing @@ -341,37 +347,35 @@ (with-stats-collection ("COPY Threads Completion" :section :post :use-result-as-read t :use-result-as-rows t) - (let ((worker-count (* (hash-table-count writers-count) - (task-count concurrency)))) - (loop :for tasks :below worker-count - :do (destructuring-bind (task table seconds) - (lp:receive-result copy-channel) - (log-message :debug - "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds) - (when (eq :writer task) - ;; - ;; Start the CREATE INDEX parallel tasks only when - ;; the data has been fully copied over to the - ;; corresponding table, that's when the writers - ;; count is down to zero. - ;; - (decf (gethash table writers-count)) - (log-message :debug "writers-counts[~a] = ~a" - (format-table-name table) - (gethash table writers-count)) + (loop :repeat task-count + :do (destructuring-bind (task table seconds) + (lp:receive-result copy-channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds) + (when (eq :writer task) + ;; + ;; Start the CREATE INDEX parallel tasks only when + ;; the data has been fully copied over to the + ;; corresponding table, that's when the writers + ;; count is down to zero. + ;; + (decf (gethash table writers-count)) + (log-message :debug "writers-counts[~a] = ~a" + (format-table-name table) + (gethash table writers-count)) - (when (and create-indexes - (zerop (gethash table writers-count))) - (alexandria:appendf - pkeys - (create-indexes-in-kernel (target-db copy) - table - idx-kernel - idx-channel)))))) - (prog1 - worker-count - (lp:end-kernel :wait nil)))))) + (when (and create-indexes + (zerop (gethash table writers-count))) + (alexandria:appendf + pkeys + (create-indexes-in-kernel (target-db copy) + table + idx-kernel + idx-channel))))) + :finally (progn + (lp:end-kernel :wait nil) + (return worker-count)))))) (log-message :info "Done with COPYing data, waiting for indexes") diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index 762429d..7744320 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -140,38 +140,38 @@ ;; files actually. (let* ((lp:*kernel* (make-kernel worker-count)) (channel (lp:make-channel)) - (path-list (expand-spec (source copy)))) + (path-list (expand-spec (source copy))) + (task-count 0)) (with-stats-collection ("Files Processed" :section :post :use-result-as-read t :use-result-as-rows t) (loop :for path-spec :in path-list :count t :do (let ((table-source (clone-copy-for copy path-spec))) - (copy-from table-source - :concurrency concurrency - :kernel lp:*kernel* - :channel channel - :on-error-stop on-error-stop - :disable-triggers disable-triggers)))) + (incf task-count + (copy-from table-source + :concurrency concurrency + :kernel lp:*kernel* + :channel channel + :on-error-stop on-error-stop + :disable-triggers disable-triggers))))) ;; end kernel (with-stats-collection ("COPY Threads Completion" :section :post :use-result-as-read t :use-result-as-rows t) - (let ((worker-count (* (length path-list) - (task-count concurrency)))) - (loop :for tasks :below worker-count - :do (handler-case - (destructuring-bind (task table seconds) - (lp:receive-result channel) - (log-message :debug - "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds)) - (condition (e) - (log-message :fatal "~a" e)))) - (prog1 - worker-count - (lp:end-kernel :wait nil)))) + (loop :repeat task-count + :do (handler-case + (destructuring-bind (task table seconds) + (lp:receive-result channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds)) + (condition (e) + (log-message :fatal "~a" e))) + :finally (progn + (lp:end-kernel :wait nil) + (return task-count)))) (lp:end-kernel :wait t)) ;; re-create the indexes from the target table entry diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index 6d11fb4..15aecb8 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -64,58 +64,96 @@ (format-vector-row text-file row (transforms copy))))) (map-rows copy :process-row-fn row-fn)))) -(defun task-count (concurrency) - "Return how many threads we are going to start given a number of WORKERS." - ;; (+ 1 concurrency concurrency) - (+ 1 concurrency)) - (defmethod copy-from ((copy copy) &key (kernel nil k-s-p) (channel nil c-s-p) (worker-count 8) (concurrency 2) + (multiple-readers nil) (on-error-stop *on-error-stop*) disable-triggers) "Copy data from COPY source into PostgreSQL." (let* ((table-name (format-table-name (target copy))) (lp:*kernel* (or kernel (make-kernel worker-count))) (channel (or channel (lp:make-channel))) - (rawq (lq:make-queue :fixed-capacity *concurrent-batches*))) + (readers nil) + (task-count 0)) - (lp:task-handler-bind - ((on-error-stop - #'(lambda (condition) - ;; everything has been handled already - (lp:invoke-transfer-error condition))) - (error - #'(lambda (condition) - (log-message :error "A thread failed with error: ~a" condition) - (if (member *client-min-messages* (list :debug :data)) - #-pgloader-image - (log-message :error "~a" - (trivial-backtrace:print-backtrace condition - :output nil)) - #+pgloader-image - (lp::invoke-debugger condition)) - (lp::invoke-transfer-error condition)))) - (log-message :notice "COPY ~a" table-name) + (flet ((submit-task (channel function &rest args) + (apply #'lp:submit-task channel function args) + (incf task-count))) - ;; start a task to read data from the source into the queue - (lp:submit-task channel #'queue-raw-data copy rawq concurrency) + (lp:task-handler-bind + ((on-error-stop + #'(lambda (condition) + ;; everything has been handled already + (lp:invoke-transfer-error condition))) + (error + #'(lambda (condition) + (log-message :error "A thread failed with error: ~a" condition) + (if (member *client-min-messages* (list :debug :data)) + #-pgloader-image + (log-message :error "~a" + (trivial-backtrace:print-backtrace condition + :output nil)) + #+pgloader-image + (lp::invoke-debugger condition)) + (lp::invoke-transfer-error condition)))) + (log-message :notice "COPY ~a" table-name) - ;; start a task to transform the raw data in the copy format - ;; and send that data down to PostgreSQL - (loop :repeat concurrency - :do (lp:submit-task channel #'pgloader.pgsql::copy-rows-from-queue - copy rawq - :on-error-stop on-error-stop - :disable-triggers disable-triggers)) + ;; Check for Read Concurrency Support from our source + (when (and multiple-readers (< 1 concurrency)) + (let ((label "Check Concurrency Support")) + (with-stats-collection (label :section :pre) + (setf readers (concurrency-support copy concurrency)) + (update-stats :pre label :read 1 :rows (if readers 1 0)) + (when readers + (log-message :notice "Multiple Readers Enabled for ~a" + (format-table-name (target copy))))))) - ;; now wait until both the tasks are over, and kill the kernel - (unless c-s-p - (log-message :debug "waiting for ~d tasks" (task-count concurrency)) - (loop :repeat (task-count concurrency) - :do (lp:receive-result channel)) - (log-message :notice "COPY ~s done." table-name) - (unless k-s-p (lp:end-kernel :wait t)))))) + ;; when reader is non-nil, we have reader concurrency support! + (if readers + ;; here we have detected Concurrency Support: we create as many + ;; readers as writers and create associated couples, each couple + ;; shares its own queue + (let ((rawqs + (loop :repeat concurrency :collect + (lq:make-queue :fixed-capacity *prefetch-rows*)))) + (log-message :info "Read Concurrency Enabled for ~s" + (format-table-name (target copy))) + + (loop :for rawq :in rawqs :for reader :in readers :do + ;; each reader pretends to be alone, pass 1 as concurrency + (submit-task channel #'queue-raw-data reader rawq 1) + + (submit-task channel #'pgloader.pgsql::copy-rows-from-queue + copy rawq + :on-error-stop on-error-stop + :disable-triggers disable-triggers))) + + ;; no Read Concurrency Support detected, start a single reader + ;; task, using a single data queue that is read by multiple + ;; writers. + (let ((rawq + (lq:make-queue :fixed-capacity *prefetch-rows*))) + (submit-task channel #'queue-raw-data copy rawq concurrency) + + ;; start a task to transform the raw data in the copy format + ;; and send that data down to PostgreSQL + (loop :repeat concurrency :do + (submit-task channel #'pgloader.pgsql::copy-rows-from-queue + copy rawq + :on-error-stop on-error-stop + :disable-triggers disable-triggers)))) + + ;; now wait until both the tasks are over, and kill the kernel + (unless c-s-p + (log-message :debug "waiting for ~d tasks" task-count) + (loop :repeat task-count :do (lp:receive-result channel)) + (log-message :notice "COPY ~s done." table-name) + (unless k-s-p (lp:end-kernel :wait t))) + + ;; return task-count, which is how many tasks we submitted to our + ;; lparallel kernel. + task-count)))) diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 1e04357..968b69e 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -7,7 +7,10 @@ (defclass copy-mysql (db-copy) ((encoding :accessor encoding ; allows forcing encoding :initarg :encoding - :initform nil)) + :initform nil) + (range-list :accessor range-list + :initarg :range-list + :initform nil)) (:documentation "pgloader MySQL Data Source")) (defmethod initialize-instance :after ((source copy-mysql) &key) @@ -29,6 +32,74 @@ ;;; ;;; Implement the specific methods ;;; +(defmethod concurrency-support ((mysql copy-mysql) concurrency) + "Splits the read work thanks WHERE clauses when possible and relevant, + return nil if we decide to read all in a single thread, and a list of as + many copy-mysql instances as CONCURRENCY otherwise. Each copy-mysql + instance in the returned list embeds specifications about how to read + only its partition of the source data." + (unless (= 1 concurrency) + (let* ((indexes (table-index-list (target mysql))) + (pkey (first (remove-if-not #'index-primary indexes))) + (pcol (when pkey (first (index-columns pkey)))) + (coldef (when pcol + (find pcol + (table-column-list (target mysql)) + :key #'column-name + :test #'string=))) + (ptype (when (and coldef (stringp (column-type-name coldef))) + (column-type-name coldef)))) + (when (member ptype (list "integer" "bigint" "serial" "bigserial") + :test #'string=) + ;; the table has a primary key over a integer data type we are able + ;; to generate WHERE clause and range index scans. + (with-connection (*connection* (source-db mysql)) + (let* ((col pcol) + (sql (format nil "select min(`~a`), max(`~a`) from `~a`" + col col (table-source-name (source mysql))))) + (destructuring-bind (min max) + (mapcar #'parse-integer (first (mysql-query sql))) + ;; generate a list of ranges from min to max + (let ((range-list (split-range min max *rows-per-range*))) + (unless (< (length range-list) concurrency) + ;; affect those ranges to each reader, we have CONCURRENCY + ;; of them + (let ((partitions (distribute range-list concurrency))) + (loop :for part :in partitions :collect + (make-instance 'copy-mysql + :source-db (clone-connection + (source-db mysql)) + :target-db (target-db mysql) + :source (source mysql) + :target (target mysql) + :fields (fields mysql) + :columns (columns mysql) + :transforms (transforms mysql) + :encoding (encoding mysql) + :range-list (cons col part))))))))))))) + +(defmacro with-encoding-handler (&body forms) + `(handler-bind + ;; avoid trying to fetch the character at end-of-input position... + ((babel-encodings:end-of-input-in-character + #'(lambda (c) + (update-stats :data (target mysql) :errs 1) + (log-message :error "~a" c) + (invoke-restart 'qmynd-impl::use-nil))) + (babel-encodings:character-decoding-error + #'(lambda (c) + (update-stats :data (target mysql) :errs 1) + (let ((encoding (babel-encodings:character-coding-error-encoding c)) + (position (babel-encodings:character-coding-error-position c)) + (character + (aref (babel-encodings:character-coding-error-buffer c) + (babel-encodings:character-coding-error-position c)))) + (log-message :error + "~a: Illegal ~a character starting at position ~a: ~a." + table-name encoding position character)) + (invoke-restart 'qmynd-impl::use-nil)))) + (progn ,@forms))) + (defmethod map-rows ((mysql copy-mysql) &key process-row-fn) "Extract MySQL data and call PROCESS-ROW-FN function with a single argument (a list of column values) for each row." @@ -43,27 +114,24 @@ (log-message :notice "Force encoding to ~a for ~a" qmynd:*mysql-encoding* table-name)) (let* ((cols (get-column-list (db-name (source-db mysql)) table-name)) - (sql (format nil "SELECT ~{~a~^, ~} FROM `~a`;" cols table-name))) - (handler-bind - ;; avoid trying to fetch the character at end-of-input position... - ((babel-encodings:end-of-input-in-character - #'(lambda (c) - (update-stats :data (target mysql) :errs 1) - (log-message :error "~a" c) - (invoke-restart 'qmynd-impl::use-nil))) - (babel-encodings:character-decoding-error - #'(lambda (c) - (update-stats :data (target mysql) :errs 1) - (let ((encoding (babel-encodings:character-coding-error-encoding c)) - (position (babel-encodings:character-coding-error-position c)) - (character - (aref (babel-encodings:character-coding-error-buffer c) - (babel-encodings:character-coding-error-position c)))) - (log-message :error - "~a: Illegal ~a character starting at position ~a: ~a." - table-name encoding position character)) - (invoke-restart 'qmynd-impl::use-nil)))) - (mysql-query sql :row-fn process-row-fn :result-type 'vector)))))) + (sql (format nil "SELECT ~{~a~^, ~} FROM `~a`" cols table-name))) + + (if (range-list mysql) + ;; read a range at a time, in a loop + (destructuring-bind (colname . ranges) (range-list mysql) + (loop :for (min max) :in ranges :do + (let ((sql (format nil "~a WHERE `~a` >= ~a AND `~a` < ~a" + sql colname min colname max))) + (with-encoding-handler + (mysql-query sql + :row-fn process-row-fn + :result-type 'vector))))) + + ;; read it all, no WHERE clause + (with-encoding-handler + (mysql-query sql + :row-fn process-row-fn + :result-type 'vector))))))) diff --git a/src/utils/threads.lisp b/src/utils/threads.lisp index 0f2199c..c950fc5 100644 --- a/src/utils/threads.lisp +++ b/src/utils/threads.lisp @@ -10,6 +10,7 @@ `((*monitoring-queue* . ,*monitoring-queue*) (*copy-batch-rows* . ,*copy-batch-rows*) (*copy-batch-size* . ,*copy-batch-size*) + (*rows-per-range* . ,*rows-per-range*) (*prefetch-rows* . ,*prefetch-rows*) (*pg-settings* . ',*pg-settings*) (*mysql-settings* . ',*mysql-settings*) diff --git a/src/utils/utils.lisp b/src/utils/utils.lisp index 6c49130..76844ac 100644 --- a/src/utils/utils.lisp +++ b/src/utils/utils.lisp @@ -76,3 +76,21 @@ :until (<= limit bytes) :finally (return (format nil "~5,1f ~a~a" (/ bytes limit) multiple unit))))) + +;;; +;;; Defining ranges and partitions. +;;; +(defun split-range (min max &optional (count *rows-per-range*)) + "Split the range from MIN to MAX into sub-ranges of COUNT elements." + (loop :for i := min :then j + :for j := (+ i count) + :while (< i max) + :collect (list i (min j max)))) + +(defun distribute (list-of-ranges count) + "Split a list of ranges into COUNT sublists." + (let ((result (make-array count :element-type 'list :initial-element nil))) + (loop :for i :from 0 + :for range :in list-of-ranges + :do (push range (aref result (mod i count)))) + (map 'list #'reverse result ))) diff --git a/test/sakila.load b/test/sakila.load index eecc7f1..9c87ed6 100644 --- a/test/sakila.load +++ b/test/sakila.load @@ -7,7 +7,9 @@ load database -- WITH batch rows = 10000 - WITH on error stop, concurrency = 1, workers = 6, + WITH on error stop, concurrency = 2, workers = 6, + prefetch rows = 25000, + -- multiple readers per thread, rows per range = 50000, max parallel create index = 4-- , -- quote identifiers