From edb12ce3f867fc8eb74e376e7ef5efb81fe6951a Mon Sep 17 00:00:00 2001
From: Dimitri Fontaine
Date: Thu, 7 Feb 2013 15:26:29 +0100
Subject: [PATCH] Implement basic error management of data so that COPY still imports the good rows.

---
 pgsql.lisp | 143 +++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 133 insertions(+), 10 deletions(-)

diff --git a/pgsql.lisp b/pgsql.lisp
index d902d12..a078e2e 100644
--- a/pgsql.lisp
+++ b/pgsql.lisp
@@ -6,6 +6,12 @@
 ;;;
 ;;; Quick utilities to get rid of later.
 ;;;
+(defparameter *copy-batch-size* 10000
+  "How many rows to send per COPY transaction")
+
+(defparameter *copy-batch-split* 10
+  "Number of batches in which to split a batch with bad data")
+
 (defparameter *pgconn*
   '("gdb" "none" "localhost" :port 5432)
   "Connection string to the local database")
@@ -187,14 +193,131 @@ Finally returns how many rows where read and processed."
   "Fetch data from the QUEUE until we see :end-of-data"
   (when truncate (truncate-table dbname table-name))
 
-  (let* ((conspec (remove :port (get-connection-string dbname)))
-         (stream
-          (cl-postgres:open-db-writer conspec table-name nil)))
-    (unwind-protect
-         (pgloader.queue:map-pop-queue
-          dataq (lambda (row)
-                  (cl-postgres:db-write-row
-                   stream
-                   (reformat-row row :date-columns date-columns))))
-      (cl-postgres:close-db-writer stream))))
+  (let* ((conspec (remove :port (get-connection-string dbname))))
+    (loop
+       for retval =
+         (let* ((stream (cl-postgres:open-db-writer conspec table-name nil))
+                (batch nil)
+                (batch-size 0)
+                (process-row-fn
+                 ;; build our batch aware row processing function
+                 ;; it closes over batch and stream
+                 (lambda (row)
+                   (let ((reformated-row
+                          (reformat-row row :date-columns date-columns)))
+                     (push reformated-row batch)
+                     (incf batch-size 1)
+                     (cl-postgres:db-write-row stream reformated-row)
+                     ;; return control in between batches
+                     (when (= batch-size *copy-batch-size*)
+                       (throw 'next-batch (cons :continue batch-size)))))))
+           (unwind-protect
+                (catch 'next-batch
+                  (pgloader.queue:map-pop-queue dataq process-row-fn))
+             ;; in case of data-exception, split the batch and try again
+             (handler-case
+                 (cl-postgres:close-db-writer stream)
+               ((or
+                 CL-POSTGRES-ERROR:UNIQUE-VIOLATION
+                 CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition)
+                (retry-batch dbname table-name (nreverse batch) batch-size)))))
+       ;; the final return value is the number of row processed
+       summing (if (consp retval) (cdr retval) retval) into total-rows
+       while (and (consp retval) (eq (car retval) :continue))
+       finally (return total-rows))))
 
+;;;
+;;; When a batch has been refused by PostgreSQL with a data-exception, that
+;;; means it contains non-conforming data. It could be only one row in the
+;;; middle of the *copy-batch-size* rows.
+;;;
+;;; The general principle to filter out the bad row(s) is to split the batch
+;;; in smaller ones, and try to COPY all of the smaller ones again,
+;;; recursively. When the batch is containing only one row, we know that one
+;;; is non conforming to PostgreSQL expectations (usually, data type input
+;;; does not match, e.g. text is not proper utf-8).
+;;;
+;;; As we often need to split out a single bad row out of a full batch, we
+;;; don't do the classical dichotomy but rather split the batch directly in
+;;; lots of smaller ones.
+;;;
+;;;   split 1000 rows in 10 batches of 100 rows
+;;;   split  352 rows in 10 batches of  35 rows + 2 batches of 1 row
+;;;
+
+;;;
+;;; Retry a single batch, without doing data copying: we already have the
+;;; rows inside a batch, just process a subset of it of size batch-size.
+;;;
+(defun process-bad-row (dbname table-name row)
+  "Report ROW, refused by PostgreSQL, by printing at most its first 72
+   characters on standard output."
+  ;; DBNAME and TABLE-NAME are accepted but not used yet
+  (declare (ignore dbname table-name))
+  (let* ((str (format nil "~a" row))
+         (str (if (< 72 (length str)) (subseq str 0 72)
+                  str)))
+    (format t "BAD ROW: ~a...~%" str)))
+
+(defun smaller-batch-size (batch-size processed-rows)
+  "How many rows should we process in next iteration?
+
+   Return 1 when fewer than *copy-batch-split* rows remain out of
+   BATCH-SIZE, otherwise BATCH-SIZE divided by *copy-batch-split* (rounded
+   down), never more than the remaining rows."
+  (let ((remaining-rows (- batch-size processed-rows)))
+    (if (< remaining-rows *copy-batch-split*)
+        1
+        (min remaining-rows
+             (floor batch-size *copy-batch-split*)))))
+
+(defun retry-batch (dbname table-name batch batch-size)
+  "Batch is a list of rows containing at least one error. Send its first
+   BATCH-SIZE rows again in smaller COPY batches, recursing into any
+   sub-batch that is refused until each bad row is isolated. Return number
+   of bad rows."
+  (let* ((conspec (remove :port (get-connection-string dbname)))
+         (current-batch-pos batch)
+         (processed-rows 0)
+         (total-bad-rows 0))
+    (loop
+       ;; stop as soon as every row has been sent: using <= here would run
+       ;; one extra iteration past the end of the batch
+       while (< processed-rows batch-size)
+       do
+         (let* ((current-batch current-batch-pos)
+                (current-batch-size (smaller-batch-size batch-size
+                                                        processed-rows))
+                (stream
+                 (cl-postgres:open-db-writer conspec table-name nil)))
+
+           (unwind-protect
+                (loop
+                   repeat current-batch-size
+                   do
+                     ;; rows in that batch have already been processed
+                     (cl-postgres:db-write-row stream (car current-batch-pos))
+                     (setf current-batch-pos (cdr current-batch-pos))
+                     (incf processed-rows))
+
+             (handler-case
+                 (cl-postgres:close-db-writer stream)
+
+               ;; the batch didn't make it, recurse
+               ((or
+                 CL-POSTGRES-ERROR:UNIQUE-VIOLATION
+                 CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition)
+                (format t "~&botched batch of ~d rows: ~a.~%"
+                        current-batch-size condition)
+                ;; process bad data
+                (if (= 1 current-batch-size)
+                    (progn
+                      (process-bad-row dbname table-name (car current-batch))
+                      (incf total-bad-rows))
+                    ;; more than one line of bad data: recurse and add up
+                    ;; the bad rows found in the sub-batch
+                    (incf total-bad-rows
+                          (retry-batch dbname table-name
+                                       current-batch current-batch-size)))))))
+       ;; function's return value: number of bad rows extracted
+       finally (return total-bad-rows))))