From 7c238f45f22ab023a91a96ec2c15beff84d0fcd5 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Wed, 22 Jan 2014 22:52:18 +0100 Subject: [PATCH] Fix batch retry handling, broken in previous refactoring. Fixes #22. --- src/pgsql/pgsql.lisp | 51 ++++++++++++++++++++++++-------------------- test/data/retry.lisp | 50 +++++++++++++++++++++++++++++++++++++++++++ test/data/retry.load | 5 +++++ 3 files changed, 83 insertions(+), 23 deletions(-) create mode 100644 test/data/retry.lisp create mode 100644 test/data/retry.load diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index 0ebd035..a599679 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -128,30 +128,30 @@ details about the format, and format specs." (let* ((*batch-rows* 0)) (log-message :debug "pgsql:copy-from-queue: new batch") (handler-case - (with-pgsql-transaction (:dbname dbname :database pomo:*database*) - (let* ((copier (cl-postgres:open-db-writer pomo:*database* - table-name - columns))) - (unwind-protect - (let ((process-row-fn - (make-copy-and-batch-fn copier + (with-pgsql-transaction (:dbname dbname :database pomo:*database*) + (let* ((copier (cl-postgres:open-db-writer pomo:*database* + table-name + columns))) + (unwind-protect + (let ((process-row-fn + (make-copy-and-batch-fn copier :transforms transforms))) - (catch 'next-batch - (pgloader.queue:map-pop-queue dataq process-row-fn))) + (catch 'next-batch + (pgloader.queue:map-pop-queue dataq process-row-fn))) + (cl-postgres:close-db-writer copier)))) - (cl-postgres:close-db-writer copier)))) - - ;; in case of data-exception, split the batch and try again - ((or - CL-POSTGRES-ERROR:UNIQUE-VIOLATION - CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) - (retry-batch table-name columns *batch* *batch-rows* condition))) - (log-message :debug "pgsql:copy-from-queue: batch done")) + ;; in case of data-exception, split the batch and try again + ((or + CL-POSTGRES-ERROR:UNIQUE-VIOLATION + CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) + (retry-batch table-name columns *batch* *batch-rows* condition)))) ;; fetch how many rows we just pushed through, update stats for rows = (if (consp retval) (cdr retval) retval) for cont = (and (consp retval) (eq (car retval) :continue)) - do (pgstate-incf *state* table-name :rows rows) + do (progn + (log-message :debug "pgsql:copy-from-queue: batch done") + (pgstate-incf *state* table-name :rows rows)) while cont))) ;;; @@ -245,7 +245,8 @@ details about the format, and format specs." ;;; The main retry batch function. ;;; (defun retry-batch (table-name columns batch batch-rows condition - &optional (current-batch-pos 0)) + &optional (current-batch-pos 0) + &aux (nb-errors 0)) "Batch is a list of rows containing at least one bad row, the first such row is known to be located at FIRST-ERROR index in the BATCH array." @@ -261,9 +262,10 @@ details about the format, and format specs." (progn ; indenting helper (when (= current-batch-pos next-error) (log-message :info "error recovery at ~d/~d, processing bad row" - next-error batch-rows) + (+ 1 next-error) batch-rows) (process-bad-row table-name condition (aref batch current-batch-pos)) - (incf current-batch-pos)) + (incf current-batch-pos) + (incf nb-errors)) (let* ((current-batch-rows (next-batch-rows batch-rows current-batch-pos next-error))) @@ -277,7 +279,7 @@ details about the format, and format specs." (if (< current-batch-pos next-error) (log-message :info "error recovery at ~d/~d, next error at ~d, loading ~d row~:p" - current-batch-pos batch-rows next-error current-batch-rows) + current-batch-pos batch-rows (+ 1 next-error) current-batch-rows) (log-message :info "error recovery at ~d/~d, trying ~d row~:p" current-batch-pos batch-rows current-batch-rows)) @@ -303,4 +305,7 @@ details about the format, and format specs." (parse-copy-error-context (cl-postgres::database-error-context condition)))))))))) - (log-message :info "Recovery done.")) + (log-message :info "Recovery done.") + + ;; Implement the batch processing protocol + (cons :continue (- batch-rows nb-errors))) diff --git a/test/data/retry.lisp b/test/data/retry.lisp new file mode 100644 index 0000000..a49f1f3 --- /dev/null +++ b/test/data/retry.lisp @@ -0,0 +1,50 @@ +;;; Test cases for issue https://github.com/dimitri/pgloader/issues/22 +;;; +;;; + +#| +CREATE TABLE `retry` ( + `id` int(10) unsigned NOT NULL AUTO_INCREMENT, + `content` text, + PRIMARY KEY (`id`) +); +|# + +(defpackage #:pgloader.test.retry + (:use #:cl #:pgloader.params #:pgloader.mysql) + (:export #:produce-data)) + +(in-package #:pgloader.test.retry) + +(defvar *inject-null-bytes* + (coerce (loop for previous = 0 then (+ previous offset) + for offset in '(15769 54 7 270 8752) + collect (+ previous offset)) 'vector) + "Line numbers in the batch where to inject erroneous data.") + +(defvar *string-with-null-byte* (concatenate 'string "Hello" (list #\Nul) "World!")) + +(defvar *random-string* (make-string (random 42) :initial-element #\a) + "A random string.") + +(defvar *query* "INSERT INTO `~a`(`content`) VALUES ('~a')") + +(defun produce-data (&key + (*myconn-host* *myconn-host*) + (*myconn-port* *myconn-port*) + (*myconn-user* *myconn-user*) + (*myconn-pass* *myconn-pass*) + (dbname "retry") + (table-name "retry") + (rows 150000)) + "Produce a data set that looks like the one in issue #22." + (with-mysql-connection (dbname) + (let ((next-error-pos 0)) + (loop for n from 1 to rows + for str = (if (and (< next-error-pos (length *inject-null-bytes*)) + (= n (aref *inject-null-bytes* next-error-pos))) + (progn + (incf next-error-pos) + *string-with-null-byte*) + *random-string*) + do (pgloader.mysql::mysql-query (format nil *query* table-name str)))))) diff --git a/test/data/retry.load b/test/data/retry.load new file mode 100644 index 0000000..1e6d772 --- /dev/null +++ b/test/data/retry.load @@ -0,0 +1,5 @@ +LOAD DATABASE + FROM mysql://root@localhost:3306/retry + INTO postgresql:///retry + + WITH include drop, create tables, create indexes, reset sequences, truncate;