Review the pgloader COPY implementation further.

Refactor file organisation further to allow for adding a “direct stream”
option when the on-error-stop behavior has been selected. This happens
currently by default for database sources.

Introduce the new WITH option “on error resume next” which forces the
classic behavior of pgloader. The option “on error stop” already existed,
its implementation is new.

When this new behavior is activated, the data is sent to PostgreSQL
directly, without intermediate batches being built. It means that the whole
operation fails at the first error, and we don't have any information in
memory to try replaying any COPY of the data. It's gone.

This behavior should be fine for database migrations as you don't usually
want to fix the data manually in intermediate files, you want to fix the
problem at the source database and do the whole dance all-over again, up
until your casting rules are perfect.

This patch might also incur some performance benefits in terms of both
timing and memory usage, though the local testing didn't show much of
anything for the moment.
This commit is contained in:
Dimitri Fontaine 2018-01-24 21:48:19 +01:00
parent 8ee799070a
commit f86371970f
20 changed files with 483 additions and 211 deletions

View File

@ -253,6 +253,27 @@ containing the full PostgreSQL client side logs about the rejected data.
The `.dat` file is formatted in the PostgreSQL text COPY format as documented
in `http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609`.
It is possible to use the following WITH options to control pgloader batch
behavior:
- *on error stop*, *on error resume next*
This option controls whether pgloader is building batches of data at
all. The batch implementation allows pgloader to recover errors by
sending the data that PostgreSQL accepts again, and by keeping away the
data that PostgreSQL rejects.
To enable retrying the data and loading the good parts, use the option
*on error resume next*, which is the default to file based data loads
(such as CSV, IXF or DBF).
When migrating from another RDBMS technology, it's best to have a
reproducible loading process. In that case it's possible to use *on
error stop* and fix either the casting rules, the data transformation
functions or in some cases the input data until your migration runs through
completion. That's why *on error stop* is the default for SQLite,
MySQL and MS SQL source kinds.
A Note About Performance
------------------------
@ -472,7 +493,7 @@ See each specific command for details.
All data sources specific commands support the following options:
- *on error stop*
- *on error stop*, *on error resume next*
- *batch rows = R*
- *batch size = ... MB*
- *prefetch rows = ...*

View File

@ -193,9 +193,11 @@
:components
((:file "copy-batch")
(:file "copy-format")
(:file "copy-write-batch")
(:file "copy-from-queue")
(:file "copy-retry-batch")))
(:file "copy-db-write")
(:file "copy-rows-in-stream")
(:file "copy-rows-in-batch")
(:file "copy-retry-batch")
(:file "copy-from-queue")))
(:module "parsers"
:depends-on ("params"

View File

@ -518,6 +518,7 @@
#:database-error-context)
(:import-from #:cl-postgres-trivial-utf-8
#:utf-8-byte-length
#:as-utf-8-bytes
#:string-to-utf-8-bytes)
(:export #:copy-rows-from-queue
#:format-vector-row))

View File

@ -34,6 +34,7 @@
(:destructure (kw null) (declare (ignore kw)) (cons :null-as null)))
(defrule copy-option (or option-on-error-stop
option-on-error-resume-next
option-workers
option-concurrency
option-batch-rows
@ -135,8 +136,7 @@
:fields ',fields
:columns ',columns
,@(remove-batch-control-option
options :extras '(:on-error-stop
:worker-count
options :extras '(:worker-count
:concurrency
:truncate
:drop-indexes

View File

@ -114,6 +114,7 @@
(cons :escape-mode escape-mode))))
(defrule csv-option (or option-on-error-stop
option-on-error-resume-next
option-workers
option-concurrency
option-batch-rows
@ -447,8 +448,7 @@
:fields ',fields
:columns ',columns
,@(remove-batch-control-option
options :extras '(:on-error-stop
:worker-count
options :extras '(:worker-count
:concurrency
:truncate
:drop-indexes

View File

@ -19,6 +19,7 @@
(cons :table-name (text table-name)))))
(defrule dbf-option (or option-on-error-stop
option-on-error-resume-next
option-workers
option-concurrency
option-batch-rows
@ -98,6 +99,7 @@
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
,@(identifier-case-binding options)
(on-error-stop (getf ',options :on-error-stop))
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,dbf-db-conn))))
(source
@ -111,6 +113,7 @@
(pgloader.sources:copy-database source
,@(remove-batch-control-option options)
:on-error-stop on-error-stop
:create-indexes nil
:foreign-keys nil
:reset-sequences nil)

View File

@ -44,6 +44,7 @@
(bind (((_ field-defs _) source)) field-defs)))
(defrule fixed-option (or option-on-error-stop
option-on-error-resume-next
option-workers
option-concurrency
option-batch-rows

View File

@ -19,6 +19,7 @@
(bind (((_ tz) tzopt)) (cons :timezone tz))))
(defrule ixf-option (or option-on-error-stop
option-on-error-resume-next
option-workers
option-concurrency
option-batch-rows
@ -83,6 +84,7 @@
,@(batch-control-bindings options)
,@(identifier-case-binding options)
(timezone (getf ',options :timezone))
(on-error-stop(getf ',options :on-error-stop))
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,ixf-db-conn))))
(source
@ -98,6 +100,7 @@
,@(remove-batch-control-option
options
:extras '(:timezone))
:on-error-stop on-error-stop
:foreign-keys nil
:reset-sequences nil)

View File

@ -50,6 +50,8 @@
(def-keyword-rule "on")
(def-keyword-rule "error")
(def-keyword-rule "stop")
(def-keyword-rule "resume")
(def-keyword-rule "next")
(def-keyword-rule "parameters")
;; option for loading from a file
(def-keyword-rule "workers")

View File

@ -15,6 +15,7 @@
(make-option-rule create-schemas (and kw-create (? kw-no) kw-schemas))
(defrule mssql-option (or option-on-error-stop
option-on-error-resume-next
option-workers
option-concurrency
option-batch-rows
@ -150,6 +151,7 @@
(let* ((*default-cast-rules* ',*mssql-default-cast-rules*)
(*cast-rules* ',casts)
(*mssql-settings* ',mssql-gucs)
(on-error-stop (getf ',options :on-error-stop t))
,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
,@(identifier-case-binding options)
@ -166,6 +168,7 @@
:alter-schema ',alter-schema
:alter-table ',alter-table
:set-table-oids t
:on-error-stop on-error-stop
,@(remove-batch-control-option options))
,(sql-code-block pg-db-conn :post after "after load"))))

View File

@ -8,6 +8,7 @@
;;; MySQL options
;;;
(defrule mysql-option (or option-on-error-stop
option-on-error-resume-next
option-workers
option-concurrency
option-batch-rows
@ -154,6 +155,7 @@
(*cast-rules* ',casts)
(*decoding-as* ',decoding-as)
(*mysql-settings* ',mysql-gucs)
(on-error-stop (getf ',options :on-error-stop t))
,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
,@(identifier-case-binding options)
@ -171,6 +173,7 @@
:alter-table ',alter-table
:alter-schema ',alter-schema
:set-table-oids t
:on-error-stop on-error-stop
,@(remove-batch-control-option options))
,(sql-code-block pg-db-conn :post after "after load"))))

View File

@ -105,6 +105,7 @@
:batch-size
:prefetch-rows
:rows-per-range
:on-error-stop
:identifier-case))
extras)
"Given a list of options, remove the generic ones that should already have
@ -160,6 +161,9 @@
;; "on error stop" makes pgloader stop the whole load at the first error,
;; which enables the direct-stream code path (no batches kept in memory).
(defrule option-on-error-stop (and kw-on kw-error kw-stop)
  (:constant (cons :on-error-stop t)))

;; "on error resume next" forces the classic batched behavior, where batches
;; are replayed around rows rejected by PostgreSQL; it is the same option
;; key as above, negated.
(defrule option-on-error-resume-next (and kw-on kw-error kw-resume kw-next)
  (:constant (cons :on-error-stop nil)))
(defrule option-identifiers-case (and (or kw-snake_case
kw-downcase
kw-quote)

View File

@ -14,6 +14,7 @@ load database
set work_mem to '16MB', maintenance_work_mem to '512 MB';
|#
(defrule sqlite-option (or option-on-error-stop
option-on-error-resume-next
option-workers
option-concurrency
option-batch-rows
@ -99,6 +100,7 @@ load database
`(lambda ()
(let* ((*default-cast-rules* ',*sqlite-default-cast-rules*)
(*cast-rules* ',casts)
(on-error-stop (getf ',options :on-error-stop t))
,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
,@(identifier-case-binding options)
@ -117,6 +119,7 @@ load database
:set-table-oids t
:including ',incl
:excluding ',excl
:on-error-stop on-error-stop
,@(remove-batch-control-option options))
,(sql-code-block pg-db-conn :post after "after load"))))

View File

@ -0,0 +1,134 @@
;;;
;;; The PostgreSQL COPY TO implementation, with batches and retries.
;;;
;;; Here, sending the data in the COPY stream opened in copy-batch.
;;;
(in-package :pgloader.copy)
;; Signaled when cl-postgres:open-db-writer fails to initiate the COPY
;; protocol (e.g. missing columns on the target table). Carries the target
;; table, the column list, and the original condition so that callers up
;; the stack can report and account for the failure.
(define-condition copy-init-error (error)
  ((table     :initarg :table     :reader copy-init-error-table)
   (columns   :initarg :columns   :reader copy-init-error-columns)
   (condition :initarg :condition :reader copy-init-error-condition))
  (:report (lambda (err stream)
             (format stream
                     "Can't init COPY to ~a~@[(~{~a~^, ~})~]: ~%~a"
                     (format-table-name (copy-init-error-table err))
                     (copy-init-error-columns err)
                     (copy-init-error-condition err)))))
;;;
;;; Stream prepared data from *writer-batch* down to PostgreSQL using the
;;; COPY protocol, and retry the batch avoiding known bad rows (from parsing
;;; COPY error messages) in case some data related conditions are signaled.
;;;
(defun db-write-batch (copier batch)
  "Send every pre-formatted row stored in BATCH down the COPY protocol
   stream COPIER, skipping empty slots, and return the batch row count."
  (loop :with row-count := (batch-count batch)
        :for data :across (batch-data batch)
        :for position :below row-count
        :when data
          :do (db-write-row copier data)
        :finally (return row-count)))
(defun db-write-row (copier data)
  "Send DATA, a pre-formatted vector of utf-8 bytes for one row, down the
   COPY protocol stream COPIER.

   Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all
   over again, as we reproduced the data formating in pgloader code. The
   reason we do that is to be able to lower the cost of retrying batches:
   the formating has then already been done."
  (let* ((connection (cl-postgres::copier-database copier))
         (cl-postgres::socket (cl-postgres::connection-socket connection)))
    (cl-postgres::with-reconnect-restart connection
      (cl-postgres::using-connection connection
        (cl-postgres::with-syncing
          ;; 100 is the byte #\d, a CopyData message in the PostgreSQL
          ;; frontend/backend protocol; the length field is self-inclusive
          ;; (4 bytes) plus the payload length.
          (cl-postgres::write-uint1 cl-postgres::socket 100)
          (cl-postgres::write-uint4 cl-postgres::socket (+ 4 (length data)))
          (loop :for byte :across data
                :do (write-byte byte cl-postgres::socket))))))
  ;; keep the copier's row counter in sync with what was sent
  (incf (cl-postgres::copier-count copier)))
;;;
;;; Functions to stream a vector of rows as strings directly into the COPY
;;; socket stream, without using an intermediate buffer.
;;;
(defun db-write-vector-row (copier row &optional (nbcols (length row)))
  "Stream ROW, a vector of NBCOLS column strings (nil meaning SQL NULL),
   directly down the COPY protocol stream COPIER: each column's characters
   are turned into utf-8 bytes on the fly with as-copy-utf-8-bytes, so no
   intermediate row buffer is built and nothing is kept around for
   retrying. This supports the direct streaming (on error stop) code path.
   Returns the number of payload bytes sent for the row."
  (declare (optimize
            (speed 3)
            #-ecl(safety 0) #+ecl(safety 1)
            (space 0)
            (debug 1)
            (compilation-speed 0)))
  (let* (;; a NULL column is sent as the two bytes \N
         (col-bytes  (map 'vector
                          (lambda (col)
                            (if (col-null-p col) 2
                                (copy-utf-8-byte-length col)))
                          row))
         ;; one separator (Tab) or terminator (Newline) byte per column
         (row-bytes  (+ nbcols (reduce #'+ col-bytes)))
         (connection (cl-postgres::copier-database copier))
         (cl-postgres::socket (cl-postgres::connection-socket connection)))
    (cl-postgres::with-reconnect-restart connection
      (cl-postgres::using-connection connection
        (cl-postgres::with-syncing
          ;; CopyData message: byte #\d (100) then a self-inclusive length
          (cl-postgres::write-uint1 cl-postgres::socket 100)
          (cl-postgres::write-uint4 cl-postgres::socket (+ 4 row-bytes))
          (macrolet ((send-byte (byte)
                       `(write-byte ,byte cl-postgres::socket)))
            (loop :for col :across row
                  :for i fixnum :from 1
                  :do (if (col-null-p col)
                          (progn
                            (send-byte #. (char-code #\\))
                            (send-byte #. (char-code #\N)))
                          (loop :for char :across col
                                :do (as-copy-utf-8-bytes char send-byte)))
                  ;; Tab between columns, Newline after the last one
                  :do (if (< i nbcols)
                          (send-byte #. (char-code #\Tab))
                          (send-byte #. (char-code #\Newline))))))))
    (incf (cl-postgres::copier-count copier))
    row-bytes))
(defun db-write-escaped-vector-row (copier row &optional (nbcols (length row)))
  "Stream ROW directly down the COPY protocol stream COPIER, like
   db-write-vector-row, but for data that is already in COPY text format
   (copy-format :escaped): plain utf-8 conversion with as-utf-8-bytes is
   enough, no COPY escaping is applied. Returns the number of payload
   bytes sent for the row."
  (declare (optimize
            (speed 3)
            #-ecl(safety 0) #+ecl(safety 1)
            (space 0)
            (debug 1)
            (compilation-speed 0)))
  (let* (;; a NULL column is sent as the two bytes \N
         (col-bytes  (map 'vector
                          (lambda (col)
                            (if (col-null-p col) 2
                                (utf-8-byte-length col)))
                          row))
         ;; one separator (Tab) or terminator (Newline) byte per column
         (row-bytes  (+ nbcols (reduce #'+ col-bytes)))
         (connection (cl-postgres::copier-database copier))
         (cl-postgres::socket (cl-postgres::connection-socket connection)))
    (cl-postgres::with-reconnect-restart connection
      (cl-postgres::using-connection connection
        (cl-postgres::with-syncing
          ;; CopyData message: byte #\d (100) then a self-inclusive length
          (cl-postgres::write-uint1 cl-postgres::socket 100)
          (cl-postgres::write-uint4 cl-postgres::socket (+ 4 row-bytes))
          (macrolet ((send-byte (byte)
                       `(write-byte ,byte cl-postgres::socket)))
            (loop :for col :across row
                  :for i fixnum :from 1
                  :do (if (col-null-p col)
                          (progn
                            (send-byte #. (char-code #\\))
                            (send-byte #. (char-code #\N)))
                          (loop :for char :across col
                                :do (as-utf-8-bytes char send-byte)))
                  ;; Tab between columns, Newline after the last one
                  :do (if (< i nbcols)
                          (send-byte #. (char-code #\Tab))
                          (send-byte #. (char-code #\Newline))))))))
    (incf (cl-postgres::copier-count copier))
    row-bytes))

View File

@ -13,6 +13,33 @@
;;; call here.
;;;
(defun prepare-and-format-row (copy nbcols row)
  "Prepare given ROW then format it in PostgreSQL COPY text format.

   Returns (values pg-vector-row bytes): a vector of utf-8 bytes ready to
   be sent over the COPY protocol and its length, or (values nil 0) when
   the transformation functions filtered the row out."
  (let ((row (prepare-row copy nbcols row)))
    (multiple-value-bind (pg-vector-row bytes)
        (if row
            (ecase (copy-format copy)
              (:raw     (format-vector-row nbcols row))
              (:escaped (format-escaped-vector-row nbcols row)))
            (values nil 0))
      ;; we might have to debug
      (when pg-vector-row
        (log-message :data "> ~s" (map 'string #'code-char pg-vector-row)))
      ;; BUGFIX: always return both values. The previous version returned
      ;; them only when pg-vector-row was non-nil, so a skipped row made
      ;; callers bind BYTES to nil instead of 0.
      (values pg-vector-row bytes))))
(defun prepare-row (copy nbcols row)
  "Run ROW through the pre-processing and transformation functions
   registered in the COPY context, returning the resulting row."
  (let ((preprocessed (if (preprocessor copy)
                          (funcall (preprocessor copy) row)
                          row)))
    ;; :escaped data is already in its target form, and with no transforms
    ;; registered there is nothing more to apply
    (if (or (eq :escaped (copy-format copy))
            (null (transforms copy)))
        preprocessed
        (apply-transforms copy nbcols preprocessed (transforms copy)))))
(defun format-vector-row (nb-cols row)
(declare (optimize
(speed 3)
@ -27,9 +54,9 @@
(len (+ nb-cols (reduce #'+ lens)))
(buf (make-array (the fixnum len) :element-type '(unsigned-byte 8))))
(loop :for col :across row
:for i :from 1
:for position := 0 :then (+ position col-len 1)
:for col-len :across lens
:for i fixnum :from 1
:for position fixnum := 0 :then (+ position col-len 1)
:for col-len fixnum :across lens
:do (if (col-null-p col)
(insert-copy-null buf position)
(string-to-copy-utf-8-bytes col buf position))

View File

@ -3,76 +3,6 @@
;;;
(in-package :pgloader.copy)
(defun send-batch (table columns batch
&key
(db pomo:*database*)
on-error-stop)
"Copy current *writer-batch* into TABLE-NAME."
;; We need to keep a copy of the rows we send through the COPY
;; protocol to PostgreSQL to be able to process them again in case
;; of a data error being signaled, that's the BATCH here.
(let ((table-name (format-table-name table))
(pomo:*database* db))
;; We can't use with-pgsql-transaction here because of the specifics
;; of error handling in case of cl-postgres:open-db-writer errors: the
;; transaction is dead already when we get a signal, and the COMMIT or
;; ABORT steps then trigger a protocol error on a #\Z message.
(handler-case
(progn
(pomo:execute "BEGIN")
(let* ((copier
(handler-case
(cl-postgres:open-db-writer db table-name columns)
(condition (c)
;; failed to open the COPY protocol mode (e.g. missing
;; columns on the target table), stop here,
;; transaction is dead already (no ROLLBACK needed).
(log-message :fatal
"Can't init COPY to ~a~@[(~{~a~^, ~})~]: ~%~a"
(format-table-name table)
columns
c)
(update-stats :data table :errs 1)
(return-from send-batch 0)))))
(unwind-protect
(db-write-batch copier batch)
(cl-postgres:close-db-writer copier)
(pomo:execute "COMMIT"))))
;; If PostgreSQL signals a data error, process the batch by isolating
;; erroneous data away and retrying the rest.
(postgresql-retryable (condition)
(pomo:execute "ROLLBACK")
(log-message :error "PostgreSQL [~s] ~a" table-name condition)
(if on-error-stop
;; re-signal the condition to upper level
(signal 'on-error-stop :on-condition condition)
;; normal behavior, on-error-stop being nil
;; clean the current transaction before retrying new ones
(let ((errors
(retry-batch table columns batch condition)))
(log-message :debug "retry-batch found ~d errors" errors)
(update-stats :data table :rows (- errors)))))
(postgresql-unavailable (condition)
(log-message :error "[PostgreSQL ~s] ~a" table-name condition)
(log-message :error "Copy Batch reconnecting to PostgreSQL")
;; in order to avoid Socket error in "connect": ECONNREFUSED if we
;; try just too soon, wait a little
(sleep 2)
(cl-postgres:reopen-database db)
(send-batch table columns batch :db db :on-error-stop on-error-stop))
(condition (c)
;; non retryable failures
(log-message :error "Non-retryable error ~a" c)
(pomo:execute "ROLLBACK")))))
;;;
;;; We receive raw input rows from an lparallel queue, push their content
;;; down to PostgreSQL, handling any data related errors in the way.
@ -91,82 +21,40 @@
PostgreSQL, and when that's done update stats."
(let* ((nbcols (length
(table-column-list (pgloader.sources::target copy))))
(current-batch (make-batch))
(seconds 0))
;; add some COPY activity related bits to our COPY object.
(setf (transforms copy)
(let ((funs (transforms copy)))
(unless (every #'null funs)
funs))
;; we need to compute some information and have them at the right place
;; FIXME: review the API here, that's an half-baked refactoring.
(prepare-copy-parameters copy)
;; FIXME: we should change the API around preprocess-row, someday.
(preprocessor copy)
(pgloader.sources::preprocess-row copy)
(log-message :info "COPY ON ERROR ~:[RESUME NEXT~;STOP~]" on-error-stop)
;; FIXME: we could change the API around data-is-preformatted-p,
;; but that's a bigger change than duplicating the information in
;; the object.
(copy-format copy)
(if (data-is-preformatted-p copy) :escaped :raw))
(pgloader.pgsql:with-pgsql-connection (pgconn)
(with-schema (unqualified-table-name table)
(with-disabled-triggers (unqualified-table-name
:disable-triggers disable-triggers)
(log-message :info "pgsql:copy-rows-from-queue[~a]: ~a ~a"
(lp:kernel-worker-index)
(format-table-name table)
columns)
(flet ((send-current-batch (unqualified-table-name)
;; we close over the whole lexical environment or almost...
(let ((batch-start-time (get-internal-real-time)))
(send-batch table
columns
current-batch
:on-error-stop on-error-stop)
(if on-error-stop
;;
;; When on-error-stop is true, we don't need to handle batch
;; processing, we can stop as soon as there's a failure.
;;
(incf seconds
(stream-rows-to-copy table columns copy nbcols queue))
(let ((batch-seconds (elapsed-time-since batch-start-time)))
(log-message :debug
"send-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]"
(lp:kernel-worker-index)
unqualified-table-name
(batch-count current-batch)
(pretty-print-bytes (batch-bytes current-batch))
batch-seconds
(batch-oversized-p current-batch))
(update-stats :data table
:rows (batch-count current-batch)
:bytes (batch-bytes current-batch))
;; return batch-seconds
batch-seconds))))
(declare (inline send-current-batch))
(pgloader.pgsql:with-pgsql-connection (pgconn)
(with-schema (unqualified-table-name table)
(with-disabled-triggers (unqualified-table-name
:disable-triggers disable-triggers)
(log-message :info "pgsql:copy-rows-from-queue[~a]: ~a ~a"
(lp:kernel-worker-index)
(format-table-name table)
columns)
(loop
:for row := (lq:pop-queue queue)
:until (eq :end-of-data row)
:do
(progn
;; if current-batch is full, send data to PostgreSQL
;; and prepare a new batch
(when (batch-full-p current-batch)
(incf seconds (send-current-batch unqualified-table-name))
(setf current-batch (make-batch))
;; give a little help to our friend, now is a good time
;; to garbage collect
#+sbcl (sb-ext:gc :full t))
;; also add up the time it takes to format the rows
(let ((start-time (get-internal-real-time)))
(format-row-in-batch copy nbcols row current-batch)
(incf seconds (elapsed-time-since start-time)))))
;; the last batch might not be empty
(unless (= 0 (batch-count current-batch))
(incf seconds (send-current-batch unqualified-table-name)))))))
;;
;; When on-error-stop is nil, we actually implement
;; on-error-resume-next behavior, and for that we need to keep
;; a batch of rows around in order to replay COPYing its
;; content around, skipping rows that are rejected by
;; PostgreSQL.
;;
(incf seconds
(batch-rows-to-copy table columns copy nbcols queue))))))
;; each writer thread sends its own stop timestamp and the monitor keeps
;; only the latest entry
@ -177,29 +65,20 @@
seconds)
(list :writer table seconds)))
(defun prepare-copy-parameters (copy)
"add some COPY activity related bits to our COPY object."
(setf (transforms copy)
(let ((funs (transforms copy)))
(unless (every #'null funs)
funs))
(defun format-row-in-batch (copy nbcols row current-batch)
"Given a row from the queue, prepare it for the next batch."
(let* ((row (if (preprocessor copy)
(funcall (preprocessor copy) row)
row))
(transformed-row (cond ((eq :escaped (copy-format copy)) row)
((null (transforms copy)) row)
(t
(apply-transforms copy
nbcols
row
(transforms copy))))))
(multiple-value-bind (pg-vector-row bytes)
(if transformed-row
(ecase (copy-format copy)
(:raw (format-vector-row nbcols transformed-row))
(:escaped (format-escaped-vector-row nbcols transformed-row)))
(values nil 0))
;; FIXME: we should change the API around preprocess-row, someday.
(preprocessor copy)
(pgloader.sources::preprocess-row copy)
;; we might have to debug
(when pg-vector-row
(log-message :data "> ~s" (map 'string #'code-char pg-vector-row))
;; FIXME: we could change the API around data-is-preformatted-p,
;; but that's a bigger change than duplicating the information in
;; the object.
(copy-format copy)
(if (data-is-preformatted-p copy) :escaped :raw)))
;; now add copy-data to current-batch
(push-row current-batch pg-vector-row bytes)))))

View File

@ -0,0 +1,144 @@
;;;
;;; The PostgreSQL COPY TO implementation, with batches and retries.
;;;
(in-package :pgloader.copy)
(defun batch-rows-to-copy (table columns copy nbcols queue)
  "Pop rows from QUEUE, group them into batches, and COPY each batch over
   to PostgreSQL as soon as it is full. Keeping a batch in memory enables
   sophisticated error handling and recovery: rows that PostgreSQL does
   not reject can be retried. Returns the seconds spent formatting and
   sending data."
  (let ((total-seconds 0)
        (batch         (make-batch)))
    (do ((row (lq:pop-queue queue) (lq:pop-queue queue)))
        ((eq :end-of-data row))
      (multiple-value-bind (next-batch elapsed)
          (add-row-to-current-batch table columns copy nbcols batch row)
        (setf batch next-batch)
        (incf total-seconds elapsed)))
    ;; flush the trailing, partially filled batch, if any
    (unless (zerop (batch-count batch))
      (incf total-seconds (send-batch table columns batch)))
    total-seconds))
(defun add-row-to-current-batch (table columns copy nbcols batch row)
  "Append ROW to BATCH, first flushing BATCH to PostgreSQL and starting a
   fresh one when it is already full. Returns (values batch seconds): the
   batch to keep using (possibly a new one) and the time spent sending
   and formatting."
  (let ((elapsed 0))
    ;; a full batch is sent before the new row is accepted
    (when (batch-full-p batch)
      (incf elapsed (send-batch table columns batch))
      (setf batch (make-batch))
      ;; give a little help to our friend, a batch boundary is a good
      ;; time to garbage collect
      #+sbcl
      (let ((gc-start (get-internal-real-time)))
        (sb-ext:gc :full t)
        (incf elapsed (elapsed-time-since gc-start))))
    ;; also account for the time it takes to format the row
    (let ((format-start (get-internal-real-time)))
      (format-row-in-batch copy nbcols row batch)
      (incf elapsed (elapsed-time-since format-start))
      (values batch elapsed))))
(defun send-batch (table columns batch &key (db pomo:*database*))
  "COPY the rows of BATCH into TABLE (COLUMNS) over connection DB, in its
   own transaction. On a retryable data error the batch is replayed while
   isolating the rows PostgreSQL rejects. Returns the number of seconds
   spent sending the batch."
  ;; We need to keep a copy of the rows we send through the COPY
  ;; protocol to PostgreSQL to be able to process them again in case
  ;; of a data error being signaled, that's the BATCH here.
  (let ((batch-start-time (get-internal-real-time))
        (table-name       (format-table-name table))
        (pomo:*database*  db))
    ;; We can't use with-pgsql-transaction here because of the specifics
    ;; of error handling in case of cl-postgres:open-db-writer errors: the
    ;; transaction is dead already when we get a signal, and the COMMIT or
    ;; ABORT steps then trigger a protocol error on a #\Z message.
    (handler-case
        (progn
          (pomo:execute "BEGIN")
          (let* ((copier
                  (handler-case
                      (cl-postgres:open-db-writer db table-name columns)
                    (condition (c)
                      ;; failed to open the COPY protocol mode (e.g. missing
                      ;; columns on the target table), stop here,
                      ;; transaction is dead already (no ROLLBACK needed).
                      (error (make-condition 'copy-init-error
                                             :table table
                                             :columns columns
                                             :condition c))))))
            (unwind-protect
                 (db-write-batch copier batch)
              (cl-postgres:close-db-writer copier)
              (pomo:execute "COMMIT"))))
      ;; If PostgreSQL signals a data error, process the batch by isolating
      ;; erroneous data away and retrying the rest.
      (postgresql-retryable (condition)
        (pomo:execute "ROLLBACK")
        (log-message :error "PostgreSQL [~s] ~a" table-name condition)
        ;; clean the current transaction before retrying new ones
        (let ((errors
               (retry-batch table columns batch condition)))
          (log-message :debug "retry-batch found ~d errors" errors)
          (update-stats :data table :rows (- errors))))
      (postgresql-unavailable (condition)
        (log-message :error "[PostgreSQL ~s] ~a" table-name condition)
        (log-message :error "Copy Batch reconnecting to PostgreSQL")
        ;; in order to avoid Socket error in "connect": ECONNREFUSED if we
        ;; try just too soon, wait a little
        (sleep 2)
        (cl-postgres:reopen-database db)
        (send-batch table columns batch :db db))
      (copy-init-error (condition)
        ;; Couldn't init the COPY protocol, process the condition up the
        ;; stack
        (update-stats :data table :errs 1)
        (error condition))
      (condition (c)
        ;; non retryable failures
        (log-message :error "Non-retryable error ~a" c)
        (pomo:execute "ROLLBACK")))
    ;; now log about having sent a batch, and update our stats with the
    ;; time that took
    (let ((seconds (elapsed-time-since batch-start-time)))
      (log-message :debug
                   "send-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]"
                   (lp:kernel-worker-index)
                   (format-table-name table)
                   (batch-count batch)
                   (pretty-print-bytes (batch-bytes batch))
                   seconds
                   (batch-oversized-p batch))
      (update-stats :data table
                    :rows (batch-count batch)
                    :bytes (batch-bytes batch))
      ;; and return batch-seconds
      seconds)))
(defun format-row-in-batch (copy nbcols row current-batch)
  "Prepare ROW for COPY and append the formatted result to CURRENT-BATCH."
  (multiple-value-bind (formatted-row byte-count)
      (prepare-and-format-row copy nbcols row)
    (push-row current-batch formatted-row byte-count)))

View File

@ -0,0 +1,73 @@
;;;
;;; The PostgreSQL COPY implementation, direct streaming: rows are sent
;;; down the COPY protocol as they are popped from the queue, no batches
;;; are built and no retry is possible (the "on error stop" code path).
;;;
(in-package :pgloader.copy)
(defun stream-rows-to-copy (table columns copy nbcols queue
                            &optional (db pomo:*database*))
  "Directly stream rows that we pop from QUEUE into PostgreSQL database
   connection DB, without building intermediate batches. The first error
   stops the whole operation: nothing is kept in memory to replay.
   Returns the seconds spent sending data to PostgreSQL."
  (let ((rcount  0)
        (bytes   0)
        (seconds 0))
    (handler-case
        (progn
          (pomo:execute "BEGIN")
          (let* ((table-name (format-table-name table))
                 (copier
                  (handler-case
                      (cl-postgres:open-db-writer db table-name columns)
                    (condition (c)
                      ;; failed to open the COPY protocol mode (e.g. missing
                      ;; columns on the target table), stop here,
                      ;; transaction is dead already (no ROLLBACK needed).
                      (error (make-condition 'copy-init-error
                                             :table table
                                             :columns columns
                                             :condition c))))))
            (unwind-protect
                 (loop
                    :for row := (lq:pop-queue queue)
                    :until (eq :end-of-data row)
                    :do (multiple-value-bind (row-bytes row-seconds)
                            (stream-row copier copy nbcols row)
                          (incf rcount)
                          ;; BUGFIX: stream-row may return no values when
                          ;; the transforms filter the row out; guard the
                          ;; counters against nil.
                          (incf bytes (or row-bytes 0))
                          (incf seconds (or row-seconds 0))))
              (cl-postgres:close-db-writer copier)
              (pomo:execute "COMMIT"))))
      (postgresql-unavailable (condition)
        ;; We got disconnected, maybe because PostgreSQL is being restarted,
        ;; maybe for another reason, but in any case the transaction doesn't
        ;; exists anymore, the connection doesn't exists anymore, there's no
        ;; need to send anything, not even a ROLLBACK; in that case.
        ;;
        ;; Re-signal the condition as an error to be processed by the calling
        ;; thread, where it's possible to also stop the reader.
        (error condition))
      (copy-init-error (condition)
        ;; account for the failure, then process the condition up the stack
        (update-stats :data table :errs 1)
        (error condition))
      (condition (c)
        ;; stop at any failure here, this function doesn't implement any kind
        ;; of retry behaviour.
        (log-message :error "~a" c)
        (pomo:execute "ROLLBACK")))
    ;; return seconds spent sending data to PostgreSQL
    (update-stats :data table :rows rcount :bytes bytes)
    seconds))
(defun stream-row (stream copy nbcols row)
  "Send a single ROW down in the PostgreSQL COPY STREAM.

   Returns (values bytes seconds): the payload bytes sent for the row (0
   when the transforms filtered the row out) and the time spent."
  (let* ((start (get-internal-real-time))
         (row   (prepare-row copy nbcols row)))
    (if row
        (let ((bytes
               (ecase (copy-format copy)
                 (:raw     (db-write-vector-row stream row nbcols))
                 (:escaped (db-write-escaped-vector-row stream row nbcols)))))
          (values bytes (elapsed-time-since start)))
        ;; BUGFIX: the row was filtered out; previously this returned nil
        ;; rather than values, breaking the caller's incf on the counters.
        (values 0 (elapsed-time-since start)))))

View File

@ -1,35 +0,0 @@
;;;
;;; The PostgreSQL COPY TO implementation, with batches and retries.
;;;
;;; Here, sending the data in the COPY stream opened in copy-batch.
;;;
(in-package :pgloader.copy)
;;;
;;; Stream prepared data from *writer-batch* down to PostgreSQL using the
;;; COPY protocol, and retry the batch avoiding known bad rows (from parsing
;;; COPY error messages) in case some data related conditions are signaled.
;;;
(defun db-write-batch (copier batch)
(loop :for count :below (batch-count batch)
:for data :across (batch-data batch)
:do (when data
(db-write-row copier data))
:finally (return (batch-count batch))))
(defun db-write-row (copier data)
"Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all
over again, as we reproduced the data formating in pgloader code. The
reason we do that is to be able to lower the cost of retrying batches:
the formating has then already been done."
(let* ((connection (cl-postgres::copier-database copier))
(cl-postgres::socket (cl-postgres::connection-socket connection)))
(cl-postgres::with-reconnect-restart connection
(cl-postgres::using-connection connection
(cl-postgres::with-syncing
(cl-postgres::write-uint1 cl-postgres::socket 100)
(cl-postgres::write-uint4 cl-postgres::socket (+ 4 (length data)))
(loop :for byte :across data
:do (write-byte byte cl-postgres::socket))))))
(incf (cl-postgres::copier-count copier)))

View File

@ -85,7 +85,11 @@
(incf task-count)))
(lp:task-handler-bind
((on-error-stop
((pgloader.copy::copy-init-error
#'(lambda (condition)
;; everything has been handled already
(lp:invoke-transfer-error condition)))
(on-error-stop
#'(lambda (condition)
;; everything has been handled already
(lp:invoke-transfer-error condition)))
@ -124,7 +128,7 @@
;; each reader pretends to be alone, pass 1 as concurrency
(submit-task channel #'queue-raw-data reader rawq 1)
(submit-task channel #'pgloader.pgsql::copy-rows-from-queue
(submit-task channel #'pgloader.copy::copy-rows-from-queue
copy rawq
:on-error-stop on-error-stop
:disable-triggers disable-triggers)))