From ef358c0b7ddbc80a281f41c9b34ddf004285deba Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Wed, 25 Dec 2013 21:43:22 +0100 Subject: [PATCH] Take benefits of PostgreSQL COPY error CONTEXT. This message has the line number where the erroneous data was found on the server, and given the pre-processing we already done at that point, it's easy to convert that number into an index into the current batch, an array. To do do, we need Postmodern to expose the CONTEXT error message and we need to parse it. The following pull request cares about the Postmodern side of things: https://github.com/marijnh/Postmodern/pull/46 The parsing is done as simply as possible, only assuming that the error message is using comma separators and having the line number in second position. The parsing as done here should still work with localized message strings. CONTEXT: COPY errors, line 3, column b: "2006-13-11" This change should significantly reduce the cost of error processing. --- Makefile | 12 +++- pgloader.1.md | 24 +++----- src/pgsql/pgsql.lisp | 141 ++++++++++++++++++++++++++----------------- test/errors.load | 14 ++--- 4 files changed, 110 insertions(+), 81 deletions(-) diff --git a/Makefile b/Makefile index ccf813b..ea17842 100644 --- a/Makefile +++ b/Makefile @@ -14,12 +14,16 @@ all: $(PGLOADER) docs: pandoc pgloader.1.md -o pgloader.1 +~/quicklisp/local-projects/Postmodern: + git clone -b protocol-error-fields https://github.com/dimitri/Postmodern.git $@ + ~/quicklisp/local-projects/qmynd: git clone https://github.com/qitab/qmynd.git $@ ~/quicklisp/local-projects/cl-csv: git clone -b empty-strings-and-nil https://github.com/dimitri/cl-csv.git $@ +postmodern: ~/quicklisp/local-projects/Postmodern ; qmynd: ~/quicklisp/local-projects/qmynd ; cl-csv: ~/quicklisp/local-projects/cl-csv ; @@ -37,7 +41,7 @@ $(ASDF_CONF): asdf-config: $(ASDF_CONF) ; -$(LIBS): quicklisp $(ASDF_CONF) cl-csv qmynd +$(LIBS): quicklisp $(ASDF_CONF) cl-csv qmynd postmodern sbcl --load ~/quicklisp/setup.lisp \ --eval '(ql:quickload "pgloader")' \ --eval '(quit)' @@ -50,13 +54,17 @@ $(MANIFEST): libs --eval '(ql:write-asdf-manifest-file "./build/manifest.ql")' \ --eval '(quit)' +manifest: $(MANIFEST) ; + $(BUILDAPP): quicklisp sbcl --load ~/quicklisp/setup.lisp \ --eval '(ql:quickload "buildapp")' \ --eval '(buildapp:build-buildapp "./build/buildapp")' \ --eval '(quit)' -$(PGLOADER): $(MANIFEST) $(BUILDAPP) +buildapp: $(BUILDAPP) ; + +$(PGLOADER): manifest buildapp ./build/buildapp --logfile /tmp/build.log \ --asdf-tree ~/quicklisp/local-projects \ --manifest-file ./build/manifest.ql \ diff --git a/pgloader.1.md b/pgloader.1.md index 270d587..1408852 100644 --- a/pgloader.1.md +++ b/pgloader.1.md @@ -80,24 +80,16 @@ order to be able to handle errors should some happen. When PostgreSQL rejects the whole batch, pgloader logs the error message then isolates the bad row(s) from the accepted ones by retrying the batched -rows in smaller batches. The generic way to do that is using *dichotomy* -where the rows are split in two batches as evenly as possible. In the case -of pgloader, as we expect bad data to be a rare event and want to optimize -finding it as quickly as possible, the rows are split in 5 batches. +rows in smaller batches. To do that, pgloader parses the *CONTEXT* error +message from the failed COPY, as the message contains the line number where +the error was found in the batch, as in the following example: -Each batch of rows is sent again to PostgreSQL until we have an error -message corresponding to a batch of single row, then we process the row as -rejected and continue loading the remaining of the batch. + CONTEXT: COPY errors, line 3, column b: "2006-13-11" -So the batch sizes are going to be as following: - - - 1 batch of 25000 rows, which fails to load - - 5 batches of 5000 rows, one of which fails to load - - 5 batches of 1000 rows, one of which fails to load - - 5 batches of 200 rows, one of which fails to load - - 5 batches of 40 rows, one of which fails to load - - 5 batchs of 8 rows, one of which fails to load - - 8 batches of 1 row, one of which fails to load +Using that information, pgloader will reload all rows in the batch before +the erroneous one, log the erroneous one as rejected, then try loading the +remaining of the batch in a single attempt, which may or may not contain +other erroneous data. At the end of a load containing rejected rows, you will find two files in the *root-dir* location, under a directory named the same as the target diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index 6093955..b6b9b5c 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -145,14 +145,9 @@ details about the format, and format specs." ;; in case of data-exception, split the batch and try again ((or CL-POSTGRES-ERROR:UNIQUE-VIOLATION - CL-POSTGRES-ERROR:DATA-EXCEPTION) (e) - (declare (ignore e)) ; already logged - (retry-batch dbname - table-name - *batch* - *batch-rows* - :columns columns - :transforms transforms)))) + CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) + (retry-batch dbname table-name columns + *batch* *batch-rows* condition)))) ;; fetch how many rows we just pushed through, update stats for rows = (if (consp retval) (cdr retval) retval) @@ -210,60 +205,94 @@ details about the format, and format specs." ;;; Compute the next batch size in rows, must be smaller than the previous ;;; one or just one row to ensure the retry-batch recursion is not infinite. ;;; -(defun smaller-batch-rows (batch-rows processed-rows) +(defun next-batch-rows (batch-rows current-batch-pos next-error) "How many rows should we process in next iteration?" - (let ((remaining-rows (- batch-rows processed-rows))) + (cond + ((< current-batch-pos next-error) + ;; We Can safely push a batch with all the rows until the first error, + ;; and here current-batch-pos should be 0 anyways. + ;; + ;; How many rows do we have from position 0 to position next-error, + ;; excluding next-error? Well, next-error. + (- next-error current-batch-pos)) - (if (< remaining-rows *copy-batch-split*) - 1 - (min remaining-rows - (floor (/ batch-rows *copy-batch-split*)))))) + ((= current-batch-pos next-error) + ;; Now we got to the line that we know is an error, we need to process + ;; only that one in the next batch + 1) + + (t + ;; We're past the known erroneous row. The batch might have new errors, + ;; or maybe that was the only one. We'll figure it out soon enough, + ;; let's try the whole remaining rows. + (- batch-rows current-batch-pos)))) ;;; -;;; The recursive retry batch function. +;;; In case of COPY error, PostgreSQL gives us the line where the error was +;;; found as a CONTEXT message. Let's parse that information to optimize our +;;; batching splitting in case of errors. ;;; -(defun retry-batch (dbname table-name batch batch-rows - &key (current-batch-pos 0) columns transforms) - "Batch is a list of rows containing at least one bad row. Find it." - (log-message :debug "pgsql:retry-batch: splitting current batch [~d rows]" batch-rows) - (let* ((processed-rows 0)) - (loop - while (< processed-rows batch-rows) - do - (let* ((current-batch-rows - (smaller-batch-rows batch-rows processed-rows))) - (handler-case - (with-pgsql-transaction (:dbname dbname :database pomo:*database*) - (let* ((stream - (cl-postgres:open-db-writer pomo:*database* - table-name columns))) +;;; CONTEXT: COPY errors, line 1, column b: "2006-13-11" +;;; +(defun parse-copy-error-context (context) + "Given a COPY command CONTEXT error message, return the batch position + where the error comes from." + (let* ((fields (sq:split-sequence #\, context)) + (linepart (second fields)) + (linestr (second (sq:split-sequence #\Space linepart :start 1)))) + ;; COPY command counts from 1 where we index our batch from 0 + (1- (parse-integer linestr)))) - (log-message :debug "pgsql:retry-batch: current-batch-rows = ~d" - current-batch-rows) +;;; +;;; The main retry batch function. +;;; +(defun retry-batch (dbname table-name columns batch batch-rows condition + &optional (current-batch-pos 0)) + "Batch is a list of rows containing at least one bad row, the first such + row is known to be located at FIRST-ERROR index in the BATCH array." - (unwind-protect - (loop repeat current-batch-rows - for pos from current-batch-pos - do (cl-postgres:db-write-row stream nil (aref *batch* pos)) - finally - (incf current-batch-pos current-batch-rows) - (incf processed-rows current-batch-rows)) + (loop + :with next-error = (parse-copy-error-context + (cl-postgres::database-error-context condition)) - (cl-postgres:close-db-writer stream)))) + :while (< current-batch-pos batch-rows) - ;; the batch didn't make it, recurse - ((or - CL-POSTGRES-ERROR:UNIQUE-VIOLATION - CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) - ;; process bad data - (if (= 1 current-batch-rows) - (process-bad-row table-name condition - (aref *batch* current-batch-pos)) - ;; more than one line of bad data: recurse - (retry-batch dbname - table-name - batch - current-batch-rows - :current-batch-pos current-batch-pos - :columns columns - :transforms transforms)))))))) + :do + (progn ; indenting helper + (when (= current-batch-pos next-error) + (log-message :info "error recovery at ~d/~d, processing bad row" + next-error batch-rows) + (process-bad-row table-name condition (aref batch current-batch-pos)) + (incf current-batch-pos)) + + (let* ((current-batch-rows + (next-batch-rows batch-rows current-batch-pos next-error))) + (handler-case + (with-pgsql-transaction (:dbname dbname :database pomo:*database*) + (let* ((stream + (cl-postgres:open-db-writer pomo:*database* + table-name columns))) + + (log-message :info "error recovery at ~d/~d, trying ~d row~:p" + current-batch-pos batch-rows current-batch-rows) + + (unwind-protect + (loop :repeat current-batch-rows + :for pos :from current-batch-pos + :do (cl-postgres:db-write-row stream nil (aref batch pos))) + + ;; close-db-writer is the one signaling cl-postgres-errors + (cl-postgres:close-db-writer stream) + (incf current-batch-pos current-batch-rows)))) + + ;; the batch didn't make it, prepare error handling for next turn + ((or + CL-POSTGRES-ERROR:UNIQUE-VIOLATION + CL-POSTGRES-ERROR:DATA-EXCEPTION) (next-error-in-batch) + + (setf condition next-error-in-batch + + next-error + (+ current-batch-pos + (parse-copy-error-context + (cl-postgres::database-error-context condition)))))))))) diff --git a/test/errors.load b/test/errors.load index 7f758f0..d44b288 100644 --- a/test/errors.load +++ b/test/errors.load @@ -37,10 +37,10 @@ LOAD CSV -1|expected error, month 13|2006-13-11| -2|nov. the 11th should go|2006-11-11| -3|12th of oct. should go|2006-10-12| -4|\ |2006-16-4| -5|month should be may, ok|2006-5-12| -6|another month 13, stress retry path|2006-13-10| -7|some null date to play with|| +0|nov. the 11th should go|2006-11-11| +1|12th of oct. should go|2006-10-12| +2|expected error, month 13|2006-13-11| +3|\ |2006-16-4| +4|month should be may, ok|2006-5-12| +5|another month 13, stress retry path|2006-13-10| +6|some null date to play with||