mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-08 07:16:58 +02:00
Take benefits of PostgreSQL COPY error CONTEXT.
This message has the line number where the erroneous data was found on the server, and given the pre-processing we already done at that point, it's easy to convert that number into an index into the current batch, an array. To do do, we need Postmodern to expose the CONTEXT error message and we need to parse it. The following pull request cares about the Postmodern side of things: https://github.com/marijnh/Postmodern/pull/46 The parsing is done as simply as possible, only assuming that the error message is using comma separators and having the line number in second position. The parsing as done here should still work with localized message strings. CONTEXT: COPY errors, line 3, column b: "2006-13-11" This change should significantly reduce the cost of error processing.
This commit is contained in:
parent
36bfd923fa
commit
ef358c0b7d
12
Makefile
12
Makefile
@ -14,12 +14,16 @@ all: $(PGLOADER)
|
|||||||
docs:
|
docs:
|
||||||
pandoc pgloader.1.md -o pgloader.1
|
pandoc pgloader.1.md -o pgloader.1
|
||||||
|
|
||||||
|
~/quicklisp/local-projects/Postmodern:
|
||||||
|
git clone -b protocol-error-fields https://github.com/dimitri/Postmodern.git $@
|
||||||
|
|
||||||
~/quicklisp/local-projects/qmynd:
|
~/quicklisp/local-projects/qmynd:
|
||||||
git clone https://github.com/qitab/qmynd.git $@
|
git clone https://github.com/qitab/qmynd.git $@
|
||||||
|
|
||||||
~/quicklisp/local-projects/cl-csv:
|
~/quicklisp/local-projects/cl-csv:
|
||||||
git clone -b empty-strings-and-nil https://github.com/dimitri/cl-csv.git $@
|
git clone -b empty-strings-and-nil https://github.com/dimitri/cl-csv.git $@
|
||||||
|
|
||||||
|
postmodern: ~/quicklisp/local-projects/Postmodern ;
|
||||||
qmynd: ~/quicklisp/local-projects/qmynd ;
|
qmynd: ~/quicklisp/local-projects/qmynd ;
|
||||||
cl-csv: ~/quicklisp/local-projects/cl-csv ;
|
cl-csv: ~/quicklisp/local-projects/cl-csv ;
|
||||||
|
|
||||||
@ -37,7 +41,7 @@ $(ASDF_CONF):
|
|||||||
|
|
||||||
asdf-config: $(ASDF_CONF) ;
|
asdf-config: $(ASDF_CONF) ;
|
||||||
|
|
||||||
$(LIBS): quicklisp $(ASDF_CONF) cl-csv qmynd
|
$(LIBS): quicklisp $(ASDF_CONF) cl-csv qmynd postmodern
|
||||||
sbcl --load ~/quicklisp/setup.lisp \
|
sbcl --load ~/quicklisp/setup.lisp \
|
||||||
--eval '(ql:quickload "pgloader")' \
|
--eval '(ql:quickload "pgloader")' \
|
||||||
--eval '(quit)'
|
--eval '(quit)'
|
||||||
@ -50,13 +54,17 @@ $(MANIFEST): libs
|
|||||||
--eval '(ql:write-asdf-manifest-file "./build/manifest.ql")' \
|
--eval '(ql:write-asdf-manifest-file "./build/manifest.ql")' \
|
||||||
--eval '(quit)'
|
--eval '(quit)'
|
||||||
|
|
||||||
|
manifest: $(MANIFEST) ;
|
||||||
|
|
||||||
$(BUILDAPP): quicklisp
|
$(BUILDAPP): quicklisp
|
||||||
sbcl --load ~/quicklisp/setup.lisp \
|
sbcl --load ~/quicklisp/setup.lisp \
|
||||||
--eval '(ql:quickload "buildapp")' \
|
--eval '(ql:quickload "buildapp")' \
|
||||||
--eval '(buildapp:build-buildapp "./build/buildapp")' \
|
--eval '(buildapp:build-buildapp "./build/buildapp")' \
|
||||||
--eval '(quit)'
|
--eval '(quit)'
|
||||||
|
|
||||||
$(PGLOADER): $(MANIFEST) $(BUILDAPP)
|
buildapp: $(BUILDAPP) ;
|
||||||
|
|
||||||
|
$(PGLOADER): manifest buildapp
|
||||||
./build/buildapp --logfile /tmp/build.log \
|
./build/buildapp --logfile /tmp/build.log \
|
||||||
--asdf-tree ~/quicklisp/local-projects \
|
--asdf-tree ~/quicklisp/local-projects \
|
||||||
--manifest-file ./build/manifest.ql \
|
--manifest-file ./build/manifest.ql \
|
||||||
|
@ -80,24 +80,16 @@ order to be able to handle errors should some happen.
|
|||||||
|
|
||||||
When PostgreSQL rejects the whole batch, pgloader logs the error message
|
When PostgreSQL rejects the whole batch, pgloader logs the error message
|
||||||
then isolates the bad row(s) from the accepted ones by retrying the batched
|
then isolates the bad row(s) from the accepted ones by retrying the batched
|
||||||
rows in smaller batches. The generic way to do that is using *dichotomy*
|
rows in smaller batches. To do that, pgloader parses the *CONTEXT* error
|
||||||
where the rows are split in two batches as evenly as possible. In the case
|
message from the failed COPY, as the message contains the line number where
|
||||||
of pgloader, as we expect bad data to be a rare event and want to optimize
|
the error was found in the batch, as in the following example:
|
||||||
finding it as quickly as possible, the rows are split in 5 batches.
|
|
||||||
|
|
||||||
Each batch of rows is sent again to PostgreSQL until we have an error
|
CONTEXT: COPY errors, line 3, column b: "2006-13-11"
|
||||||
message corresponding to a batch of single row, then we process the row as
|
|
||||||
rejected and continue loading the remaining of the batch.
|
|
||||||
|
|
||||||
So the batch sizes are going to be as following:
|
Using that information, pgloader will reload all rows in the batch before
|
||||||
|
the erroneous one, log the erroneous one as rejected, then try loading the
|
||||||
- 1 batch of 25000 rows, which fails to load
|
remaining of the batch in a single attempt, which may or may not contain
|
||||||
- 5 batches of 5000 rows, one of which fails to load
|
other erroneous data.
|
||||||
- 5 batches of 1000 rows, one of which fails to load
|
|
||||||
- 5 batches of 200 rows, one of which fails to load
|
|
||||||
- 5 batches of 40 rows, one of which fails to load
|
|
||||||
- 5 batchs of 8 rows, one of which fails to load
|
|
||||||
- 8 batches of 1 row, one of which fails to load
|
|
||||||
|
|
||||||
At the end of a load containing rejected rows, you will find two files in
|
At the end of a load containing rejected rows, you will find two files in
|
||||||
the *root-dir* location, under a directory named the same as the target
|
the *root-dir* location, under a directory named the same as the target
|
||||||
|
@ -145,14 +145,9 @@ details about the format, and format specs."
|
|||||||
;; in case of data-exception, split the batch and try again
|
;; in case of data-exception, split the batch and try again
|
||||||
((or
|
((or
|
||||||
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
||||||
CL-POSTGRES-ERROR:DATA-EXCEPTION) (e)
|
CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition)
|
||||||
(declare (ignore e)) ; already logged
|
(retry-batch dbname table-name columns
|
||||||
(retry-batch dbname
|
*batch* *batch-rows* condition))))
|
||||||
table-name
|
|
||||||
*batch*
|
|
||||||
*batch-rows*
|
|
||||||
:columns columns
|
|
||||||
:transforms transforms))))
|
|
||||||
|
|
||||||
;; fetch how many rows we just pushed through, update stats
|
;; fetch how many rows we just pushed through, update stats
|
||||||
for rows = (if (consp retval) (cdr retval) retval)
|
for rows = (if (consp retval) (cdr retval) retval)
|
||||||
@ -210,60 +205,94 @@ details about the format, and format specs."
|
|||||||
;;; Compute the next batch size in rows, must be smaller than the previous
|
;;; Compute the next batch size in rows, must be smaller than the previous
|
||||||
;;; one or just one row to ensure the retry-batch recursion is not infinite.
|
;;; one or just one row to ensure the retry-batch recursion is not infinite.
|
||||||
;;;
|
;;;
|
||||||
(defun smaller-batch-rows (batch-rows processed-rows)
|
(defun next-batch-rows (batch-rows current-batch-pos next-error)
|
||||||
"How many rows should we process in next iteration?"
|
"How many rows should we process in next iteration?"
|
||||||
(let ((remaining-rows (- batch-rows processed-rows)))
|
(cond
|
||||||
|
((< current-batch-pos next-error)
|
||||||
|
;; We Can safely push a batch with all the rows until the first error,
|
||||||
|
;; and here current-batch-pos should be 0 anyways.
|
||||||
|
;;
|
||||||
|
;; How many rows do we have from position 0 to position next-error,
|
||||||
|
;; excluding next-error? Well, next-error.
|
||||||
|
(- next-error current-batch-pos))
|
||||||
|
|
||||||
(if (< remaining-rows *copy-batch-split*)
|
((= current-batch-pos next-error)
|
||||||
1
|
;; Now we got to the line that we know is an error, we need to process
|
||||||
(min remaining-rows
|
;; only that one in the next batch
|
||||||
(floor (/ batch-rows *copy-batch-split*))))))
|
1)
|
||||||
|
|
||||||
|
(t
|
||||||
|
;; We're past the known erroneous row. The batch might have new errors,
|
||||||
|
;; or maybe that was the only one. We'll figure it out soon enough,
|
||||||
|
;; let's try the whole remaining rows.
|
||||||
|
(- batch-rows current-batch-pos))))
|
||||||
|
|
||||||
;;;
|
;;;
|
||||||
;;; The recursive retry batch function.
|
;;; In case of COPY error, PostgreSQL gives us the line where the error was
|
||||||
|
;;; found as a CONTEXT message. Let's parse that information to optimize our
|
||||||
|
;;; batching splitting in case of errors.
|
||||||
;;;
|
;;;
|
||||||
(defun retry-batch (dbname table-name batch batch-rows
|
;;; CONTEXT: COPY errors, line 1, column b: "2006-13-11"
|
||||||
&key (current-batch-pos 0) columns transforms)
|
;;;
|
||||||
"Batch is a list of rows containing at least one bad row. Find it."
|
(defun parse-copy-error-context (context)
|
||||||
(log-message :debug "pgsql:retry-batch: splitting current batch [~d rows]" batch-rows)
|
"Given a COPY command CONTEXT error message, return the batch position
|
||||||
(let* ((processed-rows 0))
|
where the error comes from."
|
||||||
(loop
|
(let* ((fields (sq:split-sequence #\, context))
|
||||||
while (< processed-rows batch-rows)
|
(linepart (second fields))
|
||||||
do
|
(linestr (second (sq:split-sequence #\Space linepart :start 1))))
|
||||||
(let* ((current-batch-rows
|
;; COPY command counts from 1 where we index our batch from 0
|
||||||
(smaller-batch-rows batch-rows processed-rows)))
|
(1- (parse-integer linestr))))
|
||||||
(handler-case
|
|
||||||
(with-pgsql-transaction (:dbname dbname :database pomo:*database*)
|
|
||||||
(let* ((stream
|
|
||||||
(cl-postgres:open-db-writer pomo:*database*
|
|
||||||
table-name columns)))
|
|
||||||
|
|
||||||
(log-message :debug "pgsql:retry-batch: current-batch-rows = ~d"
|
;;;
|
||||||
current-batch-rows)
|
;;; The main retry batch function.
|
||||||
|
;;;
|
||||||
|
(defun retry-batch (dbname table-name columns batch batch-rows condition
|
||||||
|
&optional (current-batch-pos 0))
|
||||||
|
"Batch is a list of rows containing at least one bad row, the first such
|
||||||
|
row is known to be located at FIRST-ERROR index in the BATCH array."
|
||||||
|
|
||||||
(unwind-protect
|
(loop
|
||||||
(loop repeat current-batch-rows
|
:with next-error = (parse-copy-error-context
|
||||||
for pos from current-batch-pos
|
(cl-postgres::database-error-context condition))
|
||||||
do (cl-postgres:db-write-row stream nil (aref *batch* pos))
|
|
||||||
finally
|
|
||||||
(incf current-batch-pos current-batch-rows)
|
|
||||||
(incf processed-rows current-batch-rows))
|
|
||||||
|
|
||||||
(cl-postgres:close-db-writer stream))))
|
:while (< current-batch-pos batch-rows)
|
||||||
|
|
||||||
;; the batch didn't make it, recurse
|
:do
|
||||||
((or
|
(progn ; indenting helper
|
||||||
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
(when (= current-batch-pos next-error)
|
||||||
CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition)
|
(log-message :info "error recovery at ~d/~d, processing bad row"
|
||||||
;; process bad data
|
next-error batch-rows)
|
||||||
(if (= 1 current-batch-rows)
|
(process-bad-row table-name condition (aref batch current-batch-pos))
|
||||||
(process-bad-row table-name condition
|
(incf current-batch-pos))
|
||||||
(aref *batch* current-batch-pos))
|
|
||||||
;; more than one line of bad data: recurse
|
(let* ((current-batch-rows
|
||||||
(retry-batch dbname
|
(next-batch-rows batch-rows current-batch-pos next-error)))
|
||||||
table-name
|
(handler-case
|
||||||
batch
|
(with-pgsql-transaction (:dbname dbname :database pomo:*database*)
|
||||||
current-batch-rows
|
(let* ((stream
|
||||||
:current-batch-pos current-batch-pos
|
(cl-postgres:open-db-writer pomo:*database*
|
||||||
:columns columns
|
table-name columns)))
|
||||||
:transforms transforms))))))))
|
|
||||||
|
(log-message :info "error recovery at ~d/~d, trying ~d row~:p"
|
||||||
|
current-batch-pos batch-rows current-batch-rows)
|
||||||
|
|
||||||
|
(unwind-protect
|
||||||
|
(loop :repeat current-batch-rows
|
||||||
|
:for pos :from current-batch-pos
|
||||||
|
:do (cl-postgres:db-write-row stream nil (aref batch pos)))
|
||||||
|
|
||||||
|
;; close-db-writer is the one signaling cl-postgres-errors
|
||||||
|
(cl-postgres:close-db-writer stream)
|
||||||
|
(incf current-batch-pos current-batch-rows))))
|
||||||
|
|
||||||
|
;; the batch didn't make it, prepare error handling for next turn
|
||||||
|
((or
|
||||||
|
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
||||||
|
CL-POSTGRES-ERROR:DATA-EXCEPTION) (next-error-in-batch)
|
||||||
|
|
||||||
|
(setf condition next-error-in-batch
|
||||||
|
|
||||||
|
next-error
|
||||||
|
(+ current-batch-pos
|
||||||
|
(parse-copy-error-context
|
||||||
|
(cl-postgres::database-error-context condition))))))))))
|
||||||
|
@ -37,10 +37,10 @@ LOAD CSV
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
1|expected error, month 13|2006-13-11|
|
0|nov. the 11th should go|2006-11-11|
|
||||||
2|nov. the 11th should go|2006-11-11|
|
1|12th of oct. should go|2006-10-12|
|
||||||
3|12th of oct. should go|2006-10-12|
|
2|expected error, month 13|2006-13-11|
|
||||||
4|\ |2006-16-4|
|
3|\ |2006-16-4|
|
||||||
5|month should be may, ok|2006-5-12|
|
4|month should be may, ok|2006-5-12|
|
||||||
6|another month 13, stress retry path|2006-13-10|
|
5|another month 13, stress retry path|2006-13-10|
|
||||||
7|some null date to play with||
|
6|some null date to play with||
|
||||||
|
Loading…
Reference in New Issue
Block a user