mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-05 10:56:10 +02:00
Refactor PostgreSQL error handling.
The code was too complex and the transaction / connection handling wasn't good enough, too many reconnections when a ROLLBACK; is all we need to be able to continue our processing. Also fix some stats counters about errors handled, and improve error message by adding PostgreSQL explicitely, and the name of the table where the error comes from.
This commit is contained in:
parent
3eab88b144
commit
3f7853491f
@ -354,6 +354,8 @@
|
||||
(:use #:cl
|
||||
#:pgloader.params #:pgloader.utils #:pgloader.connection
|
||||
#:pgloader.catalog)
|
||||
(:import-from #:cl-postgres
|
||||
#:database-error-context)
|
||||
(:export #:pgsql-connection
|
||||
#:pgconn-use-ssl
|
||||
#:pgconn-table-name
|
||||
|
||||
@ -49,8 +49,17 @@
|
||||
"File where to read the PostgreSQL Client Side SSL Private Key.")
|
||||
|
||||
;;;
|
||||
;;; PostgreSQL errors types for pgloader.
|
||||
;;;
|
||||
;;;
|
||||
(deftype postgresql-retryable ()
|
||||
"PostgreSQL errors that we know how to retry in a batch."
|
||||
`(or
|
||||
cl-postgres-error::data-exception
|
||||
cl-postgres-error::integrity-violation
|
||||
cl-postgres-error:internal-error
|
||||
cl-postgres-error::insufficient-resources
|
||||
cl-postgres-error::program-limit-exceeded))
|
||||
|
||||
(deftype postgresql-unavailable ()
|
||||
"It might happen that PostgreSQL becomes unavailable in the middle of
|
||||
our processing: it being restarted is an example."
|
||||
|
||||
@ -32,7 +32,8 @@
|
||||
;; We need to keep a copy of the rows we send through the COPY
|
||||
;; protocol to PostgreSQL to be able to process them again in case
|
||||
;; of a data error being signaled, that's the BATCH here.
|
||||
(let ((pomo:*database* db))
|
||||
(let ((table-name (format-table-name table))
|
||||
(pomo:*database* db))
|
||||
;; We can't use with-pgsql-transaction here because of the specifics
|
||||
;; of error handling in case of cl-postgres:open-db-writer errors: the
|
||||
;; transaction is dead already when we get a signal, and the COMMIT or
|
||||
@ -40,8 +41,7 @@
|
||||
(handler-case
|
||||
(progn
|
||||
(pomo:execute "BEGIN")
|
||||
(let* ((table-name (format-table-name table))
|
||||
(copier
|
||||
(let* ((copier
|
||||
(handler-case
|
||||
(cl-postgres:open-db-writer db table-name columns)
|
||||
(condition (c)
|
||||
@ -66,30 +66,24 @@
|
||||
|
||||
;; If PostgreSQL signals a data error, process the batch by isolating
|
||||
;; erroneous data away and retrying the rest.
|
||||
((or
|
||||
cl-postgres-error::data-exception
|
||||
cl-postgres-error::integrity-violation
|
||||
cl-postgres-error::internal-error
|
||||
cl-postgres-error::insufficient-resources
|
||||
cl-postgres-error::program-limit-exceeded) (condition)
|
||||
|
||||
(postgresql-retryable (condition)
|
||||
(pomo:execute "ROLLBACK")
|
||||
|
||||
(log-message :error "PostgreSQL [~s] ~a" table-name condition)
|
||||
(if on-error-stop
|
||||
;; re-signal the condition to upper level
|
||||
(progn
|
||||
(log-message :error "~a" condition)
|
||||
(signal 'on-error-stop :on-condition condition))
|
||||
(signal 'on-error-stop :on-condition condition)
|
||||
|
||||
;; normal behavior, on-error-stop being nil
|
||||
;; clean the current transaction before retrying new ones
|
||||
(progn
|
||||
(log-message :error "~a" condition)
|
||||
(retry-batch table columns batch batch-rows condition))))
|
||||
(let ((errors
|
||||
(retry-batch table columns batch batch-rows condition)))
|
||||
(log-message :debug "retry-batch found ~d errors" errors)
|
||||
(update-stats :data table :rows (- errors)))))
|
||||
|
||||
(postgresql-unavailable (condition)
|
||||
|
||||
(log-message :error "~a" condition)
|
||||
(log-message :error "[PostgreSQL ~s] ~a" table-name condition)
|
||||
(log-message :error "Copy Batch reconnecting to PostgreSQL")
|
||||
|
||||
;; in order to avoid Socket error in "connect": ECONNREFUSED if we
|
||||
@ -202,6 +196,8 @@
|
||||
(pgloader.sources::transforms copy)
|
||||
pre-formatted)
|
||||
(condition (e)
|
||||
(log-message :error "Error while formating a row from ~s:"
|
||||
(format-table-name (pgloader.sources:target copy)))
|
||||
(log-message :error "~a" e)
|
||||
(update-stats :data (pgloader.sources:target copy) :errs 1)
|
||||
(values nil 0)))))
|
||||
|
||||
@ -63,16 +63,18 @@
|
||||
(log-message :info "Entering error recovery.")
|
||||
|
||||
(loop
|
||||
:with next-error = (parse-copy-error-context
|
||||
(cl-postgres::database-error-context condition))
|
||||
:with table-name := (format-table-name table)
|
||||
:with next-error := (parse-copy-error-context
|
||||
(database-error-context condition))
|
||||
|
||||
:while (< current-batch-pos batch-rows)
|
||||
|
||||
:do
|
||||
(progn ; indenting helper
|
||||
(progn ; indenting helper
|
||||
(log-message :debug "pos: ~s ; err: ~a" current-batch-pos next-error)
|
||||
(when (= current-batch-pos next-error)
|
||||
(log-message :info "error recovery at ~d/~d, processing bad row"
|
||||
(+ 1 next-error) batch-rows)
|
||||
current-batch-pos batch-rows)
|
||||
(process-bad-row table condition (aref batch current-batch-pos))
|
||||
(incf current-batch-pos)
|
||||
(incf nb-errors))
|
||||
@ -80,46 +82,61 @@
|
||||
(let* ((current-batch-rows
|
||||
(next-batch-rows batch-rows current-batch-pos next-error)))
|
||||
(when (< 0 current-batch-rows)
|
||||
(handler-case
|
||||
(with-pgsql-transaction (:database pomo:*database*)
|
||||
(let* ((table-name (format-table-name table))
|
||||
(stream
|
||||
(cl-postgres:open-db-writer pomo:*database*
|
||||
table-name columns)))
|
||||
(if (< current-batch-pos next-error)
|
||||
(log-message :info
|
||||
"error recovery at ~d/~d, next error at ~d, ~
|
||||
loading ~d row~:p"
|
||||
current-batch-pos
|
||||
batch-rows
|
||||
next-error
|
||||
current-batch-rows)
|
||||
(log-message :info
|
||||
"error recovery at ~d/~d, trying ~d row~:p"
|
||||
current-batch-pos batch-rows current-batch-rows))
|
||||
|
||||
(if (< current-batch-pos next-error)
|
||||
(log-message :info
|
||||
"error recovery at ~d/~d, next error at ~d, loading ~d row~:p"
|
||||
current-batch-pos batch-rows (+ 1 next-error) current-batch-rows)
|
||||
(log-message :info
|
||||
"error recovery at ~d/~d, trying ~d row~:p"
|
||||
current-batch-pos batch-rows current-batch-rows))
|
||||
(handler-case
|
||||
(incf current-batch-pos
|
||||
(copy-partial-batch table-name
|
||||
columns
|
||||
batch
|
||||
current-batch-rows
|
||||
current-batch-pos))
|
||||
|
||||
(unwind-protect
|
||||
(loop :repeat current-batch-rows
|
||||
:for pos :from current-batch-pos
|
||||
:do (db-write-row stream (aref batch pos)))
|
||||
;; the batch didn't make it, prepare error handling for next turn
|
||||
(postgresql-retryable (next-error-in-batch)
|
||||
(pomo:execute "ROLLBACK")
|
||||
(log-message :error "PostgreSQL [~s] ~a"
|
||||
table-name
|
||||
next-error-in-batch)
|
||||
(let ((next-error-relative
|
||||
(parse-copy-error-context
|
||||
(database-error-context next-error-in-batch))))
|
||||
|
||||
;; close-db-writer is the one signaling cl-postgres-errors
|
||||
(cl-postgres:close-db-writer stream)
|
||||
(incf current-batch-pos current-batch-rows))))
|
||||
(setf condition next-error-in-batch
|
||||
next-error (+ current-batch-pos next-error-relative)))))))))
|
||||
|
||||
;; the batch didn't make it, prepare error handling for next turn
|
||||
((or
|
||||
cl-postgres-error::data-exception
|
||||
cl-postgres-error::integrity-violation
|
||||
cl-postgres-error:internal-error
|
||||
cl-postgres-error::insufficient-resources
|
||||
cl-postgres-error::program-limit-exceeded) (next-error-in-batch)
|
||||
(log-message :info "Recovery found ~d errors in ~d row~:p"
|
||||
nb-errors batch-rows)
|
||||
|
||||
(setf condition next-error-in-batch
|
||||
;; Return how many rows where erroneous, for statistics purposes
|
||||
nb-errors)
|
||||
|
||||
next-error
|
||||
(+ current-batch-pos
|
||||
(parse-copy-error-context
|
||||
(cl-postgres::database-error-context condition))))))))))
|
||||
(defun copy-partial-batch (table-name columns
|
||||
batch current-batch-rows current-batch-pos)
|
||||
"Copy some rows of the batch, not all of them."
|
||||
(pomo:execute "BEGIN;")
|
||||
(let ((stream
|
||||
(cl-postgres:open-db-writer pomo:*database* table-name columns)))
|
||||
|
||||
(log-message :info "Recovery found ~d errors in ~d row~:p" nb-errors batch-rows)
|
||||
(unwind-protect
|
||||
(loop :repeat current-batch-rows
|
||||
:for pos :from current-batch-pos
|
||||
:do (db-write-row stream (aref batch pos)))
|
||||
|
||||
;; Return how many rows we did load, for statistics purposes
|
||||
(- batch-rows nb-errors))
|
||||
;; close-db-writer is the one signaling cl-postgres-errors
|
||||
(progn
|
||||
(cl-postgres:close-db-writer stream)
|
||||
(pomo:execute "COMMIT;")))
|
||||
|
||||
;; return how many rows we loaded, which is current-batch-rows
|
||||
current-batch-rows))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user