diff --git a/src/pgsql/connection.lisp b/src/pgsql/connection.lisp index d6fe832..69cfe8b 100644 --- a/src/pgsql/connection.lisp +++ b/src/pgsql/connection.lisp @@ -140,9 +140,18 @@ been applied. PostgreSQL warnings and errors are logged at the appropriate log level." `(handler-bind - ((cl-postgres:database-error - #'(lambda (e) - (log-message :error "~a" e))) + (((and cl-postgres:database-error + (not (or + cl-postgres-error::server-shutdown + cl-postgres-error::admin-shutdown + cl-postgres-error::crash-shutdown + cl-postgres-error::operator-intervention + cl-postgres-error::cannot-connect-now + cl-postgres-error::database-connection-error + cl-postgres-error::database-connection-lost + cl-postgres-error::database-socket-error))) + #'(lambda (e) + (log-message :error "~a" e))) (cl-postgres:postgresql-warning #'(lambda (w) (log-message :warning "~a" w) @@ -235,9 +244,31 @@ (defun pgsql-connect-and-execute-with-timing (pgconn section label sql &key (count 1)) "Run pgsql-execute-with-timing within a newly establised connection." - (with-pgsql-connection (pgconn) - (pomo:with-transaction () - (pgsql-execute-with-timing section label sql :count count)))) + (handler-case + (with-pgsql-connection (pgconn) + (pomo:with-transaction () + (pgsql-execute-with-timing section label sql :count count))) + + ((or + cl-postgres-error::server-shutdown + cl-postgres-error::admin-shutdown + cl-postgres-error::crash-shutdown + cl-postgres-error::operator-intervention + cl-postgres-error::cannot-connect-now + cl-postgres-error::database-connection-error + cl-postgres-error::database-connection-lost + cl-postgres-error::database-socket-error) + (condition) + + (log-message :error "~a" condition) + (log-message :error "Reconnecting to PostgreSQL") + + ;; in order to avoid Socket error in "connect": ECONNREFUSED if we + ;; try just too soon, wait a little + (sleep 2) + + (pgsql-connect-and-execute-with-timing + pgconn section label sql :count count)))) (defun pgsql-execute-with-timing (section label sql &key diff --git a/src/pgsql/copy-from-queue.lisp b/src/pgsql/copy-from-queue.lisp index 1075246..0116aa5 100644 --- a/src/pgsql/copy-from-queue.lisp +++ b/src/pgsql/copy-from-queue.lisp @@ -33,13 +33,13 @@ ;; protocol to PostgreSQL to be able to process them again in case ;; of a data error being signaled, that's the BATCH here. (let ((pomo:*database* db)) - (handling-pgsql-notices - ;; We can't use with-pgsql-transaction here because of the specifics - ;; of error handling in case of cl-postgres:open-db-writer errors: the - ;; transaction is dead already when we get a signal, and the COMMIT or - ;; ABORT steps then trigger a protocol error on a #\Z message. - (pomo:execute "BEGIN") - (handler-case + ;; We can't use with-pgsql-transaction here because of the specifics + ;; of error handling in case of cl-postgres:open-db-writer errors: the + ;; transaction is dead already when we get a signal, and the COMMIT or + ;; ABORT steps then trigger a protocol error on a #\Z message. + (handler-case + (progn + (pomo:execute "BEGIN") (let* ((table-name (format-table-name table)) (copier (handler-case @@ -62,35 +62,58 @@ (db-write-row copier data)) :finally (return batch-rows)) (cl-postgres:close-db-writer copier) - (pomo:execute "COMMIT"))) + (pomo:execute "COMMIT")))) - ;; If PostgreSQL signals a data error, process the batch by isolating - ;; erroneous data away and retrying the rest. - ((or - cl-postgres-error::data-exception - cl-postgres-error::integrity-violation - cl-postgres-error::internal-error - cl-postgres-error::insufficient-resources - cl-postgres-error::program-limit-exceeded) (condition) + ;; If PostgreSQL signals a data error, process the batch by isolating + ;; erroneous data away and retrying the rest. + ((or + cl-postgres-error::data-exception + cl-postgres-error::integrity-violation + cl-postgres-error::internal-error + cl-postgres-error::insufficient-resources + cl-postgres-error::program-limit-exceeded) (condition) - (pomo:execute "ROLLBACK") + (pomo:execute "ROLLBACK") - (if on-error-stop - ;; re-signal the condition to upper level - (progn - (log-message :error "~a" condition) - (signal 'on-error-stop :on-condition condition)) + (if on-error-stop + ;; re-signal the condition to upper level + (progn + (log-message :error "~a" condition) + (signal 'on-error-stop :on-condition condition)) - ;; normal behavior, on-error-stop being nil - ;; clean the current transaction before retrying new ones - (progn - (log-message :error "~a" condition) - (retry-batch table columns batch batch-rows condition)))) + ;; normal behavior, on-error-stop being nil + ;; clean the current transaction before retrying new ones + (progn + (log-message :error "~a" condition) + (retry-batch table columns batch batch-rows condition)))) - (condition (c) - ;; non retryable failures - (log-message :error "Non-retryable error ~a" c) - (pomo:execute "ROLLBACK")))))) + ((or + cl-postgres-error::server-shutdown + cl-postgres-error::admin-shutdown + cl-postgres-error::crash-shutdown + cl-postgres-error::operator-intervention + cl-postgres-error::cannot-connect-now + cl-postgres-error::database-connection-error + cl-postgres-error::database-connection-lost + cl-postgres-error::database-socket-error) + (condition) + + (log-message :error "~a" condition) + (log-message :error "Copy Batch reconnecting to PostgreSQL") + + ;; in order to avoid Socket error in "connect": ECONNREFUSED if we + ;; try just too soon, wait a little + (sleep 2) + + (cl-postgres:reopen-database db) + (copy-batch table columns batch batch-rows + :db db + :on-error-stop on-error-stop)) + + (condition (c) + ;; non retryable failures + (log-message :error "Non-retryable error ~a" c) + (pomo:execute "ROLLBACK"))))) ;;; ;;; We receive raw input rows from an lparallel queue, push their content diff --git a/src/sources/common/db-methods.lisp b/src/sources/common/db-methods.lisp index 619b68a..dd8ebe8 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/sources/common/db-methods.lisp @@ -24,7 +24,7 @@ tables for materialized views. That function mutates index definitions in ALL-INDEXES." - (log-message :info "~:[~;DROP then ~]CREATE TABLES" include-drop) + (log-message :notice "Prepare PostgreSQL database.") (with-pgsql-transaction (:pgconn (target-db copy)) (when create-schemas @@ -131,45 +131,80 @@ ;; able to benefit from the indexes. In particular avoid doing that step ;; while CREATE INDEX statements are in flight (avoid locking). ;; + (log-message :notice "Complete PostgreSQL database.") + (when reset-sequences (reset-sequences (clone-connection (target-db copy)) catalog)) - (with-pgsql-connection ((clone-connection (target-db copy))) - ;; - ;; Turn UNIQUE indexes into PRIMARY KEYS now - ;; - (when create-indexes - (pgsql-execute-with-timing :post "Primary Keys" - pkeys - :count (length pkeys)) + (handler-case + (with-pgsql-transaction (:pgconn (clone-connection (target-db copy))) + ;; + ;; Turn UNIQUE indexes into PRIMARY KEYS now + ;; + (when create-indexes + (pgsql-execute-with-timing :post "Primary Keys" + pkeys + :count (length pkeys)) - ;; - ;; Foreign Key Constraints - ;; - ;; We need to have finished loading both the reference and the refering - ;; tables to be able to build the foreign keys, so wait until all tables - ;; and indexes are imported before doing that. - ;; - (when foreign-keys - (create-pgsql-fkeys catalog - :section :post - :label "Create Foreign Keys")) + ;; + ;; Foreign Key Constraints + ;; + ;; We need to have finished loading both the reference and the + ;; refering tables to be able to build the foreign keys, so wait + ;; until all tables and indexes are imported before doing that. + ;; + (when foreign-keys + (create-pgsql-fkeys catalog + :section :post + :label "Create Foreign Keys")) - ;; - ;; Triggers and stored procedures -- includes special default values - ;; - (when create-triggers - (with-pgsql-transaction (:pgconn (target-db copy)) - (create-triggers catalog - :section :post - :label "Create Triggers")))) + ;; + ;; Triggers and stored procedures -- includes special default values + ;; + (when create-triggers + (create-triggers catalog + :section :post + :label "Create Triggers"))) - ;; - ;; And now, comments on tables and columns. - ;; - (comment-on-tables-and-columns catalog - :section :post - :label "Install Comments"))) + ;; + ;; And now, comments on tables and columns. + ;; + (comment-on-tables-and-columns catalog + :section :post + :label "Install Comments")) + + ((or + cl-postgres-error::server-shutdown + cl-postgres-error::admin-shutdown + cl-postgres-error::crash-shutdown + cl-postgres-error::operator-intervention + cl-postgres-error::cannot-connect-now + cl-postgres-error::database-connection-error + cl-postgres-error::database-connection-lost + cl-postgres-error::database-socket-error) + (condition) + + (log-message :error "~a" condition) + (log-message :error + "Complete PostgreSQL database reconnecting to PostgreSQL.") + + ;; in order to avoid Socket error in "connect": ECONNREFUSED if we + ;; try just too soon, wait a little + (sleep 2) + + + ;; + ;; Reset Sequence can be done several times safely, and the rest of the + ;; operations run in a single transaction, so if the connection was lost, + ;; nothing has been done. Retry. + ;; + (complete-pgsql-database copy + catalog + pkeys + :foreign-keys foreign-keys + :create-indexes create-indexes + :create-triggers create-triggers + :reset-sequences reset-sequences)))) (defmethod instanciate-table-copy-object ((copy db-copy) (table table)) "Create an new instance for copying TABLE data."