diff --git a/src/package.lisp b/src/package.lisp index 9e171a6..e019078 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -48,6 +48,7 @@ (defpackage #:pgloader.pgsql (:use #:cl #:pgloader.params #:pgloader.utils) (:export #:with-pgsql-transaction + #:with-pgsql-connection #:pgsql-execute #:pgsql-execute-with-timing #:truncate-table diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index c7b59c1..f2fb65e 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -67,40 +67,43 @@ transforms) "Fetch data from the QUEUE until we see :end-of-data. Update *state*" (when truncate - (log-message :notice "TRUNCATE ~a.~a;" dbname table-name) + (log-message :notice "TRUNCATE ~a;" table-name) (truncate-table dbname table-name)) - (log-message :debug "pgsql:copy-from-queue: ~a ~a ~a" dbname table-name columns) + (log-message :debug "pgsql:copy-from-queue: ~a ~a" table-name columns) - (with-pgsql-transaction (dbname) + (with-pgsql-connection (dbname) (loop for retval = - (let* ((copier - (cl-postgres:open-db-writer pomo:*database* table-name columns)) - (*batch* nil) + (let* ((*batch* nil) (*batch-size* 0)) - (log-message :debug "pgsql:copy-from-queue: starting new batch") - (unwind-protect - (let ((process-row-fn - (make-copy-and-batch-fn copier :transforms transforms))) - (catch 'next-batch - (pgloader.queue:map-pop-queue dataq process-row-fn))) + (handler-case + (with-pgsql-transaction (dbname :database pomo:*database*) + (let* ((copier (cl-postgres:open-db-writer pomo:*database* + table-name + columns))) + (log-message :debug "pgsql:copy-from-queue: new batch") + (unwind-protect + (let ((process-row-fn + (make-copy-and-batch-fn copier + :transforms transforms))) + (catch 'next-batch + (pgloader.queue:map-pop-queue dataq process-row-fn))) + + (log-message :debug "pgsql:copy-from-queue: batch done") + (cl-postgres:close-db-writer copier)))) + ;; in case of data-exception, split the batch and try again - (handler-case - (progn - (log-message :debug "pgsql:copy-from-queue: commit batch") - (cl-postgres:close-db-writer copier)) - ((or - CL-POSTGRES-ERROR:UNIQUE-VIOLATION - CL-POSTGRES-ERROR:DATA-EXCEPTION) (e) - (progn - (log-message :debug "pgsql:copy-from-queue: ~a" e) - (retry-batch dbname - table-name - (nreverse *batch*) - *batch-size* - :columns columns - :transforms transforms)))))) + ((or + CL-POSTGRES-ERROR:UNIQUE-VIOLATION + CL-POSTGRES-ERROR:DATA-EXCEPTION) (e) + (declare (ignore e)) ; already logged + (retry-batch dbname + table-name + (nreverse *batch*) + *batch-size* + :columns columns + :transforms transforms)))) ;; fetch how many rows we just pushed through, update stats for rows = (if (consp retval) (cdr retval) retval) @@ -132,8 +135,6 @@ ;; first, update the stats. (pgstate-incf *state* table-name :errs 1 :rows -1) - (log-message :error "Invalid input: ~{~s~^, ~}~%~a~%" row condition) - ;; now, the bad row processing (let* ((table (pgstate-get-table *state* table-name)) (data (pgtable-reject-data table)) @@ -175,44 +176,45 @@ ;;; (defun retry-batch (dbname table-name batch batch-size &key columns transforms) "Batch is a list of rows containing at least one bad row. Find it." - (let* ((conspec (get-connection-spec dbname :with-port nil)) - (current-batch-pos batch) + (log-message :debug "pgsql:retry-batch: splitting current batch [~d rows]" batch-size) + (let* ((current-batch-pos batch) (processed-rows 0)) (loop while (< processed-rows batch-size) do - (log-message :debug "pgsql:retry-batch: splitting current batch") (let* ((current-batch current-batch-pos) (current-batch-size (smaller-batch-size batch-size - processed-rows)) - (stream - (cl-postgres:open-db-writer conspec table-name columns))) + processed-rows))) + (handler-case + (with-pgsql-transaction (dbname :database pomo:*database*) + (let* ((stream (cl-postgres:open-db-writer pomo:*database* + table-name + columns))) - (log-message :debug "pgsql:retry-batch: current-batch-size = ~d" - current-batch-size) + (log-message :debug "pgsql:retry-batch: current-batch-size = ~d" + current-batch-size) - (unwind-protect - (dotimes (i current-batch-size) - ;; rows in that batch have already been processed - (cl-postgres:db-write-row stream (car current-batch-pos)) - (setf current-batch-pos (cdr current-batch-pos)) - (incf processed-rows)) + (unwind-protect + (dotimes (i current-batch-size) + ;; rows in that batch have already been processed + (cl-postgres:db-write-row stream (car current-batch-pos)) + (setf current-batch-pos (cdr current-batch-pos)) + (incf processed-rows)) - (handler-case - (cl-postgres:close-db-writer stream) + (cl-postgres:close-db-writer stream)))) - ;; the batch didn't make it, recurse - ((or - CL-POSTGRES-ERROR:UNIQUE-VIOLATION - CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) - ;; process bad data - (if (= 1 current-batch-size) - (process-bad-row table-name condition (car current-batch) - :transforms transforms) - ;; more than one line of bad data: recurse - (retry-batch dbname - table-name - current-batch - current-batch-size - :columns columns - :transforms transforms))))))))) + ;; the batch didn't make it, recurse + ((or + CL-POSTGRES-ERROR:UNIQUE-VIOLATION + CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) + ;; process bad data + (if (= 1 current-batch-size) + (process-bad-row table-name condition (car current-batch) + :transforms transforms) + ;; more than one line of bad data: recurse + (retry-batch dbname + table-name + current-batch + current-batch-size + :columns columns + :transforms transforms)))))))) diff --git a/src/pgsql/queries.lisp b/src/pgsql/queries.lisp index 4554563..5bb001e 100644 --- a/src/pgsql/queries.lisp +++ b/src/pgsql/queries.lisp @@ -6,33 +6,47 @@ ;;; ;;; PostgreSQL Tools connecting to a database ;;; -(defmacro handling-pgsql-notices ((&key set-local) &body forms) +(defmacro handling-pgsql-notices (&body forms) "The BODY is run within a PostgreSQL transaction where *pg-settings* have been applied. PostgreSQL warnings and errors are logged at the appropriate log level." - `(pomo:with-transaction () - (handler-bind - ((cl-postgres:database-error - #'(lambda (e) - (log-message :error "~a" e))) - (cl-postgres:postgresql-warning - #'(lambda (w) - (log-message :warning "~a" w) - (muffle-warning)))) - (set-session-gucs *pg-settings* :transaction ,set-local) - (progn ,@forms)))) + `(handler-bind + ((cl-postgres:database-error + #'(lambda (e) + (log-message :error "~a" e))) + (cl-postgres:postgresql-warning + #'(lambda (w) + (log-message :warning "~a" w) + (muffle-warning)))) + (progn ,@forms))) (defmacro with-pgsql-transaction ((dbname &key database) &body forms) "Run FORMS within a PostgreSQL transaction to DBNAME, reusing DATABASE if given. To get the connection spec from the DBNAME, use `get-connection-spec'." (if database - `(let ((pomo:*database* database)) - (handling-pgsql-notices (:set-local t) - ,@forms)) + `(let ((pomo:*database* ,database)) + (handling-pgsql-notices + (pomo:with-transaction () + (log-message :debug "BEGIN") + (set-session-gucs *pg-settings* :transaction t) + ,@forms))) ;; no database given, create a new database connection `(pomo:with-connection (get-connection-spec ,dbname) + (log-message :debug "CONNECT") + (set-session-gucs *pg-settings*) (handling-pgsql-notices () - ,@forms)))) + (pomo:with-transaction () + (log-message :debug "BEGIN") + ,@forms))))) + +(defmacro with-pgsql-connection ((dbname) &body forms) + "Run FROMS within a PostgreSQL connection to DBNAME. To get the connection + spec from the DBNAME, use `get-connection-spec'." + `(pomo:with-connection (get-connection-spec ,dbname) + (log-message :debug "CONNECT") + (set-session-gucs *pg-settings*) + (handling-pgsql-notices () + ,@forms))) (defun get-connection-spec (dbname &key (with-port t)) "pomo:with-connection and cl-postgres:open-database and open-db-writer are @@ -47,7 +61,7 @@ (let ((pomo:*database* (or database pomo:*database*))) (loop for (name . value) in alist - for set = (format nil "SET~@[ LOCAL~] ~a TO '~a'" transaction name value) + for set = (format nil "SET~:[~; LOCAL~] ~a TO '~a'" transaction name value) do (log-message :debug set) (pomo:execute set)))) diff --git a/test/errors.load b/test/errors.load index 89b0a46..7f758f0 100644 --- a/test/errors.load +++ b/test/errors.load @@ -21,11 +21,13 @@ LOAD CSV SET client_encoding to 'latin1', work_mem to '12MB', - standard_conforming_strings to 'on' + standard_conforming_strings to 'on', + search_path to 'err' -- test GUC settings in retry path BEFORE LOAD DO - $$ drop table if exists errors; $$, - $$ create table errors ( + $$ create schema if not exists err; $$, + $$ drop table if exists err.errors; $$, + $$ create table err.errors ( a integer primary key, b date, c text @@ -35,10 +37,10 @@ LOAD CSV -1|some first row text|2006-13-11| -2|some second row text|2006-11-11| -3|some third row text|2006-10-12| +1|expected error, month 13|2006-13-11| +2|nov. the 11th should go|2006-11-11| +3|12th of oct. should go|2006-10-12| 4|\ |2006-16-4| -5|some fifth row text|2006-5-12| -6|some sixth row text|2006-13-10| +5|month should be may, ok|2006-5-12| +6|another month 13, stress retry path|2006-13-10| 7|some null date to play with||