diff --git a/src/package.lisp b/src/package.lisp index 4b4b569..b2379a2 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -354,6 +354,8 @@ (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.catalog) + (:import-from #:cl-postgres + #:database-error-context) (:export #:pgsql-connection #:pgconn-use-ssl #:pgconn-table-name diff --git a/src/pgsql/connection.lisp b/src/pgsql/connection.lisp index 65528d7..0cfe869 100644 --- a/src/pgsql/connection.lisp +++ b/src/pgsql/connection.lisp @@ -49,8 +49,17 @@ "File where to read the PostgreSQL Client Side SSL Private Key.") ;;; +;;; PostgreSQL errors types for pgloader. ;;; -;;; +(deftype postgresql-retryable () + "PostgreSQL errors that we know how to retry in a batch." + `(or + cl-postgres-error::data-exception + cl-postgres-error::integrity-violation + cl-postgres-error:internal-error + cl-postgres-error::insufficient-resources + cl-postgres-error::program-limit-exceeded)) + (deftype postgresql-unavailable () "It might happen that PostgreSQL becomes unavailable in the middle of our processing: it being restarted is an example." diff --git a/src/pgsql/copy-from-queue.lisp b/src/pgsql/copy-from-queue.lisp index 8b10dc5..27b6de8 100644 --- a/src/pgsql/copy-from-queue.lisp +++ b/src/pgsql/copy-from-queue.lisp @@ -32,7 +32,8 @@ ;; We need to keep a copy of the rows we send through the COPY ;; protocol to PostgreSQL to be able to process them again in case ;; of a data error being signaled, that's the BATCH here. - (let ((pomo:*database* db)) + (let ((table-name (format-table-name table)) + (pomo:*database* db)) ;; We can't use with-pgsql-transaction here because of the specifics ;; of error handling in case of cl-postgres:open-db-writer errors: the ;; transaction is dead already when we get a signal, and the COMMIT or @@ -40,8 +41,7 @@ (handler-case (progn (pomo:execute "BEGIN") - (let* ((table-name (format-table-name table)) - (copier + (let* ((copier (handler-case (cl-postgres:open-db-writer db table-name columns) (condition (c) @@ -66,30 +66,24 @@ ;; If PostgreSQL signals a data error, process the batch by isolating ;; erroneous data away and retrying the rest. - ((or - cl-postgres-error::data-exception - cl-postgres-error::integrity-violation - cl-postgres-error::internal-error - cl-postgres-error::insufficient-resources - cl-postgres-error::program-limit-exceeded) (condition) - + (postgresql-retryable (condition) (pomo:execute "ROLLBACK") + (log-message :error "PostgreSQL [~s] ~a" table-name condition) (if on-error-stop ;; re-signal the condition to upper level - (progn - (log-message :error "~a" condition) - (signal 'on-error-stop :on-condition condition)) + (signal 'on-error-stop :on-condition condition) ;; normal behavior, on-error-stop being nil ;; clean the current transaction before retrying new ones - (progn - (log-message :error "~a" condition) - (retry-batch table columns batch batch-rows condition)))) + (let ((errors + (retry-batch table columns batch batch-rows condition))) + (log-message :debug "retry-batch found ~d errors" errors) + (update-stats :data table :rows (- errors))))) (postgresql-unavailable (condition) - (log-message :error "~a" condition) + (log-message :error "[PostgreSQL ~s] ~a" table-name condition) (log-message :error "Copy Batch reconnecting to PostgreSQL") ;; in order to avoid Socket error in "connect": ECONNREFUSED if we @@ -202,6 +196,8 @@ (pgloader.sources::transforms copy) pre-formatted) (condition (e) + (log-message :error "Error while formating a row from ~s:" + (format-table-name (pgloader.sources:target copy))) (log-message :error "~a" e) (update-stats :data (pgloader.sources:target copy) :errs 1) (values nil 0))))) diff --git a/src/pgsql/retry-batch.lisp b/src/pgsql/retry-batch.lisp index d783fe3..7e24c2f 100644 --- a/src/pgsql/retry-batch.lisp +++ b/src/pgsql/retry-batch.lisp @@ -63,16 +63,18 @@ (log-message :info "Entering error recovery.") (loop - :with next-error = (parse-copy-error-context - (cl-postgres::database-error-context condition)) + :with table-name := (format-table-name table) + :with next-error := (parse-copy-error-context + (database-error-context condition)) :while (< current-batch-pos batch-rows) :do - (progn ; indenting helper + (progn ; indenting helper + (log-message :debug "pos: ~s ; err: ~a" current-batch-pos next-error) (when (= current-batch-pos next-error) (log-message :info "error recovery at ~d/~d, processing bad row" - (+ 1 next-error) batch-rows) + current-batch-pos batch-rows) (process-bad-row table condition (aref batch current-batch-pos)) (incf current-batch-pos) (incf nb-errors)) @@ -80,46 +82,61 @@ (let* ((current-batch-rows (next-batch-rows batch-rows current-batch-pos next-error))) (when (< 0 current-batch-rows) - (handler-case - (with-pgsql-transaction (:database pomo:*database*) - (let* ((table-name (format-table-name table)) - (stream - (cl-postgres:open-db-writer pomo:*database* - table-name columns))) + (if (< current-batch-pos next-error) + (log-message :info + "error recovery at ~d/~d, next error at ~d, ~ + loading ~d row~:p" + current-batch-pos + batch-rows + next-error + current-batch-rows) + (log-message :info + "error recovery at ~d/~d, trying ~d row~:p" + current-batch-pos batch-rows current-batch-rows)) - (if (< current-batch-pos next-error) - (log-message :info - "error recovery at ~d/~d, next error at ~d, loading ~d row~:p" - current-batch-pos batch-rows (+ 1 next-error) current-batch-rows) - (log-message :info - "error recovery at ~d/~d, trying ~d row~:p" - current-batch-pos batch-rows current-batch-rows)) + (handler-case + (incf current-batch-pos + (copy-partial-batch table-name + columns + batch + current-batch-rows + current-batch-pos)) - (unwind-protect - (loop :repeat current-batch-rows - :for pos :from current-batch-pos - :do (db-write-row stream (aref batch pos))) + ;; the batch didn't make it, prepare error handling for next turn + (postgresql-retryable (next-error-in-batch) + (pomo:execute "ROLLBACK") + (log-message :error "PostgreSQL [~s] ~a" + table-name + next-error-in-batch) + (let ((next-error-relative + (parse-copy-error-context + (database-error-context next-error-in-batch)))) - ;; close-db-writer is the one signaling cl-postgres-errors - (cl-postgres:close-db-writer stream) - (incf current-batch-pos current-batch-rows)))) + (setf condition next-error-in-batch + next-error (+ current-batch-pos next-error-relative))))))))) - ;; the batch didn't make it, prepare error handling for next turn - ((or - cl-postgres-error::data-exception - cl-postgres-error::integrity-violation - cl-postgres-error:internal-error - cl-postgres-error::insufficient-resources - cl-postgres-error::program-limit-exceeded) (next-error-in-batch) + (log-message :info "Recovery found ~d errors in ~d row~:p" + nb-errors batch-rows) - (setf condition next-error-in-batch + ;; Return how many rows where erroneous, for statistics purposes + nb-errors) - next-error - (+ current-batch-pos - (parse-copy-error-context - (cl-postgres::database-error-context condition)))))))))) +(defun copy-partial-batch (table-name columns + batch current-batch-rows current-batch-pos) + "Copy some rows of the batch, not all of them." + (pomo:execute "BEGIN;") + (let ((stream + (cl-postgres:open-db-writer pomo:*database* table-name columns))) - (log-message :info "Recovery found ~d errors in ~d row~:p" nb-errors batch-rows) + (unwind-protect + (loop :repeat current-batch-rows + :for pos :from current-batch-pos + :do (db-write-row stream (aref batch pos))) - ;; Return how many rows we did load, for statistics purposes - (- batch-rows nb-errors)) + ;; close-db-writer is the one signaling cl-postgres-errors + (progn + (cl-postgres:close-db-writer stream) + (pomo:execute "COMMIT;"))) + + ;; return how many rows we loaded, which is current-batch-rows + current-batch-rows))