mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-13 17:56:59 +02:00
Fix the processing of bad rows, needs a Postmodern fix too.
See https://github.com/marijnh/Postmodern/issues/39 for details.
This commit is contained in:
parent
5a235de5c5
commit
0e79aca3bc
34
pgsql.lisp
34
pgsql.lisp
@ -320,17 +320,20 @@ Finally returns how many rows where read and processed."
|
|||||||
(cl-postgres:close-db-writer copier))
|
(cl-postgres:close-db-writer copier))
|
||||||
((or
|
((or
|
||||||
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
||||||
CL-POSTGRES-ERROR:DATA-EXCEPTION
|
CL-POSTGRES-ERROR:DATA-EXCEPTION) (e)
|
||||||
CL-POSTGRES::PROTOCOL-ERROR) (e)
|
|
||||||
(progn
|
(progn
|
||||||
(log-message :debug "pgsql:copy-from-queue: ~a" e)
|
(log-message :debug "pgsql:copy-from-queue: ~a" e)
|
||||||
(retry-batch dbname table-name
|
(retry-batch dbname
|
||||||
(nreverse *batch*) *batch-size*))))))
|
table-name
|
||||||
|
(nreverse *batch*)
|
||||||
|
*batch-size*
|
||||||
|
:transforms transforms))))))
|
||||||
|
|
||||||
;; fetch how many rows we just pushed through, update stats
|
;; fetch how many rows we just pushed through, update stats
|
||||||
for rows = (if (consp retval) (cdr retval) retval)
|
for rows = (if (consp retval) (cdr retval) retval)
|
||||||
for cont = (and (consp retval) (eq (car retval) :continue))
|
for cont = (and (consp retval) (eq (car retval) :continue))
|
||||||
do (pgstate-incf *state* table-name :rows rows)
|
do
|
||||||
|
(pgstate-incf *state* table-name :rows rows)
|
||||||
while cont)))
|
while cont)))
|
||||||
|
|
||||||
;;;
|
;;;
|
||||||
@ -404,11 +407,13 @@ Finally returns how many rows where read and processed."
|
|||||||
;;; split 1000 rows in 10 batches of 100 rows
|
;;; split 1000 rows in 10 batches of 100 rows
|
||||||
;;; split 352 rows in 3 batches of 100 rows + 1 batch of 52 rows
|
;;; split 352 rows in 3 batches of 100 rows + 1 batch of 52 rows
|
||||||
;;;
|
;;;
|
||||||
(defun process-bad-row (table-name condition row)
|
(defun process-bad-row (table-name condition row &key transforms)
|
||||||
"Add the row to the reject file, in PostgreSQL COPY TEXT format"
|
"Add the row to the reject file, in PostgreSQL COPY TEXT format"
|
||||||
;; first, update the stats.
|
;; first, update the stats.
|
||||||
(pgstate-incf *state* table-name :errs 1 :rows -1)
|
(pgstate-incf *state* table-name :errs 1 :rows -1)
|
||||||
|
|
||||||
|
(log-message :error "Invalid input: ~{~s~^, ~}~%~a~%" row condition)
|
||||||
|
|
||||||
;; now, the bad row processing
|
;; now, the bad row processing
|
||||||
(let* ((table (pgstate-get-table *state* table-name))
|
(let* ((table (pgstate-get-table *state* table-name))
|
||||||
(data (pgtable-reject-data table))
|
(data (pgtable-reject-data table))
|
||||||
@ -421,7 +426,7 @@ Finally returns how many rows where read and processed."
|
|||||||
:if-does-not-exist :create
|
:if-does-not-exist :create
|
||||||
:external-format :utf-8)
|
:external-format :utf-8)
|
||||||
;; the row has already been processed when we get here
|
;; the row has already been processed when we get here
|
||||||
(format-row reject-data-file row))
|
(format-row reject-data-file row :transforms transforms))
|
||||||
|
|
||||||
;; now log the condition signaled to reject the data
|
;; now log the condition signaled to reject the data
|
||||||
(with-open-file (reject-logs-file logs
|
(with-open-file (reject-logs-file logs
|
||||||
@ -448,7 +453,7 @@ Finally returns how many rows where read and processed."
|
|||||||
;;;
|
;;;
|
||||||
;;; The recursive retry batch function.
|
;;; The recursive retry batch function.
|
||||||
;;;
|
;;;
|
||||||
(defun retry-batch (dbname table-name batch batch-size)
|
(defun retry-batch (dbname table-name batch batch-size &key transforms)
|
||||||
"Batch is a list of rows containing at least one bad row. Find it."
|
"Batch is a list of rows containing at least one bad row. Find it."
|
||||||
(let* ((conspec (get-connection-spec dbname :with-port nil))
|
(let* ((conspec (get-connection-spec dbname :with-port nil))
|
||||||
(current-batch-pos batch)
|
(current-batch-pos batch)
|
||||||
@ -479,11 +484,14 @@ Finally returns how many rows where read and processed."
|
|||||||
;; the batch didn't make it, recurse
|
;; the batch didn't make it, recurse
|
||||||
((or
|
((or
|
||||||
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
||||||
CL-POSTGRES-ERROR:DATA-EXCEPTION
|
CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition)
|
||||||
CL-POSTGRES::PROTOCOL-ERROR) (condition)
|
|
||||||
;; process bad data
|
;; process bad data
|
||||||
(if (= 1 current-batch-size)
|
(if (= 1 current-batch-size)
|
||||||
(process-bad-row table-name condition (car current-batch))
|
(process-bad-row table-name condition (car current-batch)
|
||||||
|
:transforms transforms)
|
||||||
;; more than one line of bad data: recurse
|
;; more than one line of bad data: recurse
|
||||||
(retry-batch dbname table-name
|
(retry-batch dbname
|
||||||
current-batch current-batch-size)))))))))
|
table-name
|
||||||
|
current-batch
|
||||||
|
current-batch-size
|
||||||
|
:transforms transforms)))))))))
|
||||||
|
Loading…
Reference in New Issue
Block a user