diff --git a/pgloader.1 b/pgloader.1 index 143c18e..d2a13b3 100644 --- a/pgloader.1 +++ b/pgloader.1 @@ -1774,7 +1774,7 @@ This command instructs pgloader to load data from a database connection\. The on A default set of casting rules are provided and might be overloaded and appended to by the command\. . .P -Here\'s an example: +Here\'s an example using as many options as possible, some of them even being defaults\. Chances are you don\'t need that complex a setup; don\'t copy and paste it, use it only as a reference! . .IP "" 4 . @@ -1785,7 +1785,8 @@ LOAD DATABASE INTO postgresql://localhost:54393/sakila WITH include drop, create tables, create indexes, reset sequences, - workers = 8, concurrency = 1 + workers = 8, concurrency = 1, + multiple readers per thread, rows per range = 50000 SET PostgreSQL PARAMETERS maintenance_work_mem to \'128MB\', @@ -1796,7 +1797,7 @@ LOAD DATABASE net_read_timeout = \'120\', net_write_timeout = \'120\' - CAST type datetime to timestamptz drop default drop not null using zero\-dates\-to\-null, + CAST type bigint when (= precision 20) to bigserial drop typemod, type date drop not null drop default using zero\-dates\-to\-null, \-\- type tinyint to boolean using tinyint\-to\-boolean, type year to integer @@ -1985,6 +1986,30 @@ When this option is listed pgloader refrains from migrating the data over\. Note .IP When this option is listed pgloader only issues the \fBCOPY\fR statements, without doing any other processing\. . +.IP "\(bu" 4 +\fIsingle reader per thread\fR, \fImultiple readers per thread\fR +. +.IP +The default is \fIsingle reader per thread\fR and it means that each MySQL table is read by a single thread as a whole, with a single \fBSELECT\fR statement using no \fBWHERE\fR clause\. +. +.IP +When using \fImultiple readers per thread\fR pgloader may be able to divide the reading work into several threads, as many as the \fIconcurrency\fR setting, which needs to be greater than 1 for this option to be activated\. +. 
+.IP +For each source table, pgloader searches for a primary key over a single numeric column, or a multiple\-column primary key index for which the first column is of a numeric data type (one of \fBinteger\fR or \fBbigint\fR)\. When such an index exists, pgloader runs a query to find the \fImin\fR and \fImax\fR values on this column, and then splits that range into several ranges containing at most \fIrows per range\fR rows\. +. +.IP +When the resulting range list contains at least as many ranges as our concurrency setting, we distribute those ranges to the reader threads\. +. +.IP +So when all the conditions are met, pgloader then starts as many reader threads as the \fIconcurrency\fR setting, and each reader thread issues several queries with a \fBWHERE id >= x AND id < y\fR clause, where \fBy \- x = rows per range\fR or less (for the last range, depending on the max value just obtained)\. +. +.IP "\(bu" 4 +\fIrows per range\fR +. +.IP +How many rows are fetched per \fBSELECT\fR query when using \fImultiple readers per thread\fR; see above for details\. +. .IP "" 0 . diff --git a/pgloader.1.md b/pgloader.1.md index 9e0ef9a..a69f48d 100644 --- a/pgloader.1.md +++ b/pgloader.1.md @@ -1508,14 +1508,17 @@ building. A default set of casting rules are provided and might be overloaded and appended to by the command. -Here's an example: +Here's an example using as many options as possible, some of them even being +defaults. Chances are you don't need that complex a setup; don't copy and +paste it, use it only as a reference! 
LOAD DATABASE FROM mysql://root@localhost/sakila INTO postgresql://localhost:54393/sakila WITH include drop, create tables, create indexes, reset sequences, - workers = 8, concurrency = 1 + workers = 8, concurrency = 1, + multiple readers per thread, rows per range = 50000 SET PostgreSQL PARAMETERS maintenance_work_mem to '128MB', @@ -1526,7 +1529,7 @@ Here's an example: net_read_timeout = '120', net_write_timeout = '120' - CAST type datetime to timestamptz drop default drop not null using zero-dates-to-null, + CAST type bigint when (= precision 20) to bigserial drop typemod, type date drop not null drop default using zero-dates-to-null, -- type tinyint to boolean using tinyint-to-boolean, type year to integer @@ -1723,6 +1726,39 @@ The `database` command accepts the following clauses and options: When this option is listed pgloader only issues the `COPY` statements, without doing any other processing. + - *single reader per thread*, *multiple readers per thread* + + The default is *single reader per thread* and it means that each + MySQL table is read by a single thread as a whole, with a single + `SELECT` statement using no `WHERE` clause. + + When using *multiple readers per thread* pgloader may be able to + divide the reading work into several threads, as many as the + *concurrency* setting, which needs to be greater than 1 for this + option to be activated. + + For each source table, pgloader searches for a primary key over a + single numeric column, or a multiple-column primary key index for + which the first column is of a numeric data type (one of `integer` + or `bigint`). When such an index exists, pgloader runs a query to + find the *min* and *max* values on this column, and then splits that + range into several ranges containing at most *rows per range* rows. + + When the resulting range list contains at least as many ranges + as our concurrency setting, we distribute those ranges to + the reader threads. 
+ + So when all the conditions are met, pgloader then starts as many + reader threads as the *concurrency* setting, and each reader thread + issues several queries with a `WHERE id >= x AND id < y` clause, + where `y - x = rows per range` or less (for the last range, + depending on the max value just obtained). + + - *rows per range* + + How many rows are fetched per `SELECT` query when using *multiple + readers per thread*; see above for details. + - *SET MySQL PARAMETERS* The *SET MySQL PARAMETERS* allows setting MySQL parameters using the diff --git a/src/package.lisp b/src/package.lisp index 7dd261a..7d2e955 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -269,6 +269,8 @@ #:unquote #:expand-user-homedir-pathname #:pretty-print-bytes + #:split-range + #:distribute ;; threads #:make-kernel @@ -440,6 +442,7 @@ #:header ;; main protocol/api + #:concurrency-support #:map-rows #:copy-column-list #:queue-raw-data diff --git a/src/params.lisp b/src/params.lisp index bfc1370..d5b86d5 100644 --- a/src/params.lisp +++ b/src/params.lisp @@ -22,6 +22,7 @@ #:*preserve-index-names* #:*copy-batch-rows* #:*copy-batch-size* + #:*rows-per-range* #:*prefetch-rows* #:*pg-settings* #:*mysql-settings* @@ -136,6 +137,9 @@ (defparameter *prefetch-rows* 100000 "How many rows do read in advance in the reader queue.") +(defparameter *rows-per-range* 10000 + "How many rows to read in each reader's thread, per SQL query.") + (defparameter *pg-settings* nil "An alist of GUC names and values.") (defparameter *mysql-settings* nil "An alist of GUC names and values.") diff --git a/src/parsers/command-keywords.lisp b/src/parsers/command-keywords.lisp index e6821e8..941964d 100644 --- a/src/parsers/command-keywords.lisp +++ b/src/parsers/command-keywords.lisp @@ -130,6 +130,13 @@ (def-keyword-rule "including") (def-keyword-rule "excluding") (def-keyword-rule "like") + (def-keyword-rule "multiple") + (def-keyword-rule "single") + (def-keyword-rule "reader") + (def-keyword-rule "readers") + 
(def-keyword-rule "per") + (def-keyword-rule "thread") + (def-keyword-rule "range") ;; option for loading from an archive (def-keyword-rule "archive") (def-keyword-rule "before") diff --git a/src/parsers/command-mysql.lisp b/src/parsers/command-mysql.lisp index 99c5c7f..d0c6e40 100644 --- a/src/parsers/command-mysql.lisp +++ b/src/parsers/command-mysql.lisp @@ -14,6 +14,9 @@ option-batch-size option-prefetch-rows option-max-parallel-create-index + option-single-reader + option-multiple-readers + option-rows-per-range option-truncate option-disable-triggers option-data-only diff --git a/src/parsers/command-options.lisp b/src/parsers/command-options.lisp index 9c766c1..d024594 100644 --- a/src/parsers/command-options.lisp +++ b/src/parsers/command-options.lisp @@ -82,11 +82,18 @@ (bind (((_ _ nb) prefetch-rows)) (cons :prefetch-rows (parse-integer (text nb)))))) +(defrule option-rows-per-range (and kw-rows kw-per kw-range + equal-sign + (+ (digit-char-p character))) + (:lambda (rows-per-range) + (cons :rows-per-range (parse-integer (text (fifth rows-per-range)))))) + (defun batch-control-bindings (options) "Generate the code needed to add batch-control" - `((*copy-batch-rows* (or ,(getf options :batch-rows) *copy-batch-rows*)) - (*copy-batch-size* (or ,(getf options :batch-size) *copy-batch-size*)) - (*prefetch-rows* (or ,(getf options :prefetch-rows) *prefetch-rows*)))) + `((*copy-batch-rows* (or ,(getf options :batch-rows) *copy-batch-rows*)) + (*copy-batch-size* (or ,(getf options :batch-size) *copy-batch-size*)) + (*prefetch-rows* (or ,(getf options :prefetch-rows) *prefetch-rows*)) + (*rows-per-range* (or ,(getf options :rows-per-range) *rows-per-range*)))) (defun identifier-case-binding (options) "Generate the code needed to bind *identifer-case* to the proper value." 
@@ -97,6 +104,7 @@ (option-list '(:batch-rows :batch-size :prefetch-rows + :rows-per-range :identifier-case)) extras) "Given a list of options, remove the generic ones that should already have @@ -129,6 +137,14 @@ (make-option-rule reset-sequences (and kw-reset (? kw-no) kw-sequences)) (make-option-rule foreign-keys (and (? kw-no) kw-foreign kw-keys)) +(defrule option-single-reader (and kw-single kw-reader kw-per kw-thread) + (:constant (cons :multiple-readers nil))) + +(defrule option-multiple-readers (and kw-multiple + (or kw-readers kw-reader) + kw-per kw-thread) + (:constant (cons :multiple-readers t))) + (defrule option-schema-only (and kw-schema kw-only) (:constant (cons :schema-only t))) diff --git a/src/sources/common/api.lisp b/src/sources/common/api.lisp index aa4dcc2..4a216df 100644 --- a/src/sources/common/api.lisp +++ b/src/sources/common/api.lisp @@ -34,6 +34,11 @@ (setf (slot-value source 'transforms) (make-list (length (slot-value source 'columns)))))))) +(defgeneric concurrency-support (copy concurrency) + (:documentation + "Returns nil when no concurrency is supported, or a list of copy objects + prepared to run concurrently.")) + (defgeneric map-rows (source &key process-row-fn) (:documentation "Load data from SOURCE and funcall PROCESS-ROW-FUN for each row found in diff --git a/src/sources/common/db-methods.lisp b/src/sources/common/db-methods.lisp index 5de5a87..619b68a 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/sources/common/db-methods.lisp @@ -212,6 +212,7 @@ (on-error-stop *on-error-stop*) (worker-count 4) (concurrency 1) + (multiple-readers nil) max-parallel-create-index (truncate nil) (disable-triggers nil) @@ -271,7 +272,9 @@ max-indexes)))) (idx-channel (when idx-kernel (let ((lp:*kernel* idx-kernel)) - (lp:make-channel))))) + (lp:make-channel)))) + + (task-count 0)) ;; apply catalog level transformations to support the database migration ;; that's CAST rules, index WHERE clause rewriting and ALTER commands @@ -327,12 
+330,15 @@ ;; copy-from, we have concurrency tasks writing. (progn ; when copy-data (setf (gethash table writers-count) concurrency) - (copy-from table-source - :concurrency concurrency - :kernel copy-kernel - :channel copy-channel - :on-error-stop on-error-stop - :disable-triggers disable-triggers))))) + + (incf task-count + (copy-from table-source + :concurrency concurrency + :multiple-readers multiple-readers + :kernel copy-kernel + :channel copy-channel + :on-error-stop on-error-stop + :disable-triggers disable-triggers)))))) ;; now end the kernels ;; and each time a table is done, launch its indexing @@ -341,37 +347,35 @@ (with-stats-collection ("COPY Threads Completion" :section :post :use-result-as-read t :use-result-as-rows t) - (let ((worker-count (* (hash-table-count writers-count) - (task-count concurrency)))) - (loop :for tasks :below worker-count - :do (destructuring-bind (task table seconds) - (lp:receive-result copy-channel) - (log-message :debug - "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds) - (when (eq :writer task) - ;; - ;; Start the CREATE INDEX parallel tasks only when - ;; the data has been fully copied over to the - ;; corresponding table, that's when the writers - ;; count is down to zero. - ;; - (decf (gethash table writers-count)) - (log-message :debug "writers-counts[~a] = ~a" - (format-table-name table) - (gethash table writers-count)) + (loop :repeat task-count + :do (destructuring-bind (task table seconds) + (lp:receive-result copy-channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds) + (when (eq :writer task) + ;; + ;; Start the CREATE INDEX parallel tasks only when + ;; the data has been fully copied over to the + ;; corresponding table, that's when the writers + ;; count is down to zero. 
+ ;; + (decf (gethash table writers-count)) + (log-message :debug "writers-counts[~a] = ~a" + (format-table-name table) + (gethash table writers-count)) - (when (and create-indexes - (zerop (gethash table writers-count))) - (alexandria:appendf - pkeys - (create-indexes-in-kernel (target-db copy) - table - idx-kernel - idx-channel)))))) - (prog1 - worker-count - (lp:end-kernel :wait nil)))))) + (when (and create-indexes + (zerop (gethash table writers-count))) + (alexandria:appendf + pkeys + (create-indexes-in-kernel (target-db copy) + table + idx-kernel + idx-channel))))) + :finally (progn + (lp:end-kernel :wait nil) + (return task-count)))))) (log-message :info "Done with COPYing data, waiting for indexes") diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index 762429d..7744320 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -140,38 +140,38 @@ ;; files actually. (let* ((lp:*kernel* (make-kernel worker-count)) (channel (lp:make-channel)) - (path-list (expand-spec (source copy)))) + (path-list (expand-spec (source copy))) + (task-count 0)) (with-stats-collection ("Files Processed" :section :post :use-result-as-read t :use-result-as-rows t) (loop :for path-spec :in path-list :count t :do (let ((table-source (clone-copy-for copy path-spec))) - (copy-from table-source - :concurrency concurrency - :kernel lp:*kernel* - :channel channel - :on-error-stop on-error-stop - :disable-triggers disable-triggers)))) + (incf task-count + (copy-from table-source + :concurrency concurrency + :kernel lp:*kernel* + :channel channel + :on-error-stop on-error-stop + :disable-triggers disable-triggers))))) ;; end kernel (with-stats-collection ("COPY Threads Completion" :section :post :use-result-as-read t :use-result-as-rows t) - (let ((worker-count (* (length path-list) - (task-count concurrency)))) - (loop :for tasks :below worker-count - :do (handler-case - (destructuring-bind (task table seconds) - 
(lp:receive-result channel) - (log-message :debug - "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds)) - (condition (e) - (log-message :fatal "~a" e)))) - (prog1 - worker-count - (lp:end-kernel :wait nil)))) + (loop :repeat task-count + :do (handler-case + (destructuring-bind (task table seconds) + (lp:receive-result channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds)) + (condition (e) + (log-message :fatal "~a" e))) + :finally (progn + (lp:end-kernel :wait nil) + (return task-count)))) (lp:end-kernel :wait t)) ;; re-create the indexes from the target table entry diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index 6d11fb4..15aecb8 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -64,58 +64,96 @@ (format-vector-row text-file row (transforms copy))))) (map-rows copy :process-row-fn row-fn)))) -(defun task-count (concurrency) - "Return how many threads we are going to start given a number of WORKERS." - ;; (+ 1 concurrency concurrency) - (+ 1 concurrency)) - (defmethod copy-from ((copy copy) &key (kernel nil k-s-p) (channel nil c-s-p) (worker-count 8) (concurrency 2) + (multiple-readers nil) (on-error-stop *on-error-stop*) disable-triggers) "Copy data from COPY source into PostgreSQL." 
(let* ((table-name (format-table-name (target copy))) (lp:*kernel* (or kernel (make-kernel worker-count))) (channel (or channel (lp:make-channel))) - (rawq (lq:make-queue :fixed-capacity *concurrent-batches*))) + (readers nil) + (task-count 0)) - (lp:task-handler-bind - ((on-error-stop - #'(lambda (condition) - ;; everything has been handled already - (lp:invoke-transfer-error condition))) - (error - #'(lambda (condition) - (log-message :error "A thread failed with error: ~a" condition) - (if (member *client-min-messages* (list :debug :data)) - #-pgloader-image - (log-message :error "~a" - (trivial-backtrace:print-backtrace condition - :output nil)) - #+pgloader-image - (lp::invoke-debugger condition)) - (lp::invoke-transfer-error condition)))) - (log-message :notice "COPY ~a" table-name) + (flet ((submit-task (channel function &rest args) + (apply #'lp:submit-task channel function args) + (incf task-count))) - ;; start a task to read data from the source into the queue - (lp:submit-task channel #'queue-raw-data copy rawq concurrency) + (lp:task-handler-bind + ((on-error-stop + #'(lambda (condition) + ;; everything has been handled already + (lp:invoke-transfer-error condition))) + (error + #'(lambda (condition) + (log-message :error "A thread failed with error: ~a" condition) + (if (member *client-min-messages* (list :debug :data)) + #-pgloader-image + (log-message :error "~a" + (trivial-backtrace:print-backtrace condition + :output nil)) + #+pgloader-image + (lp::invoke-debugger condition)) + (lp::invoke-transfer-error condition)))) + (log-message :notice "COPY ~a" table-name) - ;; start a task to transform the raw data in the copy format - ;; and send that data down to PostgreSQL - (loop :repeat concurrency - :do (lp:submit-task channel #'pgloader.pgsql::copy-rows-from-queue - copy rawq - :on-error-stop on-error-stop - :disable-triggers disable-triggers)) + ;; Check for Read Concurrency Support from our source + (when (and multiple-readers (< 1 concurrency)) + 
(let ((label "Check Concurrency Support")) + (with-stats-collection (label :section :pre) + (setf readers (concurrency-support copy concurrency)) + (update-stats :pre label :read 1 :rows (if readers 1 0)) + (when readers + (log-message :notice "Multiple Readers Enabled for ~a" + (format-table-name (target copy))))))) - ;; now wait until both the tasks are over, and kill the kernel - (unless c-s-p - (log-message :debug "waiting for ~d tasks" (task-count concurrency)) - (loop :repeat (task-count concurrency) - :do (lp:receive-result channel)) - (log-message :notice "COPY ~s done." table-name) - (unless k-s-p (lp:end-kernel :wait t)))))) + ;; when readers is non-nil, we have read concurrency support! + (if readers + ;; here we have detected Concurrency Support: we create as many + ;; readers as writers and create associated pairs; each pair + ;; shares its own queue + (let ((rawqs + (loop :repeat concurrency :collect + (lq:make-queue :fixed-capacity *prefetch-rows*)))) + (log-message :info "Read Concurrency Enabled for ~s" + (format-table-name (target copy))) + + (loop :for rawq :in rawqs :for reader :in readers :do + ;; each reader pretends to be alone, pass 1 as concurrency + (submit-task channel #'queue-raw-data reader rawq 1) + + (submit-task channel #'pgloader.pgsql::copy-rows-from-queue + copy rawq + :on-error-stop on-error-stop + :disable-triggers disable-triggers))) + + ;; no Read Concurrency Support detected, start a single reader + ;; task, using a single data queue that is read by multiple + ;; writers. 
+ (let ((rawq + (lq:make-queue :fixed-capacity *prefetch-rows*))) + (submit-task channel #'queue-raw-data copy rawq concurrency) + + ;; start a task to transform the raw data in the copy format + ;; and send that data down to PostgreSQL + (loop :repeat concurrency :do + (submit-task channel #'pgloader.pgsql::copy-rows-from-queue + copy rawq + :on-error-stop on-error-stop + :disable-triggers disable-triggers)))) + + ;; now wait until all the tasks are over, and kill the kernel + (unless c-s-p + (log-message :debug "waiting for ~d tasks" task-count) + (loop :repeat task-count :do (lp:receive-result channel)) + (log-message :notice "COPY ~s done." table-name) + (unless k-s-p (lp:end-kernel :wait t))) + + ;; return task-count, which is how many tasks we submitted to our + ;; lparallel kernel. + task-count)))) diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 1e04357..968b69e 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -7,7 +7,10 @@ (defclass copy-mysql (db-copy) ((encoding :accessor encoding ; allows forcing encoding :initarg :encoding - :initform nil)) + :initform nil) + (range-list :accessor range-list + :initarg :range-list + :initform nil)) (:documentation "pgloader MySQL Data Source")) (defmethod initialize-instance :after ((source copy-mysql) &key) @@ -29,6 +32,74 @@ ;;; ;;; Implement the specific methods ;;; +(defmethod concurrency-support ((mysql copy-mysql) concurrency) + "Splits the read work thanks to WHERE clauses when possible and relevant, + returning nil if we decide to read all in a single thread, and a list of + as many copy-mysql instances as CONCURRENCY otherwise. Each copy-mysql + instance in the returned list embeds specifications about how to read + only its partition of the source data." 
+ (unless (= 1 concurrency) + (let* ((indexes (table-index-list (target mysql))) + (pkey (first (remove-if-not #'index-primary indexes))) + (pcol (when pkey (first (index-columns pkey)))) + (coldef (when pcol + (find pcol + (table-column-list (target mysql)) + :key #'column-name + :test #'string=))) + (ptype (when (and coldef (stringp (column-type-name coldef))) + (column-type-name coldef)))) + (when (member ptype (list "integer" "bigint" "serial" "bigserial") + :test #'string=) + ;; the table has a primary key over an integer data type, so we + ;; are able to generate WHERE clauses and range index scans. + (with-connection (*connection* (source-db mysql)) + (let* ((col pcol) + (sql (format nil "select min(`~a`), max(`~a`) from `~a`" + col col (table-source-name (source mysql))))) + (destructuring-bind (min max) + (mapcar #'parse-integer (first (mysql-query sql))) + ;; generate a list of ranges from min to max + (let ((range-list (split-range min max *rows-per-range*))) + (unless (< (length range-list) concurrency) + ;; assign those ranges to each reader, we have CONCURRENCY + ;; of them + (let ((partitions (distribute range-list concurrency))) + (loop :for part :in partitions :collect + (make-instance 'copy-mysql + :source-db (clone-connection + (source-db mysql)) + :target-db (target-db mysql) + :source (source mysql) + :target (target mysql) + :fields (fields mysql) + :columns (columns mysql) + :transforms (transforms mysql) + :encoding (encoding mysql) + :range-list (cons col part))))))))))))) + +(defmacro with-encoding-handler (&body forms) + `(handler-bind + ;; avoid trying to fetch the character at end-of-input position... 
+ ((babel-encodings:end-of-input-in-character + #'(lambda (c) + (update-stats :data (target mysql) :errs 1) + (log-message :error "~a" c) + (invoke-restart 'qmynd-impl::use-nil))) + (babel-encodings:character-decoding-error + #'(lambda (c) + (update-stats :data (target mysql) :errs 1) + (let ((encoding (babel-encodings:character-coding-error-encoding c)) + (position (babel-encodings:character-coding-error-position c)) + (character + (aref (babel-encodings:character-coding-error-buffer c) + (babel-encodings:character-coding-error-position c)))) + (log-message :error + "~a: Illegal ~a character starting at position ~a: ~a." + table-name encoding position character)) + (invoke-restart 'qmynd-impl::use-nil)))) + (progn ,@forms))) + (defmethod map-rows ((mysql copy-mysql) &key process-row-fn) "Extract MySQL data and call PROCESS-ROW-FN function with a single argument (a list of column values) for each row." @@ -43,27 +114,24 @@ (log-message :notice "Force encoding to ~a for ~a" qmynd:*mysql-encoding* table-name)) (let* ((cols (get-column-list (db-name (source-db mysql)) table-name)) - (sql (format nil "SELECT ~{~a~^, ~} FROM `~a`;" cols table-name))) - (handler-bind - ;; avoid trying to fetch the character at end-of-input position... - ((babel-encodings:end-of-input-in-character - #'(lambda (c) - (update-stats :data (target mysql) :errs 1) - (log-message :error "~a" c) - (invoke-restart 'qmynd-impl::use-nil))) - (babel-encodings:character-decoding-error - #'(lambda (c) - (update-stats :data (target mysql) :errs 1) - (let ((encoding (babel-encodings:character-coding-error-encoding c)) - (position (babel-encodings:character-coding-error-position c)) - (character - (aref (babel-encodings:character-coding-error-buffer c) - (babel-encodings:character-coding-error-position c)))) - (log-message :error - "~a: Illegal ~a character starting at position ~a: ~a." 
- table-name encoding position character)) - (invoke-restart 'qmynd-impl::use-nil)))) - (mysql-query sql :row-fn process-row-fn :result-type 'vector)))))) + (sql (format nil "SELECT ~{~a~^, ~} FROM `~a`" cols table-name))) + + (if (range-list mysql) + ;; read a range at a time, in a loop + (destructuring-bind (colname . ranges) (range-list mysql) + (loop :for (min max) :in ranges :do + (let ((sql (format nil "~a WHERE `~a` >= ~a AND `~a` < ~a" + sql colname min colname max))) + (with-encoding-handler + (mysql-query sql + :row-fn process-row-fn + :result-type 'vector))))) + + ;; read it all, no WHERE clause + (with-encoding-handler + (mysql-query sql + :row-fn process-row-fn + :result-type 'vector))))))) diff --git a/src/utils/threads.lisp b/src/utils/threads.lisp index 0f2199c..c950fc5 100644 --- a/src/utils/threads.lisp +++ b/src/utils/threads.lisp @@ -10,6 +10,7 @@ `((*monitoring-queue* . ,*monitoring-queue*) (*copy-batch-rows* . ,*copy-batch-rows*) (*copy-batch-size* . ,*copy-batch-size*) + (*rows-per-range* . ,*rows-per-range*) (*prefetch-rows* . ,*prefetch-rows*) (*pg-settings* . ',*pg-settings*) (*mysql-settings* . ',*mysql-settings*) diff --git a/src/utils/utils.lisp b/src/utils/utils.lisp index 6c49130..76844ac 100644 --- a/src/utils/utils.lisp +++ b/src/utils/utils.lisp @@ -76,3 +76,21 @@ :until (<= limit bytes) :finally (return (format nil "~5,1f ~a~a" (/ bytes limit) multiple unit))))) + +;;; +;;; Defining ranges and partitions. +;;; +(defun split-range (min max &optional (count *rows-per-range*)) + "Split the range from MIN to MAX into sub-ranges of COUNT elements." + (loop :for i := min :then j + :for j := (+ i count) + :while (< i max) + :collect (list i (min j max)))) + +(defun distribute (list-of-ranges count) + "Split a list of ranges into COUNT sublists." 
+ (let ((result (make-array count :element-type 'list :initial-element nil))) + (loop :for i :from 0 + :for range :in list-of-ranges + :do (push range (aref result (mod i count)))) + (map 'list #'reverse result))) diff --git a/test/sakila.load b/test/sakila.load index eecc7f1..9c87ed6 100644 --- a/test/sakila.load +++ b/test/sakila.load @@ -7,7 +7,9 @@ load database -- WITH batch rows = 10000 - WITH on error stop, concurrency = 1, workers = 6, + WITH on error stop, concurrency = 2, workers = 6, + prefetch rows = 25000, + -- multiple readers per thread, rows per range = 50000, max parallel create index = 4-- , -- quote identifiers
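Taken together, the `split-range` and `distribute` helpers above drive the per-reader `WHERE` clauses described in the documentation changes. Here is a small Python sketch of the same logic, for illustration only (it is not part of the patch, and the table name, column name, and numbers are made up):

```python
# Illustration of the patch's range-splitting logic, mirroring split-range
# and distribute from src/utils/utils.lisp, then showing the WHERE-clause
# queries each reader thread would issue. Not pgloader code.

def split_range(lo, hi, rows_per_range):
    """Half-open sub-ranges [i, min(i + rows_per_range, hi)) covering [lo, hi)."""
    return [(i, min(i + rows_per_range, hi))
            for i in range(lo, hi, rows_per_range)]

def distribute(ranges, concurrency):
    """Deal the ranges round-robin across the reader threads."""
    buckets = [[] for _ in range(concurrency)]
    for i, rng in enumerate(ranges):
        buckets[i % concurrency].append(rng)
    return buckets

# Say min(id) = 1 and max(id) = 2501, with rows per range = 1000 and
# concurrency = 2 (hypothetical values):
ranges = split_range(1, 2501, 1000)   # [(1, 1001), (1001, 2001), (2001, 2501)]
for reader_id, part in enumerate(distribute(ranges, 2)):
    for x, y in part:
        print(f"reader {reader_id}: "
              f"SELECT ... FROM `t` WHERE `id` >= {x} AND `id` < {y}")
```

Note that the last sub-range is shorter than `rows per range`, matching the "or less (for the last range...)" wording in the docs, and that the reader threads only get started when there are at least as many ranges as the concurrency setting.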