diff --git a/docs/pgloader.rst b/docs/pgloader.rst index fb51bdf..8f17301 100644 --- a/docs/pgloader.rst +++ b/docs/pgloader.rst @@ -253,6 +253,27 @@ containing the full PostgreSQL client side logs about the rejected data. The `.dat` file is formatted in PostgreSQL the text COPY format as documented in `http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609`. +It is possible to use the following WITH options to control pgloader batch +behavior: + + - *on error stop*, *on error resume next* + + This option controls if pgloader is building batches of data at + all. The batch implementation allows pgloader to recover errors by + sending the data that PostgreSQL accepts again, and by keeping away the + data that PostgreSQL rejects. + + To enable retrying the data and loading the good parts, use the option + *on error resume next*, which is the default for file based data loads + (such as CSV, IXF or DBF). + + When migrating from another RDBMS technology, it's best to have a + reproducible loading process. In that case it's possible to use *on + error stop* and fix either the casting rules, the data transformation + functions or in some cases the input data until your migration runs through + completion. That's why *on error resume next* is the default for SQLite, + MySQL and MS SQL source kinds. + A Note About Performance ------------------------ @@ -472,7 +493,7 @@ See each specific command for details. All data sources specific commands support the following options: - - *on error stop* + - *on error stop*, *on error resume next* - *batch rows = R* - *batch size = ... 
MB* - *prefetch rows = ...* diff --git a/pgloader.asd b/pgloader.asd index 3aec192..b0ae530 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -193,9 +193,11 @@ :components ((:file "copy-batch") (:file "copy-format") - (:file "copy-write-batch") - (:file "copy-from-queue") - (:file "copy-retry-batch"))) + (:file "copy-db-write") + (:file "copy-rows-in-stream") + (:file "copy-rows-in-batch") + (:file "copy-retry-batch") + (:file "copy-from-queue"))) (:module "parsers" :depends-on ("params" diff --git a/src/package.lisp b/src/package.lisp index 25efc22..aee98f5 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -518,6 +518,7 @@ #:database-error-context) (:import-from #:cl-postgres-trivial-utf-8 #:utf-8-byte-length + #:as-utf-8-bytes #:string-to-utf-8-bytes) (:export #:copy-rows-from-queue #:format-vector-row)) diff --git a/src/parsers/command-copy.lisp b/src/parsers/command-copy.lisp index bea3461..555d447 100644 --- a/src/parsers/command-copy.lisp +++ b/src/parsers/command-copy.lisp @@ -34,6 +34,7 @@ (:destructure (kw null) (declare (ignore kw)) (cons :null-as null))) (defrule copy-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -135,8 +136,7 @@ :fields ',fields :columns ',columns ,@(remove-batch-control-option - options :extras '(:on-error-stop - :worker-count + options :extras '(:worker-count :concurrency :truncate :drop-indexes diff --git a/src/parsers/command-csv.lisp b/src/parsers/command-csv.lisp index c141f29..88819df 100644 --- a/src/parsers/command-csv.lisp +++ b/src/parsers/command-csv.lisp @@ -114,6 +114,7 @@ (cons :escape-mode escape-mode)))) (defrule csv-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -447,8 +448,7 @@ :fields ',fields :columns ',columns ,@(remove-batch-control-option - options :extras '(:on-error-stop - :worker-count + options :extras '(:worker-count :concurrency :truncate :drop-indexes diff --git 
a/src/parsers/command-dbf.lisp b/src/parsers/command-dbf.lisp index 6d9a66c..0a6fc3c 100644 --- a/src/parsers/command-dbf.lisp +++ b/src/parsers/command-dbf.lisp @@ -19,6 +19,7 @@ (cons :table-name (text table-name))))) (defrule dbf-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -98,6 +99,7 @@ (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) + (on-error-stop (getf ',options :on-error-stop)) (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,dbf-db-conn)))) (source @@ -111,6 +113,7 @@ (pgloader.sources:copy-database source ,@(remove-batch-control-option options) + :on-error-stop on-error-stop :create-indexes nil :foreign-keys nil :reset-sequences nil) diff --git a/src/parsers/command-fixed.lisp b/src/parsers/command-fixed.lisp index ae36da8..9d85b17 100644 --- a/src/parsers/command-fixed.lisp +++ b/src/parsers/command-fixed.lisp @@ -44,6 +44,7 @@ (bind (((_ field-defs _) source)) field-defs))) (defrule fixed-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows diff --git a/src/parsers/command-ixf.lisp b/src/parsers/command-ixf.lisp index 4f9f2a9..24511ee 100644 --- a/src/parsers/command-ixf.lisp +++ b/src/parsers/command-ixf.lisp @@ -19,6 +19,7 @@ (bind (((_ tz) tzopt)) (cons :timezone tz)))) (defrule ixf-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -83,6 +84,7 @@ ,@(batch-control-bindings options) ,@(identifier-case-binding options) (timezone (getf ',options :timezone)) + (on-error-stop(getf ',options :on-error-stop)) (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,ixf-db-conn)))) (source @@ -98,6 +100,7 @@ ,@(remove-batch-control-option options :extras '(:timezone)) + :on-error-stop on-error-stop :foreign-keys nil :reset-sequences 
nil) diff --git a/src/parsers/command-keywords.lisp b/src/parsers/command-keywords.lisp index 3f7a8e8..d670bb9 100644 --- a/src/parsers/command-keywords.lisp +++ b/src/parsers/command-keywords.lisp @@ -50,6 +50,8 @@ (def-keyword-rule "on") (def-keyword-rule "error") (def-keyword-rule "stop") + (def-keyword-rule "resume") + (def-keyword-rule "next") (def-keyword-rule "parameters") ;; option for loading from a file (def-keyword-rule "workers") diff --git a/src/parsers/command-mssql.lisp b/src/parsers/command-mssql.lisp index fa153b0..f53b92d 100644 --- a/src/parsers/command-mssql.lisp +++ b/src/parsers/command-mssql.lisp @@ -15,6 +15,7 @@ (make-option-rule create-schemas (and kw-create (? kw-no) kw-schemas)) (defrule mssql-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -150,6 +151,7 @@ (let* ((*default-cast-rules* ',*mssql-default-cast-rules*) (*cast-rules* ',casts) (*mssql-settings* ',mssql-gucs) + (on-error-stop (getf ',options :on-error-stop t)) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) @@ -166,6 +168,7 @@ :alter-schema ',alter-schema :alter-table ',alter-table :set-table-oids t + :on-error-stop on-error-stop ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-mysql.lisp b/src/parsers/command-mysql.lisp index 240be3a..10274cb 100644 --- a/src/parsers/command-mysql.lisp +++ b/src/parsers/command-mysql.lisp @@ -8,6 +8,7 @@ ;;; MySQL options ;;; (defrule mysql-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -154,6 +155,7 @@ (*cast-rules* ',casts) (*decoding-as* ',decoding-as) (*mysql-settings* ',mysql-gucs) + (on-error-stop (getf ',options :on-error-stop t)) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) @@ 
-171,6 +173,7 @@ :alter-table ',alter-table :alter-schema ',alter-schema :set-table-oids t + :on-error-stop on-error-stop ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-options.lisp b/src/parsers/command-options.lisp index b136088..b215521 100644 --- a/src/parsers/command-options.lisp +++ b/src/parsers/command-options.lisp @@ -105,6 +105,7 @@ :batch-size :prefetch-rows :rows-per-range + :on-error-stop :identifier-case)) extras) "Given a list of options, remove the generic ones that should already have @@ -160,6 +161,9 @@ (defrule option-on-error-stop (and kw-on kw-error kw-stop) (:constant (cons :on-error-stop t))) +(defrule option-on-error-resume-next (and kw-on kw-error kw-resume kw-next) + (:constant (cons :on-error-stop nil))) + (defrule option-identifiers-case (and (or kw-snake_case kw-downcase kw-quote) diff --git a/src/parsers/command-sqlite.lisp b/src/parsers/command-sqlite.lisp index 0c9f80b..d238bff 100644 --- a/src/parsers/command-sqlite.lisp +++ b/src/parsers/command-sqlite.lisp @@ -14,6 +14,7 @@ load database set work_mem to '16MB', maintenance_work_mem to '512 MB'; |# (defrule sqlite-option (or option-on-error-stop + option-on-error-resume-next option-workers option-concurrency option-batch-rows @@ -99,6 +100,7 @@ load database `(lambda () (let* ((*default-cast-rules* ',*sqlite-default-cast-rules*) (*cast-rules* ',casts) + (on-error-stop (getf ',options :on-error-stop t)) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) @@ -117,6 +119,7 @@ load database :set-table-oids t :including ',incl :excluding ',excl + :on-error-stop on-error-stop ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/pg-copy/copy-db-write.lisp b/src/pg-copy/copy-db-write.lisp new file mode 100644 index 0000000..0b09f28 --- /dev/null +++ b/src/pg-copy/copy-db-write.lisp 
@@ -0,0 +1,134 @@ +;;; +;;; The PostgreSQL COPY TO implementation, with batches and retries. +;;; +;;; Here, sending the data in the COPY stream opened in copy-batch. +;;; +(in-package :pgloader.copy) + +(define-condition copy-init-error (error) + ((table :initarg :table :reader copy-init-error-table) + (columns :initarg :columns :reader copy-init-error-columns) + (condition :initarg :condition :reader copy-init-error-condition)) + (:report (lambda (err stream) + (format stream + "Can't init COPY to ~a~@[(~{~a~^, ~})~]: ~%~a" + (format-table-name (copy-init-error-table err)) + (copy-init-error-columns err) + (copy-init-error-condition err))))) + +;;; +;;; Stream prepared data from *writer-batch* down to PostgreSQL using the +;;; COPY protocol, and retry the batch avoiding known bad rows (from parsing +;;; COPY error messages) in case some data related conditions are signaled. +;;; +(defun db-write-batch (copier batch) + (loop :for count :below (batch-count batch) + :for data :across (batch-data batch) + :do (when data + (db-write-row copier data)) + :finally (return (batch-count batch)))) + +(defun db-write-row (copier data) + "Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all + over again, as we reproduced the data formating in pgloader code. The + reason we do that is to be able to lower the cost of retrying batches: + the formating has then already been done." 
+ (let* ((connection (cl-postgres::copier-database copier)) + (cl-postgres::socket (cl-postgres::connection-socket connection))) + (cl-postgres::with-reconnect-restart connection + (cl-postgres::using-connection connection + (cl-postgres::with-syncing + (cl-postgres::write-uint1 cl-postgres::socket 100) + (cl-postgres::write-uint4 cl-postgres::socket (+ 4 (length data))) + (loop :for byte :across data + :do (write-byte byte cl-postgres::socket)))))) + (incf (cl-postgres::copier-count copier))) + + + +;;; +;;; Functions to stream a vector of rows as strings directly into the COPY +;;; socket stream, without using an intermediate buffer. +;;; +(defun db-write-vector-row (copier row &optional (nbcols (length row))) + "Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all + over again, as we reproduced the data formating in pgloader code. The + reason we do that is to be able to lower the cost of retrying batches: + the formating has then already been done." + (declare (optimize + (speed 3) + #-ecl(safety 0) #+ecl(safety 1) + (space 0) + (debug 1) + (compilation-speed 0))) + (let* ((col-bytes (map 'vector + (lambda (col) + (if (col-null-p col) 2 + (copy-utf-8-byte-length col))) + row)) + (row-bytes (+ nbcols (reduce #'+ col-bytes))) + (connection (cl-postgres::copier-database copier)) + (cl-postgres::socket (cl-postgres::connection-socket connection))) + (cl-postgres::with-reconnect-restart connection + (cl-postgres::using-connection connection + (cl-postgres::with-syncing + (cl-postgres::write-uint1 cl-postgres::socket 100) + (cl-postgres::write-uint4 cl-postgres::socket (+ 4 row-bytes)) + (macrolet ((send-byte (byte) + `(write-byte ,byte cl-postgres::socket))) + (loop :for col :across row + :for i fixnum :from 1 + :do (if (col-null-p col) + (progn + (send-byte #. (char-code #\\)) + (send-byte #. (char-code #\N))) + + (loop :for char :across col + :do (as-copy-utf-8-bytes char send-byte))) + :do (if (< i nbcols) + (send-byte #. 
(char-code #\Tab)) + (send-byte #. (char-code #\Newline)))))))) + (incf (cl-postgres::copier-count copier)) + row-bytes)) + + +(defun db-write-escaped-vector-row (copier row &optional (nbcols (length row))) + "Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all + over again, as we reproduced the data formating in pgloader code. The + reason we do that is to be able to lower the cost of retrying batches: + the formating has then already been done." + (declare (optimize + (speed 3) + #-ecl(safety 0) #+ecl(safety 1) + (space 0) + (debug 1) + (compilation-speed 0))) + (let* ((col-bytes (map 'vector + (lambda (col) + (if (col-null-p col) 2 + (utf-8-byte-length col))) + row)) + (row-bytes (+ nbcols (reduce #'+ col-bytes))) + (connection (cl-postgres::copier-database copier)) + (cl-postgres::socket (cl-postgres::connection-socket connection))) + (cl-postgres::with-reconnect-restart connection + (cl-postgres::using-connection connection + (cl-postgres::with-syncing + (cl-postgres::write-uint1 cl-postgres::socket 100) + (cl-postgres::write-uint4 cl-postgres::socket (+ 4 row-bytes)) + (macrolet ((send-byte (byte) + `(write-byte ,byte cl-postgres::socket))) + (loop :for col :across row + :for i fixnum :from 1 + :do (if (col-null-p col) + (progn + (send-byte #. (char-code #\\)) + (send-byte #. (char-code #\N))) + + (loop :for char :across col + :do (as-utf-8-bytes char send-byte))) + :do (if (< i nbcols) + (send-byte #. (char-code #\Tab)) + (send-byte #. (char-code #\Newline)))))))) + (incf (cl-postgres::copier-count copier)) + row-bytes)) diff --git a/src/pg-copy/copy-format.lisp b/src/pg-copy/copy-format.lisp index 5f8ca6a..de441c0 100644 --- a/src/pg-copy/copy-format.lisp +++ b/src/pg-copy/copy-format.lisp @@ -13,6 +13,33 @@ ;;; call here. 
;;; +(defun prepare-and-format-row (copy nbcols row) + "Prepare given ROW in PostgreSQL COPY format" + (let* ((row (prepare-row copy nbcols row))) + (multiple-value-bind (pg-vector-row bytes) + (if row + (ecase (copy-format copy) + (:raw (format-vector-row nbcols row)) + (:escaped (format-escaped-vector-row nbcols row))) + (values nil 0)) + + ;; we might have to debug + (when pg-vector-row + (log-message :data "> ~s" (map 'string #'code-char pg-vector-row))) + + (values pg-vector-row bytes)))) + +(defun prepare-row (copy nbcols row) + "Prepare given ROW by applying the pre-processing and transformation + functions registered in the COPY context." + (let* ((preprocessed-row (if (preprocessor copy) + (funcall (preprocessor copy) row) + row))) + (cond ((eq :escaped (copy-format copy)) preprocessed-row) + ((null (transforms copy)) preprocessed-row) + (t + (apply-transforms copy nbcols preprocessed-row (transforms copy)))))) + (defun format-vector-row (nb-cols row) (declare (optimize (speed 3) @@ -27,9 +54,9 @@ (len (+ nb-cols (reduce #'+ lens))) (buf (make-array (the fixnum len) :element-type '(unsigned-byte 8)))) (loop :for col :across row - :for i :from 1 - :for position := 0 :then (+ position col-len 1) - :for col-len :across lens + :for i fixnum :from 1 + :for position fixnum := 0 :then (+ position col-len 1) + :for col-len fixnum :across lens :do (if (col-null-p col) (insert-copy-null buf position) (string-to-copy-utf-8-bytes col buf position)) diff --git a/src/pg-copy/copy-from-queue.lisp b/src/pg-copy/copy-from-queue.lisp index 499fc31..f586c13 100644 --- a/src/pg-copy/copy-from-queue.lisp +++ b/src/pg-copy/copy-from-queue.lisp @@ -3,76 +3,6 @@ ;;; (in-package :pgloader.copy) -(defun send-batch (table columns batch - &key - (db pomo:*database*) - on-error-stop) - "Copy current *writer-batch* into TABLE-NAME." 
- ;; We need to keep a copy of the rows we send through the COPY - ;; protocol to PostgreSQL to be able to process them again in case - ;; of a data error being signaled, that's the BATCH here. - (let ((table-name (format-table-name table)) - (pomo:*database* db)) - ;; We can't use with-pgsql-transaction here because of the specifics - ;; of error handling in case of cl-postgres:open-db-writer errors: the - ;; transaction is dead already when we get a signal, and the COMMIT or - ;; ABORT steps then trigger a protocol error on a #\Z message. - (handler-case - (progn - (pomo:execute "BEGIN") - (let* ((copier - (handler-case - (cl-postgres:open-db-writer db table-name columns) - (condition (c) - ;; failed to open the COPY protocol mode (e.g. missing - ;; columns on the target table), stop here, - ;; transaction is dead already (no ROLLBACK needed). - (log-message :fatal - "Can't init COPY to ~a~@[(~{~a~^, ~})~]: ~%~a" - (format-table-name table) - columns - c) - (update-stats :data table :errs 1) - (return-from send-batch 0))))) - (unwind-protect - (db-write-batch copier batch) - (cl-postgres:close-db-writer copier) - (pomo:execute "COMMIT")))) - - ;; If PostgreSQL signals a data error, process the batch by isolating - ;; erroneous data away and retrying the rest. 
- (postgresql-retryable (condition) - (pomo:execute "ROLLBACK") - - (log-message :error "PostgreSQL [~s] ~a" table-name condition) - (if on-error-stop - ;; re-signal the condition to upper level - (signal 'on-error-stop :on-condition condition) - - ;; normal behavior, on-error-stop being nil - ;; clean the current transaction before retrying new ones - (let ((errors - (retry-batch table columns batch condition))) - (log-message :debug "retry-batch found ~d errors" errors) - (update-stats :data table :rows (- errors))))) - - (postgresql-unavailable (condition) - - (log-message :error "[PostgreSQL ~s] ~a" table-name condition) - (log-message :error "Copy Batch reconnecting to PostgreSQL") - - ;; in order to avoid Socket error in "connect": ECONNREFUSED if we - ;; try just too soon, wait a little - (sleep 2) - - (cl-postgres:reopen-database db) - (send-batch table columns batch :db db :on-error-stop on-error-stop)) - - (condition (c) - ;; non retryable failures - (log-message :error "Non-retryable error ~a" c) - (pomo:execute "ROLLBACK"))))) - ;;; ;;; We receive raw input rows from an lparallel queue, push their content ;;; down to PostgreSQL, handling any data related errors in the way. @@ -91,82 +21,40 @@ PostgreSQL, and when that's done update stats." (let* ((nbcols (length (table-column-list (pgloader.sources::target copy)))) - (current-batch (make-batch)) (seconds 0)) - ;; add some COPY activity related bits to our COPY object. - (setf (transforms copy) - (let ((funs (transforms copy))) - (unless (every #'null funs) - funs)) + ;; we need to compute some information and have them at the right place + ;; FIXME: review the API here, that's an half-baked refactoring. + (prepare-copy-parameters copy) - ;; FIXME: we should change the API around preprocess-row, someday. 
- (preprocessor copy) - (pgloader.sources::preprocess-row copy) + (log-message :info "COPY ON ERROR ~:[RESUME NEXT~;STOP~]" on-error-stop) - ;; FIXME: we could change the API around data-is-preformatted-p, - ;; but that's a bigger change than duplicating the information in - ;; the object. - (copy-format copy) - (if (data-is-preformatted-p copy) :escaped :raw)) + (pgloader.pgsql:with-pgsql-connection (pgconn) + (with-schema (unqualified-table-name table) + (with-disabled-triggers (unqualified-table-name + :disable-triggers disable-triggers) + (log-message :info "pgsql:copy-rows-from-queue[~a]: ~a ~a" + (lp:kernel-worker-index) + (format-table-name table) + columns) - (flet ((send-current-batch (unqualified-table-name) - ;; we close over the whole lexical environment or almost... - (let ((batch-start-time (get-internal-real-time))) - (send-batch table - columns - current-batch - :on-error-stop on-error-stop) + (if on-error-stop + ;; + ;; When on-error-stop is true, we don't need to handle batch + ;; processing, we can stop as soon as there's a failure. 
+ ;; + (incf seconds + (stream-rows-to-copy table columns copy nbcols queue)) - (let ((batch-seconds (elapsed-time-since batch-start-time))) - (log-message :debug - "send-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]" - (lp:kernel-worker-index) - unqualified-table-name - (batch-count current-batch) - (pretty-print-bytes (batch-bytes current-batch)) - batch-seconds - (batch-oversized-p current-batch)) - (update-stats :data table - :rows (batch-count current-batch) - :bytes (batch-bytes current-batch)) - - ;; return batch-seconds - batch-seconds)))) - (declare (inline send-current-batch)) - - (pgloader.pgsql:with-pgsql-connection (pgconn) - (with-schema (unqualified-table-name table) - (with-disabled-triggers (unqualified-table-name - :disable-triggers disable-triggers) - (log-message :info "pgsql:copy-rows-from-queue[~a]: ~a ~a" - (lp:kernel-worker-index) - (format-table-name table) - columns) - - (loop - :for row := (lq:pop-queue queue) - :until (eq :end-of-data row) - :do - (progn - ;; if current-batch is full, send data to PostgreSQL - ;; and prepare a new batch - (when (batch-full-p current-batch) - (incf seconds (send-current-batch unqualified-table-name)) - (setf current-batch (make-batch)) - - ;; give a little help to our friend, now is a good time - ;; to garbage collect - #+sbcl (sb-ext:gc :full t)) - - ;; also add up the time it takes to format the rows - (let ((start-time (get-internal-real-time))) - (format-row-in-batch copy nbcols row current-batch) - (incf seconds (elapsed-time-since start-time))))) - - ;; the last batch might not be empty - (unless (= 0 (batch-count current-batch)) - (incf seconds (send-current-batch unqualified-table-name))))))) + ;; + ;; When on-error-stop is nil, we actually implement + ;; on-error-resume-next behavior, and for that we need to keep + ;; a batch of rows around in order to replay COPYing its + ;; content around, skipping rows that are rejected by + ;; PostgreSQL. 
+ ;; + (incf seconds + (batch-rows-to-copy table columns copy nbcols queue)))))) ;; each writer thread sends its own stop timestamp and the monitor keeps ;; only the latest entry @@ -177,29 +65,20 @@ seconds) (list :writer table seconds))) +(defun prepare-copy-parameters (copy) + "add some COPY activity related bits to our COPY object." + (setf (transforms copy) + (let ((funs (transforms copy))) + (unless (every #'null funs) + funs)) -(defun format-row-in-batch (copy nbcols row current-batch) - "Given a row from the queue, prepare it for the next batch." - (let* ((row (if (preprocessor copy) - (funcall (preprocessor copy) row) - row)) - (transformed-row (cond ((eq :escaped (copy-format copy)) row) - ((null (transforms copy)) row) - (t - (apply-transforms copy - nbcols - row - (transforms copy)))))) - (multiple-value-bind (pg-vector-row bytes) - (if transformed-row - (ecase (copy-format copy) - (:raw (format-vector-row nbcols transformed-row)) - (:escaped (format-escaped-vector-row nbcols transformed-row))) - (values nil 0)) + ;; FIXME: we should change the API around preprocess-row, someday. + (preprocessor copy) + (pgloader.sources::preprocess-row copy) - ;; we might have to debug - (when pg-vector-row - (log-message :data "> ~s" (map 'string #'code-char pg-vector-row)) + ;; FIXME: we could change the API around data-is-preformatted-p, + ;; but that's a bigger change than duplicating the information in + ;; the object. + (copy-format copy) + (if (data-is-preformatted-p copy) :escaped :raw))) - ;; now add copy-data to current-batch - (push-row current-batch pg-vector-row bytes))))) diff --git a/src/pg-copy/copy-rows-in-batch.lisp b/src/pg-copy/copy-rows-in-batch.lisp new file mode 100644 index 0000000..ee28f33 --- /dev/null +++ b/src/pg-copy/copy-rows-in-batch.lisp @@ -0,0 +1,144 @@ +;;; +;;; The PostgreSQL COPY TO implementation, with batches and retries. 
+;;; +(in-package :pgloader.copy) + +(defun batch-rows-to-copy (table columns copy nbcols queue) + "Add rows that we pop from QUEUE into a batch, that we then COPY over to + PostgreSQL as soon as the batch is full. This allows sophisticated error + handling and recovery, where we can retry rows that are not rejected by + PostgreSQL." + (let ((seconds 0) + (current-batch (make-batch))) + (loop + :for row := (lq:pop-queue queue) + :until (eq :end-of-data row) + :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch) + (add-row-to-current-batch table columns copy nbcols + current-batch row) + (setf current-batch maybe-new-batch) + (incf seconds seconds-in-this-batch))) + + ;; the last batch might not be empty + (unless (= 0 (batch-count current-batch)) + (incf seconds (send-batch table columns current-batch))) + + seconds)) + + +(defun add-row-to-current-batch (table columns copy nbcols batch row) + "Add another ROW we just received to CURRENT-BATCH, and prepare a new + batch if needed. The current-batch (possibly a new one) is returned." + (let ((seconds 0) + (current-batch batch)) + ;; if current-batch is full, send data to PostgreSQL + ;; and prepare a new batch + (when (batch-full-p current-batch) + (incf seconds (send-batch table columns current-batch)) + (setf current-batch (make-batch)) + + ;; give a little help to our friend, now is a good time + ;; to garbage collect + #+sbcl + (let ((garbage-collect-start (get-internal-real-time))) + (sb-ext:gc :full t) + (incf seconds (elapsed-time-since garbage-collect-start)))) + + ;; also add up the time it takes to format the rows + (let ((start-time (get-internal-real-time))) + (format-row-in-batch copy nbcols row current-batch) + (incf seconds (elapsed-time-since start-time))) + + (values current-batch seconds))) + + +(defun send-batch (table columns batch &key (db pomo:*database*)) + "Copy current *writer-batch* into TABLE-NAME." 
+ ;; We need to keep a copy of the rows we send through the COPY + ;; protocol to PostgreSQL to be able to process them again in case + ;; of a data error being signaled, that's the BATCH here. + (let ((batch-start-time (get-internal-real-time)) + (table-name (format-table-name table)) + (pomo:*database* db)) + ;; We can't use with-pgsql-transaction here because of the specifics + ;; of error handling in case of cl-postgres:open-db-writer errors: the + ;; transaction is dead already when we get a signal, and the COMMIT or + ;; ABORT steps then trigger a protocol error on a #\Z message. + (handler-case + (progn + (pomo:execute "BEGIN") + (let* ((copier + (handler-case + (cl-postgres:open-db-writer db table-name columns) + (condition (c) + ;; failed to open the COPY protocol mode (e.g. missing + ;; columns on the target table), stop here, + ;; transaction is dead already (no ROLLBACK needed). + (error (make-condition 'copy-init-error + :table table + :columns columns + :condition c)))))) + (unwind-protect + (db-write-batch copier batch) + (cl-postgres:close-db-writer copier) + (pomo:execute "COMMIT")))) + + ;; If PostgreSQL signals a data error, process the batch by isolating + ;; erroneous data away and retrying the rest. 
+ (postgresql-retryable (condition) + (pomo:execute "ROLLBACK") + + (log-message :error "PostgreSQL [~s] ~a" table-name condition) + ;; clean the current transaction before retrying new ones + (let ((errors + (retry-batch table columns batch condition))) + (log-message :debug "retry-batch found ~d errors" errors) + (update-stats :data table :rows (- errors)))) + + (postgresql-unavailable (condition) + + (log-message :error "[PostgreSQL ~s] ~a" table-name condition) + (log-message :error "Copy Batch reconnecting to PostgreSQL") + + ;; in order to avoid Socket error in "connect": ECONNREFUSED if we + ;; try just too soon, wait a little + (sleep 2) + + (cl-postgres:reopen-database db) + (send-batch table columns batch :db db)) + + (copy-init-error (condition) + ;; Couldn't init the COPY protocol, process the condition up the + ;; stack + (update-stats :data table :errs 1) + (error condition)) + + (condition (c) + ;; non retryable failures + (log-message :error "Non-retryable error ~a" c) + (pomo:execute "ROLLBACK"))) + + ;; now log about having send a batch, and update our stats with the + ;; time that took + (let ((seconds (elapsed-time-since batch-start-time))) + (log-message :debug + "send-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]" + (lp:kernel-worker-index) + (format-table-name table) + (batch-count batch) + (pretty-print-bytes (batch-bytes batch)) + seconds + (batch-oversized-p batch)) + (update-stats :data table + :rows (batch-count batch) + :bytes (batch-bytes batch)) + + ;; and return batch-seconds + seconds))) + + +(defun format-row-in-batch (copy nbcols row current-batch) + "Given a row from the queue, prepare it for the next batch." 
+ (multiple-value-bind (pg-vector-row bytes) + (prepare-and-format-row copy nbcols row) + (push-row current-batch pg-vector-row bytes))) diff --git a/src/pg-copy/copy-rows-in-stream.lisp b/src/pg-copy/copy-rows-in-stream.lisp new file mode 100644 index 0000000..f5f6a9b --- /dev/null +++ b/src/pg-copy/copy-rows-in-stream.lisp @@ -0,0 +1,73 @@ +;;; +;;; The PostgreSQL COPY TO implementation, with batches and retries. +;;; +(in-package :pgloader.copy) + +(defun stream-rows-to-copy (table columns copy nbcols queue + &optional (db pomo:*database*)) + "Directly stream rows that we pop from QUEUE into PostgreSQL database + connection DB." + (let ((rcount 0) + (bytes 0) + (seconds 0)) + (handler-case + (progn + (pomo:execute "BEGIN") + (let* ((table-name (format-table-name table)) + (copier + (handler-case + (cl-postgres:open-db-writer db table-name columns) + (condition (c) + ;; failed to open the COPY protocol mode (e.g. missing + ;; columns on the target table), stop here, + ;; transaction is dead already (no ROLLBACK needed). + (error (make-condition 'copy-init-error + :table table + :columns columns + :condition c)))))) + (unwind-protect + (loop + :for row := (lq:pop-queue queue) + :until (eq :end-of-data row) + :do (multiple-value-bind (row-bytes row-seconds) + (stream-row copier copy nbcols row) + (incf rcount) + (incf bytes row-bytes) + (incf seconds row-seconds))) + (cl-postgres:close-db-writer copier) + (pomo:execute "COMMIT")))) + + (postgresql-unavailable (condition) + ;; We got disconnected, maybe because PostgreSQL is being restarted, + ;; maybe for another reason, but in any case the transaction doesn't + ;; exists anymore, the connection doesn't exists anymore, there's no + ;; need to send anything, not even a ROLLBACK; in that case. + ;; + ;; Re-signal the condition as an error to be processed by the calling + ;; thread, where it's possible to also stop the reader. 
+ (error condition)) + + (copy-init-error (condition) + (update-stats :data table :errs 1) + (error condition)) + + (condition (c) + ;; stop at any failure here, this function doesn't implement any kind + ;; of retry behaviour. + (log-message :error "~a" c) + (pomo:execute "ROLLBACK"))) + + ;; return seconds spent sending data to PostgreSQL + (update-stats :data table :rows rcount :bytes bytes) + seconds)) + +(defun stream-row (stream copy nbcols row) + "Send a single ROW down in the PostgreSQL COPY STREAM." + (let* ((start (get-internal-real-time)) + (row (prepare-row copy nbcols row))) + (when row + (let ((bytes + (ecase (copy-format copy) + (:raw (db-write-vector-row stream row nbcols)) + (:escaped (db-write-escaped-vector-row stream row nbcols))))) + (values bytes (elapsed-time-since start)))))) diff --git a/src/pg-copy/copy-write-batch.lisp b/src/pg-copy/copy-write-batch.lisp deleted file mode 100644 index 8cdf4da..0000000 --- a/src/pg-copy/copy-write-batch.lisp +++ /dev/null @@ -1,35 +0,0 @@ -;;; -;;; The PostgreSQL COPY TO implementation, with batches and retries. -;;; -;;; Here, sending the data in the COPY stream opened in copy-batch. -;;; -(in-package :pgloader.copy) - -;;; -;;; Stream prepared data from *writer-batch* down to PostgreSQL using the -;;; COPY protocol, and retry the batch avoiding known bad rows (from parsing -;;; COPY error messages) in case some data related conditions are signaled. -;;; -(defun db-write-batch (copier batch) - (loop :for count :below (batch-count batch) - :for data :across (batch-data batch) - :do (when data - (db-write-row copier data)) - :finally (return (batch-count batch)))) - -(defun db-write-row (copier data) - "Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all - over again, as we reproduced the data formating in pgloader code. The - reason we do that is to be able to lower the cost of retrying batches: - the formating has then already been done." 
- (let* ((connection (cl-postgres::copier-database copier)) - (cl-postgres::socket (cl-postgres::connection-socket connection))) - (cl-postgres::with-reconnect-restart connection - (cl-postgres::using-connection connection - (cl-postgres::with-syncing - (cl-postgres::write-uint1 cl-postgres::socket 100) - (cl-postgres::write-uint4 cl-postgres::socket (+ 4 (length data))) - (loop :for byte :across data - :do (write-byte byte cl-postgres::socket)))))) - (incf (cl-postgres::copier-count copier))) - diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index 51a5470..c92ea3e 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -85,7 +85,11 @@ (incf task-count))) (lp:task-handler-bind - ((on-error-stop + ((pgloader.copy::copy-init-error + #'(lambda (condition) + ;; everything has been handled already + (lp:invoke-transfer-error condition))) + (on-error-stop #'(lambda (condition) ;; everything has been handled already (lp:invoke-transfer-error condition))) @@ -124,7 +128,7 @@ ;; each reader pretends to be alone, pass 1 as concurrency (submit-task channel #'queue-raw-data reader rawq 1) - (submit-task channel #'pgloader.pgsql::copy-rows-from-queue + (submit-task channel #'pgloader.copy::copy-rows-from-queue copy rawq :on-error-stop on-error-stop :disable-triggers disable-triggers)))