From 0e79aca3bcd73f83aaa45992fca2d93e5dd04047 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Sun, 6 Oct 2013 23:09:27 +0200 Subject: [PATCH] Fix the processing of bad rows, needs a Postmodern fix too. See https://github.com/marijnh/Postmodern/issues/39 for details. --- pgsql.lisp | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/pgsql.lisp b/pgsql.lisp index b806b95..19dc1d7 100644 --- a/pgsql.lisp +++ b/pgsql.lisp @@ -320,17 +320,20 @@ Finally returns how many rows where read and processed." (cl-postgres:close-db-writer copier)) ((or CL-POSTGRES-ERROR:UNIQUE-VIOLATION - CL-POSTGRES-ERROR:DATA-EXCEPTION - CL-POSTGRES::PROTOCOL-ERROR) (e) + CL-POSTGRES-ERROR:DATA-EXCEPTION) (e) (progn (log-message :debug "pgsql:copy-from-queue: ~a" e) - (retry-batch dbname table-name - (nreverse *batch*) *batch-size*)))))) + (retry-batch dbname + table-name + (nreverse *batch*) + *batch-size* + :transforms transforms)))))) ;; fetch how many rows we just pushed through, update stats for rows = (if (consp retval) (cdr retval) retval) for cont = (and (consp retval) (eq (car retval) :continue)) - do (pgstate-incf *state* table-name :rows rows) + do + (pgstate-incf *state* table-name :rows rows) while cont))) ;;; @@ -404,11 +407,13 @@ Finally returns how many rows where read and processed." ;;; split 1000 rows in 10 batches of 100 rows ;;; split 352 rows in 3 batches of 100 rows + 1 batch of 52 rows ;;; -(defun process-bad-row (table-name condition row) +(defun process-bad-row (table-name condition row &key transforms) "Add the row to the reject file, in PostgreSQL COPY TEXT format" ;; first, update the stats. (pgstate-incf *state* table-name :errs 1 :rows -1) + (log-message :error "Invalid input: ~{~s~^, ~}~%~a~%" row condition) + ;; now, the bad row processing (let* ((table (pgstate-get-table *state* table-name)) (data (pgtable-reject-data table)) @@ -421,7 +426,7 @@ Finally returns how many rows where read and processed." :if-does-not-exist :create :external-format :utf-8) ;; the row has already been processed when we get here - (format-row reject-data-file row)) + (format-row reject-data-file row :transforms transforms)) ;; now log the condition signaled to reject the data (with-open-file (reject-logs-file logs @@ -448,7 +453,7 @@ Finally returns how many rows where read and processed." ;;; ;;; The recursive retry batch function. ;;; -(defun retry-batch (dbname table-name batch batch-size) +(defun retry-batch (dbname table-name batch batch-size &key transforms) "Batch is a list of rows containing at least one bad row. Find it." (let* ((conspec (get-connection-spec dbname :with-port nil)) (current-batch-pos batch) @@ -479,11 +484,14 @@ Finally returns how many rows where read and processed." ;; the batch didn't make it, recurse ((or CL-POSTGRES-ERROR:UNIQUE-VIOLATION - CL-POSTGRES-ERROR:DATA-EXCEPTION - CL-POSTGRES::PROTOCOL-ERROR) (condition) + CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) ;; process bad data (if (= 1 current-batch-size) - (process-bad-row table-name condition (car current-batch)) + (process-bad-row table-name condition (car current-batch) + :transforms transforms) ;; more than one line of bad data: recurse - (retry-batch dbname table-name - current-batch current-batch-size))))))))) + (retry-batch dbname + table-name + current-batch + current-batch-size + :transforms transforms)))))))))