diff --git a/src/main.lisp b/src/main.lisp index e9b75ca..39f5848 100644 --- a/src/main.lisp +++ b/src/main.lisp @@ -350,7 +350,8 @@ (uiop:quit +os-code-error-bad-source+)) (condition (c) - (when debug (invoke-debugger c)) + (log-message :fatal "~a" c) + (print-backtrace c debug *standard-output*) (uiop:quit +os-code-error+)))))) ;; done. diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index d67242d..7f8045c 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -27,30 +27,57 @@ (defun copy-batch (table columns batch batch-rows &key (db pomo:*database*)) "Copy current *writer-batch* into TABLE-NAME." - (handler-case - (with-pgsql-transaction (:database db) - ;; We need to keep a copy of the rows we send through the COPY - ;; protocol to PostgreSQL to be able to process them again in case - ;; of a data error being signaled, that's the BATCH here. - (let* ((table-name (format-table-name table)) - (copier (cl-postgres:open-db-writer db table-name columns))) - (unwind-protect - (loop :for i :below batch-rows - :for data := (aref batch i) - :do (when data - (db-write-row copier data)) - :finally (return batch-rows)) - (cl-postgres:close-db-writer copier)))) + ;; We need to keep a copy of the rows we send through the COPY + ;; protocol to PostgreSQL to be able to process them again in case + ;; of a data error being signaled, that's the BATCH here. + (let ((pomo:*database* db)) + (handling-pgsql-notices + ;; We can't use with-pgsql-transaction here because of the specifics + ;; of error handling in case of cl-postgres:open-db-writer errors: the + ;; transaction is dead already when we get a signal, and the COMMIT or + ;; ABORT steps then trigger a protocol error on a #\Z message. + (pomo:execute "BEGIN") + (handler-case + (let* ((table-name (format-table-name table)) + (copier + (handler-case + (cl-postgres:open-db-writer db table-name columns) + (condition (c) + ;; failed to open the COPY protocol mode (e.g. missing + ;; columns on the target table), stop here, + ;; transaction is dead already (no ROLLBACK needed). + (log-message :fatal + "Can't init COPY to ~a~@[(~{~a~^, ~})~]: ~%~a" + (format-table-name table) + columns + c) + (update-stats :data table :errs 1) + (return-from copy-batch 0))))) + (unwind-protect + (loop :for i :below batch-rows + :for data := (aref batch i) + :do (when data + (db-write-row copier data)) + :finally (return batch-rows)) + (cl-postgres:close-db-writer copier) + (pomo:execute "COMMIT"))) - ;; If PostgreSQL signals a data error, process the batch by isolating - ;; erroneous data away and retrying the rest. - ((or - cl-postgres-error::data-exception - cl-postgres-error::integrity-violation - cl-postgres-error::internal-error - cl-postgres-error::insufficient-resources - cl-postgres-error::program-limit-exceeded) (condition) - (retry-batch table columns batch batch-rows condition)))) + ;; If PostgreSQL signals a data error, process the batch by isolating + ;; erroneous data away and retrying the rest. + ((or + cl-postgres-error::data-exception + cl-postgres-error::integrity-violation + cl-postgres-error::internal-error + cl-postgres-error::insufficient-resources + cl-postgres-error::program-limit-exceeded) (condition) + ;; clean the current transaction before retrying new ones + (pomo:execute "ROLLBACK") + (retry-batch table columns batch batch-rows condition)) + + (condition (c) + ;; non retryable failures + (log-message :error "Non-retryable error ~a" c) + (pomo:execute "ROLLBACK")))))) ;;; ;;; We receive fully prepared batch from an lparallel queue, push their @@ -83,7 +110,9 @@ :do (progn ;; The SBCL implementation needs some Garbage Collection ;; decision making help... and now is a pretty good time. - #+sbcl (when oversized? (sb-ext:gc :full t)) + #+sbcl (when oversized? + (log-message :debug "Forcing a full GC.") + (sb-ext:gc :full t)) (log-message :debug "copy-batch[~a] ~a ~d row~:p in ~6$s~@[ [oversized]~]" (lp:kernel-worker-index) diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index b69b570..a6288ae 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -126,13 +126,16 @@ (let ((worker-count (* (length path-list) (task-count concurrency)))) (loop :for tasks :below worker-count - :do (destructuring-bind (task table seconds) - (lp:receive-result channel) - (log-message :debug - "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds) - (when (eq :writer task) - (update-stats :data table :secs seconds)))) + :do (handler-case + (destructuring-bind (task table seconds) + (lp:receive-result channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds) + (when (eq :writer task) + (update-stats :data table :secs seconds))) + (condition (e) + (log-message :fatal "~a" e)))) (prog1 worker-count (lp:end-kernel :wait nil)))) diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index 447f1c7..39a8288 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -155,8 +155,17 @@ ((error #'(lambda (condition) (log-message :error "A thread failed with error: ~a" condition) + #-pgloader-image (if (member *client-min-messages* (list :debug :data)) (lp::invoke-debugger condition) + (lp::invoke-transfer-error condition)) + #+pgloader-image + (if (member *client-min-messages* (list :debug :data)) + (log-message :fatal "Backtrace: ~a" + (trivial-backtrace:print-backtrace + condition + :output nil + :verbose t)) (lp::invoke-transfer-error condition))))) (log-message :info "COPY ~s" table-name) diff --git a/test/Makefile b/test/Makefile index 1aa1d62..6161c6b 100644 --- a/test/Makefile +++ b/test/Makefile @@ -14,6 +14,7 @@ REGRESS= allcols.load \ csv-header.load \ csv-json.load \ csv-keep-extra-blanks.load \ + csv-missing-col.load \ csv-non-printable.load \ csv-nulls.load \ csv-temp.load \ diff --git a/test/csv-missing-col.load b/test/csv-missing-col.load new file mode 100644 index 0000000..9390cc2 --- /dev/null +++ b/test/csv-missing-col.load @@ -0,0 +1,22 @@ +LOAD CSV + FROM inline (a, b, c, d, e, f, g) + INTO postgresql:///pgloader?missingcol (a, b, c, d, e, f, g) + + WITH truncate, + fields optionally enclosed by '"', + fields escaped by double-quote, + fields terminated by ',' + + BEFORE LOAD DO + $$ drop table if exists missingcol; $$, + $$ create table missingcol ( + a text, b text, x text, d text, e text, f text, g text + ); + $$; + +"2.6.190.56","2.6.190.63","33996344","33996351","GB","United Kingdom" +"3.0.0.0","4.17.135.31","50331648","68257567","US","United States" +"4.17.135.32","4.17.135.63","68257568","68257599","CA","Canada" +"4.17.135.64","4.17.142.255","68257600","68259583","US","United States" +"4.17.143.0","4.17.143.15","68259584","68259599","CA","Canada" +"4.17.143.16","4.18.32.71","68259600","68296775","US","United States" diff --git a/test/regress/expected/csv-missing-col.out b/test/regress/expected/csv-missing-col.out new file mode 100644 index 0000000..e69de29