From f86371970fa62a2ddb2d939947428a74c938484a Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Wed, 24 Jan 2018 21:48:19 +0100 Subject: [PATCH] Review the pgloader COPY implementation further. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor file organisation further to allow for adding a “direct stream” option when the on-error-stop behavior has been selected. This happens currently by default for databases sources. Introduce the new WITH option “on error resume next” which forces the classic behavior of pgloader. The option “on error stop” already existed, its implementation is new. When this new behavior is activated, the data is sent to PostgreSQL directly, without intermediate batches being built. It means that the whole operation fails at the first error, and we don't have any information in memory to try replaying any COPY of the data. It's gone. This behavior should be fine for database migrations as you don't usually want to fix the data manually in intermediate files, you want to fix the problem at the source database and do the whole dance all-over again, up until your casting rules are perfect. This patch might also incurr some performance benenits in terms of both timing and memory usage, though the local testing didn't show much of anything for the moment. --- docs/pgloader.rst | 23 ++- pgloader.asd | 8 +- src/package.lisp | 1 + src/parsers/command-copy.lisp | 4 +- src/parsers/command-csv.lisp | 4 +- src/parsers/command-dbf.lisp | 3 + src/parsers/command-fixed.lisp | 1 + src/parsers/command-ixf.lisp | 3 + src/parsers/command-keywords.lisp | 2 + src/parsers/command-mssql.lisp | 3 + src/parsers/command-mysql.lisp | 3 + src/parsers/command-options.lisp | 4 + src/parsers/command-sqlite.lisp | 3 + src/pg-copy/copy-db-write.lisp | 134 +++++++++++++++++ src/pg-copy/copy-format.lisp | 33 ++++- src/pg-copy/copy-from-queue.lisp | 205 ++++++--------------------- src/pg-copy/copy-rows-in-batch.lisp | 144 +++++++++++++++++++ src/pg-copy/copy-rows-in-stream.lisp | 73 ++++++++++ src/pg-copy/copy-write-batch.lisp | 35 ----- src/sources/common/methods.lisp | 8 +- 20 files changed, 483 insertions(+), 211 deletions(-) create mode 100644 src/pg-copy/copy-db-write.lisp create mode 100644 src/pg-copy/copy-rows-in-batch.lisp create mode 100644 src/pg-copy/copy-rows-in-stream.lisp delete mode 100644 src/pg-copy/copy-write-batch.lisp diff --git a/docs/pgloader.rst b/docs/pgloader.rst index fb51bdf..8f17301 100644 --- a/docs/pgloader.rst +++ b/docs/pgloader.rst @@ -253,6 +253,27 @@ containing the full PostgreSQL client side logs about the rejected data. The `.dat` file is formatted in PostgreSQL the text COPY format as documented in `http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609`. +It is possible to use the following WITH options to control pgloader batch +behavior: + + - *on error stop*, *on error resume next* + + This option controls if pgloader is using building batches of data at + all. The batch implementation allows pgloader to recover errors by + sending the data that PostgreSQL accepts again, and by keeping away the + data that PostgreSQL rejects. + + To enable retrying the data and loading the good parts, use the option + *on error resume next*, which is the default to file based data loads + (such as CSV, IXF or DBF). + + When migrating from another RDMBS technology, it's best to have a + reproducible loading process. In that case it's possible to use *on + error stop* and fix either the casting rules, the data transformation + functions or in cases the input data until your migration runs through + completion. That's why *on error resume next* is the default for SQLite, + MySQL and MS SQL source kinds. + A Note About Performance ------------------------ @@ -472,7 +493,7 @@ See each specific command for details. All data sources specific commands support the following options: - - *on error stop* + - *on error stop*, *on error resume next* - *batch rows = R* - *batch size = ... MB* - *prefetch rows = ...* diff --git a/pgloader.asd b/pgloader.asd index 3aec192..b0ae530 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -193,9 +193,11 @@ :components ((:file "copy-batch") (:file "copy-format") - (:file "copy-write-batch") - (:file "copy-from-queue") - (:file "copy-retry-batch"))) + (:file "copy-db-write") + (:file "copy-rows-in-stream") + (:file "copy-rows-in-batch") + (:file "copy-retry-batch") + (:file "copy-from-queue"))) (:module "parsers" :depends-on ("params" diff --git a/src/package.lisp b/src/package.lisp index 25efc22..aee98f5 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -518,6 +518,7 @@ #:database-error-context) (:import-from #:cl-postgres-trivial-utf-8 #:utf-8-byte-length + #:as-utf-8-bytes #:string-to-utf-8-bytes) (:export #:copy-rows-from-queue #:format-vector-row)) diff --git a/src/parsers/command-copy.lisp b/src/parsers/command-copy.lisp index bea3461..555d447 100644 --- a/src/parsers/command-copy.lisp +++ b/src/parsers/command-copy.lisp @@ -34,6 +34,7 @@ (:destructure (kw null) (declare (ignore kw)) (cons :null-as null))) (defrule copy-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -135,8 +136,7 @@ :fields ',fields :columns ',columns ,@(remove-batch-control-option - options :extras '(:on-error-stop - :worker-count + options :extras '(:worker-count :concurrency :truncate :drop-indexes diff --git a/src/parsers/command-csv.lisp b/src/parsers/command-csv.lisp index c141f29..88819df 100644 --- a/src/parsers/command-csv.lisp +++ b/src/parsers/command-csv.lisp @@ -114,6 +114,7 @@ (cons :escape-mode escape-mode)))) (defrule csv-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -447,8 +448,7 @@ :fields ',fields :columns ',columns ,@(remove-batch-control-option - options :extras '(:on-error-stop - :worker-count + options :extras '(:worker-count :concurrency :truncate :drop-indexes diff --git a/src/parsers/command-dbf.lisp b/src/parsers/command-dbf.lisp index 6d9a66c..0a6fc3c 100644 --- a/src/parsers/command-dbf.lisp +++ b/src/parsers/command-dbf.lisp @@ -19,6 +19,7 @@ (cons :table-name (text table-name))))) (defrule dbf-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -98,6 +99,7 @@ (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) + (on-error-stop (getf ',options :on-error-stop)) (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,dbf-db-conn)))) (source @@ -111,6 +113,7 @@ (pgloader.sources:copy-database source ,@(remove-batch-control-option options) + :on-error-stop on-error-stop :create-indexes nil :foreign-keys nil :reset-sequences nil) diff --git a/src/parsers/command-fixed.lisp b/src/parsers/command-fixed.lisp index ae36da8..9d85b17 100644 --- a/src/parsers/command-fixed.lisp +++ b/src/parsers/command-fixed.lisp @@ -44,6 +44,7 @@ (bind (((_ field-defs _) source)) field-defs))) (defrule fixed-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows diff --git a/src/parsers/command-ixf.lisp b/src/parsers/command-ixf.lisp index 4f9f2a9..24511ee 100644 --- a/src/parsers/command-ixf.lisp +++ b/src/parsers/command-ixf.lisp @@ -19,6 +19,7 @@ (bind (((_ tz) tzopt)) (cons :timezone tz)))) (defrule ixf-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -83,6 +84,7 @@ ,@(batch-control-bindings options) ,@(identifier-case-binding options) (timezone (getf ',options :timezone)) + (on-error-stop(getf ',options :on-error-stop)) (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,ixf-db-conn)))) (source @@ -98,6 +100,7 @@ ,@(remove-batch-control-option options :extras '(:timezone)) + :on-error-stop on-error-stop :foreign-keys nil :reset-sequences nil) diff --git a/src/parsers/command-keywords.lisp b/src/parsers/command-keywords.lisp index 3f7a8e8..d670bb9 100644 --- a/src/parsers/command-keywords.lisp +++ b/src/parsers/command-keywords.lisp @@ -50,6 +50,8 @@ (def-keyword-rule "on") (def-keyword-rule "error") (def-keyword-rule "stop") + (def-keyword-rule "resume") + (def-keyword-rule "next") (def-keyword-rule "parameters") ;; option for loading from a file (def-keyword-rule "workers") diff --git a/src/parsers/command-mssql.lisp b/src/parsers/command-mssql.lisp index fa153b0..f53b92d 100644 --- a/src/parsers/command-mssql.lisp +++ b/src/parsers/command-mssql.lisp @@ -15,6 +15,7 @@ (make-option-rule create-schemas (and kw-create (? kw-no) kw-schemas)) (defrule mssql-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -150,6 +151,7 @@ (let* ((*default-cast-rules* ',*mssql-default-cast-rules*) (*cast-rules* ',casts) (*mssql-settings* ',mssql-gucs) + (on-error-stop (getf ',options :on-error-stop t)) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) @@ -166,6 +168,7 @@ :alter-schema ',alter-schema :alter-table ',alter-table :set-table-oids t + :on-error-stop on-error-stop ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-mysql.lisp b/src/parsers/command-mysql.lisp index 240be3a..10274cb 100644 --- a/src/parsers/command-mysql.lisp +++ b/src/parsers/command-mysql.lisp @@ -8,6 +8,7 @@ ;;; MySQL options ;;; (defrule mysql-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -154,6 +155,7 @@ (*cast-rules* ',casts) (*decoding-as* ',decoding-as) (*mysql-settings* ',mysql-gucs) + (on-error-stop (getf ',options :on-error-stop t)) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) @@ -171,6 +173,7 @@ :alter-table ',alter-table :alter-schema ',alter-schema :set-table-oids t + :on-error-stop on-error-stop ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-options.lisp b/src/parsers/command-options.lisp index b136088..b215521 100644 --- a/src/parsers/command-options.lisp +++ b/src/parsers/command-options.lisp @@ -105,6 +105,7 @@ :batch-size :prefetch-rows :rows-per-range + :on-error-stop :identifier-case)) extras) "Given a list of options, remove the generic ones that should already have @@ -160,6 +161,9 @@ (defrule option-on-error-stop (and kw-on kw-error kw-stop) (:constant (cons :on-error-stop t))) +(defrule option-on-error-resume-next (and kw-on kw-error kw-resume kw-next) + (:constant (cons :on-error-stop nil))) + (defrule option-identifiers-case (and (or kw-snake_case kw-downcase kw-quote) diff --git a/src/parsers/command-sqlite.lisp b/src/parsers/command-sqlite.lisp index 0c9f80b..d238bff 100644 --- a/src/parsers/command-sqlite.lisp +++ b/src/parsers/command-sqlite.lisp @@ -14,6 +14,7 @@ load database set work_mem to '16MB', maintenance_work_mem to '512 MB'; |# (defrule sqlite-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -99,6 +100,7 @@ load database `(lambda () (let* ((*default-cast-rules* ',*sqlite-default-cast-rules*) (*cast-rules* ',casts) + (on-error-stop (getf ',options :on-error-stop t)) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) @@ -117,6 +119,7 @@ load database :set-table-oids t :including ',incl :excluding ',excl + :on-error-stop on-error-stop ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/pg-copy/copy-db-write.lisp b/src/pg-copy/copy-db-write.lisp new file mode 100644 index 0000000..0b09f28 --- /dev/null +++ b/src/pg-copy/copy-db-write.lisp @@ -0,0 +1,134 @@ +;;; +;;; The PostgreSQL COPY TO implementation, with batches and retries. +;;; +;;; Here, sending the data in the COPY stream opened in copy-batch. +;;; +(in-package :pgloader.copy) + +(define-condition copy-init-error (error) + ((table :initarg :table :reader copy-init-error-table) + (columns :initarg :columns :reader copy-init-error-columns) + (condition :initarg :condition :reader copy-init-error-condition)) + (:report (lambda (err stream) + (format stream + "Can't init COPY to ~a~@[(~{~a~^, ~})~]: ~%~a" + (format-table-name (copy-init-error-table err)) + (copy-init-error-columns err) + (copy-init-error-condition err))))) + +;;; +;;; Stream prepared data from *writer-batch* down to PostgreSQL using the +;;; COPY protocol, and retry the batch avoiding known bad rows (from parsing +;;; COPY error messages) in case some data related conditions are signaled. +;;; +(defun db-write-batch (copier batch) + (loop :for count :below (batch-count batch) + :for data :across (batch-data batch) + :do (when data + (db-write-row copier data)) + :finally (return (batch-count batch)))) + +(defun db-write-row (copier data) + "Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all + over again, as we reproduced the data formating in pgloader code. The + reason we do that is to be able to lower the cost of retrying batches: + the formating has then already been done." + (let* ((connection (cl-postgres::copier-database copier)) + (cl-postgres::socket (cl-postgres::connection-socket connection))) + (cl-postgres::with-reconnect-restart connection + (cl-postgres::using-connection connection + (cl-postgres::with-syncing + (cl-postgres::write-uint1 cl-postgres::socket 100) + (cl-postgres::write-uint4 cl-postgres::socket (+ 4 (length data))) + (loop :for byte :across data + :do (write-byte byte cl-postgres::socket)))))) + (incf (cl-postgres::copier-count copier))) + + + +;;; +;;; Functions to stream a vector of rows as strings directly into the COPY +;;; socket stream, without using an intermediate buffer. +;;; +(defun db-write-vector-row (copier row &optional (nbcols (length row))) + "Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all + over again, as we reproduced the data formating in pgloader code. The + reason we do that is to be able to lower the cost of retrying batches: + the formating has then already been done." + (declare (optimize + (speed 3) + #-ecl(safety 0) #+ecl(safety 1) + (space 0) + (debug 1) + (compilation-speed 0))) + (let* ((col-bytes (map 'vector + (lambda (col) + (if (col-null-p col) 2 + (copy-utf-8-byte-length col))) + row)) + (row-bytes (+ nbcols (reduce #'+ col-bytes))) + (connection (cl-postgres::copier-database copier)) + (cl-postgres::socket (cl-postgres::connection-socket connection))) + (cl-postgres::with-reconnect-restart connection + (cl-postgres::using-connection connection + (cl-postgres::with-syncing + (cl-postgres::write-uint1 cl-postgres::socket 100) + (cl-postgres::write-uint4 cl-postgres::socket (+ 4 row-bytes)) + (macrolet ((send-byte (byte) + `(write-byte ,byte cl-postgres::socket))) + (loop :for col :across row + :for i fixnum :from 1 + :do (if (col-null-p col) + (progn + (send-byte #. (char-code #\\)) + (send-byte #. (char-code #\N))) + + (loop :for char :across col + :do (as-copy-utf-8-bytes char send-byte))) + :do (if (< i nbcols) + (send-byte #. (char-code #\Tab)) + (send-byte #. (char-code #\Newline)))))))) + (incf (cl-postgres::copier-count copier)) + row-bytes)) + + +(defun db-write-escaped-vector-row (copier row &optional (nbcols (length row))) + "Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all + over again, as we reproduced the data formating in pgloader code. The + reason we do that is to be able to lower the cost of retrying batches: + the formating has then already been done." + (declare (optimize + (speed 3) + #-ecl(safety 0) #+ecl(safety 1) + (space 0) + (debug 1) + (compilation-speed 0))) + (let* ((col-bytes (map 'vector + (lambda (col) + (if (col-null-p col) 2 + (utf-8-byte-length col))) + row)) + (row-bytes (+ nbcols (reduce #'+ col-bytes))) + (connection (cl-postgres::copier-database copier)) + (cl-postgres::socket (cl-postgres::connection-socket connection))) + (cl-postgres::with-reconnect-restart connection + (cl-postgres::using-connection connection + (cl-postgres::with-syncing + (cl-postgres::write-uint1 cl-postgres::socket 100) + (cl-postgres::write-uint4 cl-postgres::socket (+ 4 row-bytes)) + (macrolet ((send-byte (byte) + `(write-byte ,byte cl-postgres::socket))) + (loop :for col :across row + :for i fixnum :from 1 + :do (if (col-null-p col) + (progn + (send-byte #. (char-code #\\)) + (send-byte #. (char-code #\N))) + + (loop :for char :across col + :do (as-utf-8-bytes char send-byte))) + :do (if (< i nbcols) + (send-byte #. (char-code #\Tab)) + (send-byte #. (char-code #\Newline)))))))) + (incf (cl-postgres::copier-count copier)) + row-bytes)) diff --git a/src/pg-copy/copy-format.lisp b/src/pg-copy/copy-format.lisp index 5f8ca6a..de441c0 100644 --- a/src/pg-copy/copy-format.lisp +++ b/src/pg-copy/copy-format.lisp @@ -13,6 +13,33 @@ ;;; call here. ;;; +(defun prepare-and-format-row (copy nbcols row) + "Prepare given ROW in PostgreSQL COPY format" + (let* ((row (prepare-row copy nbcols row))) + (multiple-value-bind (pg-vector-row bytes) + (if row + (ecase (copy-format copy) + (:raw (format-vector-row nbcols row)) + (:escaped (format-escaped-vector-row nbcols row))) + (values nil 0)) + + ;; we might have to debug + (when pg-vector-row + (log-message :data "> ~s" (map 'string #'code-char pg-vector-row)) + + (values pg-vector-row bytes))))) + +(defun prepare-row (copy nbcols row) + "Prepare given ROW by applying the pre-processing and transformation + functions registered in the COPY context." + (let* ((preprocessed-row (if (preprocessor copy) + (funcall (preprocessor copy) row) + row))) + (cond ((eq :escaped (copy-format copy)) preprocessed-row) + ((null (transforms copy)) preprocessed-row) + (t + (apply-transforms copy nbcols preprocessed-row (transforms copy)))))) + (defun format-vector-row (nb-cols row) (declare (optimize (speed 3) @@ -27,9 +54,9 @@ (len (+ nb-cols (reduce #'+ lens))) (buf (make-array (the fixnum len) :element-type '(unsigned-byte 8)))) (loop :for col :across row - :for i :from 1 - :for position := 0 :then (+ position col-len 1) - :for col-len :across lens + :for i fixnum :from 1 + :for position fixnum := 0 :then (+ position col-len 1) + :for col-len fixnum :across lens :do (if (col-null-p col) (insert-copy-null buf position) (string-to-copy-utf-8-bytes col buf position)) diff --git a/src/pg-copy/copy-from-queue.lisp b/src/pg-copy/copy-from-queue.lisp index 499fc31..f586c13 100644 --- a/src/pg-copy/copy-from-queue.lisp +++ b/src/pg-copy/copy-from-queue.lisp @@ -3,76 +3,6 @@ ;;; (in-package :pgloader.copy) -(defun send-batch (table columns batch - &key - (db pomo:*database*) - on-error-stop) - "Copy current *writer-batch* into TABLE-NAME." - ;; We need to keep a copy of the rows we send through the COPY - ;; protocol to PostgreSQL to be able to process them again in case - ;; of a data error being signaled, that's the BATCH here. - (let ((table-name (format-table-name table)) - (pomo:*database* db)) - ;; We can't use with-pgsql-transaction here because of the specifics - ;; of error handling in case of cl-postgres:open-db-writer errors: the - ;; transaction is dead already when we get a signal, and the COMMIT or - ;; ABORT steps then trigger a protocol error on a #\Z message. - (handler-case - (progn - (pomo:execute "BEGIN") - (let* ((copier - (handler-case - (cl-postgres:open-db-writer db table-name columns) - (condition (c) - ;; failed to open the COPY protocol mode (e.g. missing - ;; columns on the target table), stop here, - ;; transaction is dead already (no ROLLBACK needed). - (log-message :fatal - "Can't init COPY to ~a~@[(~{~a~^, ~})~]: ~%~a" - (format-table-name table) - columns - c) - (update-stats :data table :errs 1) - (return-from send-batch 0))))) - (unwind-protect - (db-write-batch copier batch) - (cl-postgres:close-db-writer copier) - (pomo:execute "COMMIT")))) - - ;; If PostgreSQL signals a data error, process the batch by isolating - ;; erroneous data away and retrying the rest. - (postgresql-retryable (condition) - (pomo:execute "ROLLBACK") - - (log-message :error "PostgreSQL [~s] ~a" table-name condition) - (if on-error-stop - ;; re-signal the condition to upper level - (signal 'on-error-stop :on-condition condition) - - ;; normal behavior, on-error-stop being nil - ;; clean the current transaction before retrying new ones - (let ((errors - (retry-batch table columns batch condition))) - (log-message :debug "retry-batch found ~d errors" errors) - (update-stats :data table :rows (- errors))))) - - (postgresql-unavailable (condition) - - (log-message :error "[PostgreSQL ~s] ~a" table-name condition) - (log-message :error "Copy Batch reconnecting to PostgreSQL") - - ;; in order to avoid Socket error in "connect": ECONNREFUSED if we - ;; try just too soon, wait a little - (sleep 2) - - (cl-postgres:reopen-database db) - (send-batch table columns batch :db db :on-error-stop on-error-stop)) - - (condition (c) - ;; non retryable failures - (log-message :error "Non-retryable error ~a" c) - (pomo:execute "ROLLBACK"))))) - ;;; ;;; We receive raw input rows from an lparallel queue, push their content ;;; down to PostgreSQL, handling any data related errors in the way. @@ -91,82 +21,40 @@ PostgreSQL, and when that's done update stats." (let* ((nbcols (length (table-column-list (pgloader.sources::target copy)))) - (current-batch (make-batch)) (seconds 0)) - ;; add some COPY activity related bits to our COPY object. - (setf (transforms copy) - (let ((funs (transforms copy))) - (unless (every #'null funs) - funs)) + ;; we need to compute some information and have them at the right place + ;; FIXME: review the API here, that's an half-baked refactoring. + (prepare-copy-parameters copy) - ;; FIXME: we should change the API around preprocess-row, someday. - (preprocessor copy) - (pgloader.sources::preprocess-row copy) + (log-message :info "COPY ON ERROR ~:[RESUME NEXT~;STOP~]" on-error-stop) - ;; FIXME: we could change the API around data-is-preformatted-p, - ;; but that's a bigger change than duplicating the information in - ;; the object. - (copy-format copy) - (if (data-is-preformatted-p copy) :escaped :raw)) + (pgloader.pgsql:with-pgsql-connection (pgconn) + (with-schema (unqualified-table-name table) + (with-disabled-triggers (unqualified-table-name + :disable-triggers disable-triggers) + (log-message :info "pgsql:copy-rows-from-queue[~a]: ~a ~a" + (lp:kernel-worker-index) + (format-table-name table) + columns) - (flet ((send-current-batch (unqualified-table-name) - ;; we close over the whole lexical environment or almost... - (let ((batch-start-time (get-internal-real-time))) - (send-batch table - columns - current-batch - :on-error-stop on-error-stop) + (if on-error-stop + ;; + ;; When on-error-stop is true, we don't need to handle batch + ;; processing, we can stop as soon as there's a failure. + ;; + (incf seconds + (stream-rows-to-copy table columns copy nbcols queue)) - (let ((batch-seconds (elapsed-time-since batch-start-time))) - (log-message :debug - "send-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]" - (lp:kernel-worker-index) - unqualified-table-name - (batch-count current-batch) - (pretty-print-bytes (batch-bytes current-batch)) - batch-seconds - (batch-oversized-p current-batch)) - (update-stats :data table - :rows (batch-count current-batch) - :bytes (batch-bytes current-batch)) - - ;; return batch-seconds - batch-seconds)))) - (declare (inline send-current-batch)) - - (pgloader.pgsql:with-pgsql-connection (pgconn) - (with-schema (unqualified-table-name table) - (with-disabled-triggers (unqualified-table-name - :disable-triggers disable-triggers) - (log-message :info "pgsql:copy-rows-from-queue[~a]: ~a ~a" - (lp:kernel-worker-index) - (format-table-name table) - columns) - - (loop - :for row := (lq:pop-queue queue) - :until (eq :end-of-data row) - :do - (progn - ;; if current-batch is full, send data to PostgreSQL - ;; and prepare a new batch - (when (batch-full-p current-batch) - (incf seconds (send-current-batch unqualified-table-name)) - (setf current-batch (make-batch)) - - ;; give a little help to our friend, now is a good time - ;; to garbage collect - #+sbcl (sb-ext:gc :full t)) - - ;; also add up the time it takes to format the rows - (let ((start-time (get-internal-real-time))) - (format-row-in-batch copy nbcols row current-batch) - (incf seconds (elapsed-time-since start-time))))) - - ;; the last batch might not be empty - (unless (= 0 (batch-count current-batch)) - (incf seconds (send-current-batch unqualified-table-name))))))) + ;; + ;; When on-error-stop is nil, we actually implement + ;; on-error-resume-next behavior, and for that we need to keep + ;; a batch of rows around in order to replay COPYing its + ;; content around, skipping rows that are rejected by + ;; PostgreSQL. + ;; + (incf seconds + (batch-rows-to-copy table columns copy nbcols queue)))))) ;; each writer thread sends its own stop timestamp and the monitor keeps ;; only the latest entry @@ -177,29 +65,20 @@ seconds) (list :writer table seconds))) +(defun prepare-copy-parameters (copy) + "add some COPY activity related bits to our COPY object." + (setf (transforms copy) + (let ((funs (transforms copy))) + (unless (every #'null funs) + funs)) -(defun format-row-in-batch (copy nbcols row current-batch) - "Given a row from the queue, prepare it for the next batch." - (let* ((row (if (preprocessor copy) - (funcall (preprocessor copy) row) - row)) - (transformed-row (cond ((eq :escaped (copy-format copy)) row) - ((null (transforms copy)) row) - (t - (apply-transforms copy - nbcols - row - (transforms copy)))))) - (multiple-value-bind (pg-vector-row bytes) - (if transformed-row - (ecase (copy-format copy) - (:raw (format-vector-row nbcols transformed-row)) - (:escaped (format-escaped-vector-row nbcols transformed-row))) - (values nil 0)) + ;; FIXME: we should change the API around preprocess-row, someday. + (preprocessor copy) + (pgloader.sources::preprocess-row copy) - ;; we might have to debug - (when pg-vector-row - (log-message :data "> ~s" (map 'string #'code-char pg-vector-row)) + ;; FIXME: we could change the API around data-is-preformatted-p, + ;; but that's a bigger change than duplicating the information in + ;; the object. + (copy-format copy) + (if (data-is-preformatted-p copy) :escaped :raw))) - ;; now add copy-data to current-batch - (push-row current-batch pg-vector-row bytes))))) diff --git a/src/pg-copy/copy-rows-in-batch.lisp b/src/pg-copy/copy-rows-in-batch.lisp new file mode 100644 index 0000000..ee28f33 --- /dev/null +++ b/src/pg-copy/copy-rows-in-batch.lisp @@ -0,0 +1,144 @@ +;;; +;;; The PostgreSQL COPY TO implementation, with batches and retries. +;;; +(in-package :pgloader.copy) + +(defun batch-rows-to-copy (table columns copy nbcols queue) + "Add rows that we pop from QUEUE into a batch, that we then COPY over to + PostgreSQL as soon as the batch is full. This allows sophisticated error + handling and recovery, where we can retry rows that are not rejected by + PostgreSQL." + (let ((seconds 0) + (current-batch (make-batch))) + (loop + :for row := (lq:pop-queue queue) + :until (eq :end-of-data row) + :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch) + (add-row-to-current-batch table columns copy nbcols + current-batch row) + (setf current-batch maybe-new-batch) + (incf seconds seconds-in-this-batch))) + + ;; the last batch might not be empty + (unless (= 0 (batch-count current-batch)) + (incf seconds (send-batch table columns current-batch))) + + seconds)) + + +(defun add-row-to-current-batch (table columns copy nbcols batch row) + "Add another ROW we just received to CURRENT-BATCH, and prepare a new + batch if needed. The current-batch (possibly a new one) is returned." + (let ((seconds 0) + (current-batch batch)) + ;; if current-batch is full, send data to PostgreSQL + ;; and prepare a new batch + (when (batch-full-p current-batch) + (incf seconds (send-batch table columns current-batch)) + (setf current-batch (make-batch)) + + ;; give a little help to our friend, now is a good time + ;; to garbage collect + #+sbcl + (let ((garbage-collect-start (get-internal-real-time))) + (sb-ext:gc :full t) + (incf seconds (elapsed-time-since garbage-collect-start)))) + + ;; also add up the time it takes to format the rows + (let ((start-time (get-internal-real-time))) + (format-row-in-batch copy nbcols row current-batch) + (incf seconds (elapsed-time-since start-time))) + + (values current-batch seconds))) + + +(defun send-batch (table columns batch &key (db pomo:*database*)) + "Copy current *writer-batch* into TABLE-NAME." + ;; We need to keep a copy of the rows we send through the COPY + ;; protocol to PostgreSQL to be able to process them again in case + ;; of a data error being signaled, that's the BATCH here. + (let ((batch-start-time (get-internal-real-time)) + (table-name (format-table-name table)) + (pomo:*database* db)) + ;; We can't use with-pgsql-transaction here because of the specifics + ;; of error handling in case of cl-postgres:open-db-writer errors: the + ;; transaction is dead already when we get a signal, and the COMMIT or + ;; ABORT steps then trigger a protocol error on a #\Z message. + (handler-case + (progn + (pomo:execute "BEGIN") + (let* ((copier + (handler-case + (cl-postgres:open-db-writer db table-name columns) + (condition (c) + ;; failed to open the COPY protocol mode (e.g. missing + ;; columns on the target table), stop here, + ;; transaction is dead already (no ROLLBACK needed). + (error (make-condition 'copy-init-error + :table table + :columns columns + :condition c)))))) + (unwind-protect + (db-write-batch copier batch) + (cl-postgres:close-db-writer copier) + (pomo:execute "COMMIT")))) + + ;; If PostgreSQL signals a data error, process the batch by isolating + ;; erroneous data away and retrying the rest. + (postgresql-retryable (condition) + (pomo:execute "ROLLBACK") + + (log-message :error "PostgreSQL [~s] ~a" table-name condition) + ;; clean the current transaction before retrying new ones + (let ((errors + (retry-batch table columns batch condition))) + (log-message :debug "retry-batch found ~d errors" errors) + (update-stats :data table :rows (- errors)))) + + (postgresql-unavailable (condition) + + (log-message :error "[PostgreSQL ~s] ~a" table-name condition) + (log-message :error "Copy Batch reconnecting to PostgreSQL") + + ;; in order to avoid Socket error in "connect": ECONNREFUSED if we + ;; try just too soon, wait a little + (sleep 2) + + (cl-postgres:reopen-database db) + (send-batch table columns batch :db db)) + + (copy-init-error (condition) + ;; Couldn't init the COPY protocol, process the condition up the + ;; stack + (update-stats :data table :errs 1) + (error condition)) + + (condition (c) + ;; non retryable failures + (log-message :error "Non-retryable error ~a" c) + (pomo:execute "ROLLBACK"))) + + ;; now log about having send a batch, and update our stats with the + ;; time that took + (let ((seconds (elapsed-time-since batch-start-time))) + (log-message :debug + "send-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]" + (lp:kernel-worker-index) + (format-table-name table) + (batch-count batch) + (pretty-print-bytes (batch-bytes batch)) + seconds + (batch-oversized-p batch)) + (update-stats :data table + :rows (batch-count batch) + :bytes (batch-bytes batch)) + + ;; and return batch-seconds + seconds))) + + +(defun format-row-in-batch (copy nbcols row current-batch) + "Given a row from the queue, prepare it for the next batch." + (multiple-value-bind (pg-vector-row bytes) + (prepare-and-format-row copy nbcols row) + (push-row current-batch pg-vector-row bytes))) diff --git a/src/pg-copy/copy-rows-in-stream.lisp b/src/pg-copy/copy-rows-in-stream.lisp new file mode 100644 index 0000000..f5f6a9b --- /dev/null +++ b/src/pg-copy/copy-rows-in-stream.lisp @@ -0,0 +1,73 @@ +;;; +;;; The PostgreSQL COPY TO implementation, with batches and retries. +;;; +(in-package :pgloader.copy) + +(defun stream-rows-to-copy (table columns copy nbcols queue + &optional (db pomo:*database*)) + "Directly stream rows that we pop from QUEUE into PostgreSQL database + connection DB." + (let ((rcount 0) + (bytes 0) + (seconds 0)) + (handler-case + (progn + (pomo:execute "BEGIN") + (let* ((table-name (format-table-name table)) + (copier + (handler-case + (cl-postgres:open-db-writer db table-name columns) + (condition (c) + ;; failed to open the COPY protocol mode (e.g. missing + ;; columns on the target table), stop here, + ;; transaction is dead already (no ROLLBACK needed). + (error (make-condition 'copy-init-error + :table table + :columns columns + :condition c)))))) + (unwind-protect + (loop + :for row := (lq:pop-queue queue) + :until (eq :end-of-data row) + :do (multiple-value-bind (row-bytes row-seconds) + (stream-row copier copy nbcols row) + (incf rcount) + (incf bytes row-bytes) + (incf seconds row-seconds))) + (cl-postgres:close-db-writer copier) + (pomo:execute "COMMIT")))) + + (postgresql-unavailable (condition) + ;; We got disconnected, maybe because PostgreSQL is being restarted, + ;; maybe for another reason, but in any case the transaction doesn't + ;; exists anymore, the connection doesn't exists anymore, there's no + ;; need to send anything, not even a ROLLBACK; in that case. + ;; + ;; Re-signal the condition as an error to be processed by the calling + ;; thread, where it's possible to also stop the reader. + (error condition)) + + (copy-init-error (condition) + (update-stats :data table :errs 1) + (error condition)) + + (condition (c) + ;; stop at any failure here, this function doesn't implement any kind + ;; of retry behaviour. + (log-message :error "~a" c) + (pomo:execute "ROLLBACK"))) + + ;; return seconds spent sending data to PostgreSQL + (update-stats :data table :rows rcount :bytes bytes) + seconds)) + +(defun stream-row (stream copy nbcols row) + "Send a single ROW down in the PostgreSQL COPY STREAM." + (let* ((start (get-internal-real-time)) + (row (prepare-row copy nbcols row))) + (when row + (let ((bytes + (ecase (copy-format copy) + (:raw (db-write-vector-row stream row nbcols)) + (:escaped (db-write-escaped-vector-row stream row nbcols))))) + (values bytes (elapsed-time-since start)))))) diff --git a/src/pg-copy/copy-write-batch.lisp b/src/pg-copy/copy-write-batch.lisp deleted file mode 100644 index 8cdf4da..0000000 --- a/src/pg-copy/copy-write-batch.lisp +++ /dev/null @@ -1,35 +0,0 @@ -;;; -;;; The PostgreSQL COPY TO implementation, with batches and retries. -;;; -;;; Here, sending the data in the COPY stream opened in copy-batch. -;;; -(in-package :pgloader.copy) - -;;; -;;; Stream prepared data from *writer-batch* down to PostgreSQL using the -;;; COPY protocol, and retry the batch avoiding known bad rows (from parsing -;;; COPY error messages) in case some data related conditions are signaled. -;;; -(defun db-write-batch (copier batch) - (loop :for count :below (batch-count batch) - :for data :across (batch-data batch) - :do (when data - (db-write-row copier data)) - :finally (return (batch-count batch)))) - -(defun db-write-row (copier data) - "Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all - over again, as we reproduced the data formating in pgloader code. The - reason we do that is to be able to lower the cost of retrying batches: - the formating has then already been done." - (let* ((connection (cl-postgres::copier-database copier)) - (cl-postgres::socket (cl-postgres::connection-socket connection))) - (cl-postgres::with-reconnect-restart connection - (cl-postgres::using-connection connection - (cl-postgres::with-syncing - (cl-postgres::write-uint1 cl-postgres::socket 100) - (cl-postgres::write-uint4 cl-postgres::socket (+ 4 (length data))) - (loop :for byte :across data - :do (write-byte byte cl-postgres::socket)))))) - (incf (cl-postgres::copier-count copier))) - diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index 51a5470..c92ea3e 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -85,7 +85,11 @@ (incf task-count))) (lp:task-handler-bind - ((on-error-stop + ((pgloader.copy::copy-init-error + #'(lambda (condition) + ;; everything has been handled already + (lp:invoke-transfer-error condition))) + (on-error-stop #'(lambda (condition) ;; everything has been handled already (lp:invoke-transfer-error condition))) @@ -124,7 +128,7 @@ ;; each reader pretends to be alone, pass 1 as concurrency (submit-task channel #'queue-raw-data reader rawq 1) - (submit-task channel #'pgloader.pgsql::copy-rows-from-queue + (submit-task channel #'pgloader.copy::copy-rows-from-queue copy rawq :on-error-stop on-error-stop :disable-triggers disable-triggers)))