Reconnect to PostgreSQL when the connection is lost.

PostgreSQL may be restarted while pgloader is running, or we may lose the
connection to the server for some other reason. In most cases we know how
to gracefully reconnect and retry, so just do so.

Fixes #546 initial report.
This commit is contained in:
Dimitri Fontaine 2017-06-29 01:55:00 +02:00
parent f0d1f4ef8c
commit cea82a6aa8
3 changed files with 160 additions and 71 deletions

View File

@ -140,9 +140,18 @@
been applied. PostgreSQL warnings and errors are logged at the
appropriate log level."
`(handler-bind
((cl-postgres:database-error
#'(lambda (e)
(log-message :error "~a" e)))
(((and cl-postgres:database-error
(not (or
cl-postgres-error::server-shutdown
cl-postgres-error::admin-shutdown
cl-postgres-error::crash-shutdown
cl-postgres-error::operator-intervention
cl-postgres-error::cannot-connect-now
cl-postgres-error::database-connection-error
cl-postgres-error::database-connection-lost
cl-postgres-error::database-socket-error)))
#'(lambda (e)
(log-message :error "~a" e)))
(cl-postgres:postgresql-warning
#'(lambda (w)
(log-message :warning "~a" w)
@ -235,9 +244,31 @@
(defun pgsql-connect-and-execute-with-timing (pgconn section label sql
&key (count 1))
"Run pgsql-execute-with-timing within a newly establised connection."
(with-pgsql-connection (pgconn)
(pomo:with-transaction ()
(pgsql-execute-with-timing section label sql :count count))))
(handler-case
(with-pgsql-connection (pgconn)
(pomo:with-transaction ()
(pgsql-execute-with-timing section label sql :count count)))
((or
cl-postgres-error::server-shutdown
cl-postgres-error::admin-shutdown
cl-postgres-error::crash-shutdown
cl-postgres-error::operator-intervention
cl-postgres-error::cannot-connect-now
cl-postgres-error::database-connection-error
cl-postgres-error::database-connection-lost
cl-postgres-error::database-socket-error)
(condition)
(log-message :error "~a" condition)
(log-message :error "Reconnecting to PostgreSQL")
;; in order to avoid Socket error in "connect": ECONNREFUSED if we
;; try just too soon, wait a little
(sleep 2)
(pgsql-connect-and-execute-with-timing
pgconn section label sql :count count))))
(defun pgsql-execute-with-timing (section label sql
&key

View File

@ -33,13 +33,13 @@
;; protocol to PostgreSQL to be able to process them again in case
;; of a data error being signaled, that's the BATCH here.
(let ((pomo:*database* db))
(handling-pgsql-notices
;; We can't use with-pgsql-transaction here because of the specifics
;; of error handling in case of cl-postgres:open-db-writer errors: the
;; transaction is dead already when we get a signal, and the COMMIT or
;; ABORT steps then trigger a protocol error on a #\Z message.
(pomo:execute "BEGIN")
(handler-case
;; We can't use with-pgsql-transaction here because of the specifics
;; of error handling in case of cl-postgres:open-db-writer errors: the
;; transaction is dead already when we get a signal, and the COMMIT or
;; ABORT steps then trigger a protocol error on a #\Z message.
(handler-case
(progn
(pomo:execute "BEGIN")
(let* ((table-name (format-table-name table))
(copier
(handler-case
@ -62,35 +62,58 @@
(db-write-row copier data))
:finally (return batch-rows))
(cl-postgres:close-db-writer copier)
(pomo:execute "COMMIT")))
(pomo:execute "COMMIT"))))
;; If PostgreSQL signals a data error, process the batch by isolating
;; erroneous data away and retrying the rest.
((or
cl-postgres-error::data-exception
cl-postgres-error::integrity-violation
cl-postgres-error::internal-error
cl-postgres-error::insufficient-resources
cl-postgres-error::program-limit-exceeded) (condition)
;; If PostgreSQL signals a data error, process the batch by isolating
;; erroneous data away and retrying the rest.
((or
cl-postgres-error::data-exception
cl-postgres-error::integrity-violation
cl-postgres-error::internal-error
cl-postgres-error::insufficient-resources
cl-postgres-error::program-limit-exceeded) (condition)
(pomo:execute "ROLLBACK")
(pomo:execute "ROLLBACK")
(if on-error-stop
;; re-signal the condition to upper level
(progn
(log-message :error "~a" condition)
(signal 'on-error-stop :on-condition condition))
(if on-error-stop
;; re-signal the condition to upper level
(progn
(log-message :error "~a" condition)
(signal 'on-error-stop :on-condition condition))
;; normal behavior, on-error-stop being nil
;; clean the current transaction before retrying new ones
(progn
(log-message :error "~a" condition)
(retry-batch table columns batch batch-rows condition))))
;; normal behavior, on-error-stop being nil
;; clean the current transaction before retrying new ones
(progn
(log-message :error "~a" condition)
(retry-batch table columns batch batch-rows condition))))
(condition (c)
;; non retryable failures
(log-message :error "Non-retryable error ~a" c)
(pomo:execute "ROLLBACK"))))))
((or
cl-postgres-error::server-shutdown
cl-postgres-error::admin-shutdown
cl-postgres-error::crash-shutdown
cl-postgres-error::operator-intervention
cl-postgres-error::cannot-connect-now
cl-postgres-error::database-connection-error
cl-postgres-error::database-connection-lost
cl-postgres-error::database-socket-error)
(condition)
(log-message :error "~a" condition)
(log-message :error "Copy Batch reconnecting to PostgreSQL")
;; in order to avoid Socket error in "connect": ECONNREFUSED if we
;; try just too soon, wait a little
(sleep 2)
(cl-postgres:reopen-database db)
(copy-batch table columns batch batch-rows
:db db
:on-error-stop on-error-stop))
(condition (c)
;; non retryable failures
(log-message :error "Non-retryable error ~a" c)
(pomo:execute "ROLLBACK")))))
;;;
;;; We receive raw input rows from an lparallel queue, push their content

View File

@ -24,7 +24,7 @@
tables for materialized views.
That function mutates index definitions in ALL-INDEXES."
(log-message :info "~:[~;DROP then ~]CREATE TABLES" include-drop)
(log-message :notice "Prepare PostgreSQL database.")
(with-pgsql-transaction (:pgconn (target-db copy))
(when create-schemas
@ -131,45 +131,80 @@
;; able to benefit from the indexes. In particular avoid doing that step
;; while CREATE INDEX statements are in flight (avoid locking).
;;
(log-message :notice "Complete PostgreSQL database.")
(when reset-sequences
(reset-sequences (clone-connection (target-db copy)) catalog))
(with-pgsql-connection ((clone-connection (target-db copy)))
;;
;; Turn UNIQUE indexes into PRIMARY KEYS now
;;
(when create-indexes
(pgsql-execute-with-timing :post "Primary Keys"
pkeys
:count (length pkeys))
(handler-case
(with-pgsql-transaction (:pgconn (clone-connection (target-db copy)))
;;
;; Turn UNIQUE indexes into PRIMARY KEYS now
;;
(when create-indexes
(pgsql-execute-with-timing :post "Primary Keys"
pkeys
:count (length pkeys))
;;
;; Foreign Key Constraints
;;
;; We need to have finished loading both the reference and the refering
;; tables to be able to build the foreign keys, so wait until all tables
;; and indexes are imported before doing that.
;;
(when foreign-keys
(create-pgsql-fkeys catalog
:section :post
:label "Create Foreign Keys"))
;;
;; Foreign Key Constraints
;;
;; We need to have finished loading both the reference and the
;; refering tables to be able to build the foreign keys, so wait
;; until all tables and indexes are imported before doing that.
;;
(when foreign-keys
(create-pgsql-fkeys catalog
:section :post
:label "Create Foreign Keys"))
;;
;; Triggers and stored procedures -- includes special default values
;;
(when create-triggers
(with-pgsql-transaction (:pgconn (target-db copy))
(create-triggers catalog
:section :post
:label "Create Triggers"))))
;;
;; Triggers and stored procedures -- includes special default values
;;
(when create-triggers
(create-triggers catalog
:section :post
:label "Create Triggers")))
;;
;; And now, comments on tables and columns.
;;
(comment-on-tables-and-columns catalog
:section :post
:label "Install Comments")))
;;
;; And now, comments on tables and columns.
;;
(comment-on-tables-and-columns catalog
:section :post
:label "Install Comments"))
((or
cl-postgres-error::server-shutdown
cl-postgres-error::admin-shutdown
cl-postgres-error::crash-shutdown
cl-postgres-error::operator-intervention
cl-postgres-error::cannot-connect-now
cl-postgres-error::database-connection-error
cl-postgres-error::database-connection-lost
cl-postgres-error::database-socket-error)
(condition)
(log-message :error "~a" condition)
(log-message :error
"Complete PostgreSQL database reconnecting to PostgreSQL.")
;; in order to avoid Socket error in "connect": ECONNREFUSED if we
;; try just too soon, wait a little
(sleep 2)
;;
;; Reset Sequence can be done several times safely, and the rest of the
;; operations run in a single transaction, so if the connection was lost,
;; nothing has been done. Retry.
;;
(complete-pgsql-database copy
catalog
pkeys
:foreign-keys foreign-keys
:create-indexes create-indexes
:create-triggers create-triggers
:reset-sequences reset-sequences))))
(defmethod instanciate-table-copy-object ((copy db-copy) (table table))
"Create an new instance for copying TABLE data."