diff --git a/pgsql.lisp b/pgsql.lisp index e456eee..f53ca13 100644 --- a/pgsql.lisp +++ b/pgsql.lisp @@ -240,6 +240,9 @@ Finally returns how many rows where read and processed." ;;; First prepare the streaming function that batches the rows so that we ;;; are able to process them in case of errors. ;;; +(defvar *batch* nil "Current batch of rows being processed.") +(defvar *batch-size* 0 "How many rows are to be found in current *batch*.") + (defun make-copy-and-batch-fn (stream &key date-columns) "Returns a function of one argument, ROW. @@ -247,7 +250,6 @@ Finally returns how many rows where read and processed." PostgreSQL COPY STREAM, and push it to BATCH (a list). When batch's size is up to *copy-batch-size*, throw the 'next-batch tag with its current size." - (declare (special *batch* *batch-size*)) (lambda (row) (let ((reformated-row (if date-columns (reformat-row row :date-columns date-columns) @@ -274,19 +276,17 @@ Finally returns how many rows where read and processed." (defun copy-from-queue (dbname table-name dataq &key (truncate t) - (state *state*) + ((:state *state*) *state*) date-columns) "Fetch data from the QUEUE until we see :end-of-data. Update *state*" (when truncate (truncate-table dbname table-name)) - (let* ((*state* state) - (conspec (remove :port (get-connection-string dbname)))) + (let* ((conspec (remove :port (get-connection-string dbname)))) (loop for retval = (let* ((stream (cl-postgres:open-db-writer conspec table-name nil)) (*batch* nil) (*batch-size* 0)) - (declare (special *batch* *batch-size*)) (unwind-protect (let ((process-row-fn (make-copy-and-batch-fn stream :date-columns date-columns))) @@ -372,8 +372,7 @@ Finally returns how many rows where read and processed." (defun process-bad-row (table-name condition row) "Add the row to the reject file, in PostgreSQL COPY TEXT format" ;; first, update the stats. - (pgstate-incf *state* table-name :errs 1) - (pgstate-decf *state* table-name :rows 1) + (pgstate-incf *state* table-name :errs 1 :rows -1) ;; now, the bad row processing (let* ((table (pgstate-get-table *state* table-name))