mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-06 03:16:10 +02:00
Review transaction and error handling in COPY.
The PostgreSQL COPY protocol requires an explicit initialization phase that may fail, and in this case the Postmodern driver transaction is already dead, so there's no way we can even send ABORT to it. Review the error handling of our copy-batch function to cope with that fact, and add some logging of non-retryable errors we may have. Also improve the thread error reporting when using a binary image from where it might be difficult to open an interactive debugger, while still having the full blown Common Lisp debugging experience for the project developers. Add a test case for a missing column as in issue #339. Fix #339, see #337.
This commit is contained in:
parent
9512ab187e
commit
40c1581794
@ -350,7 +350,8 @@
|
||||
(uiop:quit +os-code-error-bad-source+))
|
||||
|
||||
(condition (c)
|
||||
(when debug (invoke-debugger c))
|
||||
(log-message :fatal "~a" c)
|
||||
(print-backtrace c debug *standard-output*)
|
||||
(uiop:quit +os-code-error+))))))
|
||||
|
||||
;; done.
|
||||
|
||||
@ -27,30 +27,57 @@
|
||||
(defun copy-batch (table columns batch batch-rows
|
||||
&key (db pomo:*database*))
|
||||
"Copy current *writer-batch* into TABLE-NAME."
|
||||
(handler-case
|
||||
(with-pgsql-transaction (:database db)
|
||||
;; We need to keep a copy of the rows we send through the COPY
|
||||
;; protocol to PostgreSQL to be able to process them again in case
|
||||
;; of a data error being signaled, that's the BATCH here.
|
||||
(let* ((table-name (format-table-name table))
|
||||
(copier (cl-postgres:open-db-writer db table-name columns)))
|
||||
(unwind-protect
|
||||
(loop :for i :below batch-rows
|
||||
:for data := (aref batch i)
|
||||
:do (when data
|
||||
(db-write-row copier data))
|
||||
:finally (return batch-rows))
|
||||
(cl-postgres:close-db-writer copier))))
|
||||
;; We need to keep a copy of the rows we send through the COPY
|
||||
;; protocol to PostgreSQL to be able to process them again in case
|
||||
;; of a data error being signaled, that's the BATCH here.
|
||||
(let ((pomo:*database* db))
|
||||
(handling-pgsql-notices
|
||||
;; We can't use with-pgsql-transaction here because of the specifics
|
||||
;; of error handling in case of cl-postgres:open-db-writer errors: the
|
||||
;; transaction is dead already when we get a signal, and the COMMIT or
|
||||
;; ABORT steps then trigger a protocol error on a #\Z message.
|
||||
(pomo:execute "BEGIN")
|
||||
(handler-case
|
||||
(let* ((table-name (format-table-name table))
|
||||
(copier
|
||||
(handler-case
|
||||
(cl-postgres:open-db-writer db table-name columns)
|
||||
(condition (c)
|
||||
;; failed to open the COPY protocol mode (e.g. missing
|
||||
;; columns on the target table), stop here,
|
||||
;; transaction is dead already (no ROLLBACK needed).
|
||||
(log-message :fatal
|
||||
"Can't init COPY to ~a~@[(~{~a~^, ~})~]: ~%~a"
|
||||
(format-table-name table)
|
||||
columns
|
||||
c)
|
||||
(update-stats :data table :errs 1)
|
||||
(return-from copy-batch 0)))))
|
||||
(unwind-protect
|
||||
(loop :for i :below batch-rows
|
||||
:for data := (aref batch i)
|
||||
:do (when data
|
||||
(db-write-row copier data))
|
||||
:finally (return batch-rows))
|
||||
(cl-postgres:close-db-writer copier)
|
||||
(pomo:execute "COMMIT")))
|
||||
|
||||
;; If PostgreSQL signals a data error, process the batch by isolating
|
||||
;; erroneous data away and retrying the rest.
|
||||
((or
|
||||
cl-postgres-error::data-exception
|
||||
cl-postgres-error::integrity-violation
|
||||
cl-postgres-error::internal-error
|
||||
cl-postgres-error::insufficient-resources
|
||||
cl-postgres-error::program-limit-exceeded) (condition)
|
||||
(retry-batch table columns batch batch-rows condition))))
|
||||
;; If PostgreSQL signals a data error, process the batch by isolating
|
||||
;; erroneous data away and retrying the rest.
|
||||
((or
|
||||
cl-postgres-error::data-exception
|
||||
cl-postgres-error::integrity-violation
|
||||
cl-postgres-error::internal-error
|
||||
cl-postgres-error::insufficient-resources
|
||||
cl-postgres-error::program-limit-exceeded) (condition)
|
||||
;; clean the current transaction before retrying new ones
|
||||
(pomo:execute "ROLLBACK")
|
||||
(retry-batch table columns batch batch-rows condition))
|
||||
|
||||
(condition (c)
|
||||
;; non retryable failures
|
||||
(log-message :error "Non-retryable error ~a" c)
|
||||
(pomo:execute "ROLLBACK"))))))
|
||||
|
||||
;;;
|
||||
;;; We receive fully prepared batch from an lparallel queue, push their
|
||||
@ -83,7 +110,9 @@
|
||||
:do (progn
|
||||
;; The SBCL implementation needs some Garbage Collection
|
||||
;; decision making help... and now is a pretty good time.
|
||||
#+sbcl (when oversized? (sb-ext:gc :full t))
|
||||
#+sbcl (when oversized?
|
||||
(log-message :debug "Forcing a full GC.")
|
||||
(sb-ext:gc :full t))
|
||||
(log-message :debug
|
||||
"copy-batch[~a] ~a ~d row~:p in ~6$s~@[ [oversized]~]"
|
||||
(lp:kernel-worker-index)
|
||||
|
||||
@ -126,13 +126,16 @@
|
||||
(let ((worker-count (* (length path-list)
|
||||
(task-count concurrency))))
|
||||
(loop :for tasks :below worker-count
|
||||
:do (destructuring-bind (task table seconds)
|
||||
(lp:receive-result channel)
|
||||
(log-message :debug
|
||||
"Finished processing ~a for ~s ~50T~6$s"
|
||||
task (format-table-name table) seconds)
|
||||
(when (eq :writer task)
|
||||
(update-stats :data table :secs seconds))))
|
||||
:do (handler-case
|
||||
(destructuring-bind (task table seconds)
|
||||
(lp:receive-result channel)
|
||||
(log-message :debug
|
||||
"Finished processing ~a for ~s ~50T~6$s"
|
||||
task (format-table-name table) seconds)
|
||||
(when (eq :writer task)
|
||||
(update-stats :data table :secs seconds)))
|
||||
(condition (e)
|
||||
(log-message :fatal "~a" e))))
|
||||
(prog1
|
||||
worker-count
|
||||
(lp:end-kernel :wait nil))))
|
||||
|
||||
@ -155,8 +155,17 @@
|
||||
((error #'(lambda (condition)
|
||||
(log-message :error "A thread failed with error: ~a"
|
||||
condition)
|
||||
#-pgloader-image
|
||||
(if (member *client-min-messages* (list :debug :data))
|
||||
(lp::invoke-debugger condition)
|
||||
(lp::invoke-transfer-error condition))
|
||||
#+pgloader-image
|
||||
(if (member *client-min-messages* (list :debug :data))
|
||||
(log-message :fatal "Backtrace: ~a"
|
||||
(trivial-backtrace:print-backtrace
|
||||
condition
|
||||
:output nil
|
||||
:verbose t))
|
||||
(lp::invoke-transfer-error condition)))))
|
||||
(log-message :info "COPY ~s" table-name)
|
||||
|
||||
|
||||
@ -14,6 +14,7 @@ REGRESS= allcols.load \
|
||||
csv-header.load \
|
||||
csv-json.load \
|
||||
csv-keep-extra-blanks.load \
|
||||
csv-missing-col.load \
|
||||
csv-non-printable.load \
|
||||
csv-nulls.load \
|
||||
csv-temp.load \
|
||||
|
||||
22
test/csv-missing-col.load
Normal file
22
test/csv-missing-col.load
Normal file
@ -0,0 +1,22 @@
|
||||
LOAD CSV
|
||||
FROM inline (a, b, c, d, e, f, g)
|
||||
INTO postgresql:///pgloader?missingcol (a, b, c, d, e, f, g)
|
||||
|
||||
WITH truncate,
|
||||
fields optionally enclosed by '"',
|
||||
fields escaped by double-quote,
|
||||
fields terminated by ','
|
||||
|
||||
BEFORE LOAD DO
|
||||
$$ drop table if exists missingcol; $$,
|
||||
$$ create table missingcol (
|
||||
a text, b text, x text, d text, e text, f text, g text
|
||||
);
|
||||
$$;
|
||||
|
||||
"2.6.190.56","2.6.190.63","33996344","33996351","GB","United Kingdom"
|
||||
"3.0.0.0","4.17.135.31","50331648","68257567","US","United States"
|
||||
"4.17.135.32","4.17.135.63","68257568","68257599","CA","Canada"
|
||||
"4.17.135.64","4.17.142.255","68257600","68259583","US","United States"
|
||||
"4.17.143.0","4.17.143.15","68259584","68259599","CA","Canada"
|
||||
"4.17.143.16","4.18.32.71","68259600","68296775","US","United States"
|
||||
0
test/regress/expected/csv-missing-col.out
Normal file
0
test/regress/expected/csv-missing-col.out
Normal file
Loading…
x
Reference in New Issue
Block a user