mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-07 23:07:00 +02:00
Fix batch retry handling, broken in previous refactoring. Fixes #22.
This commit is contained in:
parent
a5c661dd4a
commit
7c238f45f2
@ -128,30 +128,30 @@ details about the format, and format specs."
|
|||||||
(let* ((*batch-rows* 0))
|
(let* ((*batch-rows* 0))
|
||||||
(log-message :debug "pgsql:copy-from-queue: new batch")
|
(log-message :debug "pgsql:copy-from-queue: new batch")
|
||||||
(handler-case
|
(handler-case
|
||||||
(with-pgsql-transaction (:dbname dbname :database pomo:*database*)
|
(with-pgsql-transaction (:dbname dbname :database pomo:*database*)
|
||||||
(let* ((copier (cl-postgres:open-db-writer pomo:*database*
|
(let* ((copier (cl-postgres:open-db-writer pomo:*database*
|
||||||
table-name
|
table-name
|
||||||
columns)))
|
columns)))
|
||||||
(unwind-protect
|
(unwind-protect
|
||||||
(let ((process-row-fn
|
(let ((process-row-fn
|
||||||
(make-copy-and-batch-fn copier
|
(make-copy-and-batch-fn copier
|
||||||
:transforms transforms)))
|
:transforms transforms)))
|
||||||
(catch 'next-batch
|
(catch 'next-batch
|
||||||
(pgloader.queue:map-pop-queue dataq process-row-fn)))
|
(pgloader.queue:map-pop-queue dataq process-row-fn)))
|
||||||
|
(cl-postgres:close-db-writer copier))))
|
||||||
|
|
||||||
(cl-postgres:close-db-writer copier))))
|
;; in case of data-exception, split the batch and try again
|
||||||
|
((or
|
||||||
;; in case of data-exception, split the batch and try again
|
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
||||||
((or
|
CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition)
|
||||||
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
(retry-batch table-name columns *batch* *batch-rows* condition))))
|
||||||
CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition)
|
|
||||||
(retry-batch table-name columns *batch* *batch-rows* condition)))
|
|
||||||
(log-message :debug "pgsql:copy-from-queue: batch done"))
|
|
||||||
|
|
||||||
;; fetch how many rows we just pushed through, update stats
|
;; fetch how many rows we just pushed through, update stats
|
||||||
for rows = (if (consp retval) (cdr retval) retval)
|
for rows = (if (consp retval) (cdr retval) retval)
|
||||||
for cont = (and (consp retval) (eq (car retval) :continue))
|
for cont = (and (consp retval) (eq (car retval) :continue))
|
||||||
do (pgstate-incf *state* table-name :rows rows)
|
do (progn
|
||||||
|
(log-message :debug "pgsql:copy-from-queue: batch done")
|
||||||
|
(pgstate-incf *state* table-name :rows rows))
|
||||||
while cont)))
|
while cont)))
|
||||||
|
|
||||||
;;;
|
;;;
|
||||||
@ -245,7 +245,8 @@ details about the format, and format specs."
|
|||||||
;;; The main retry batch function.
|
;;; The main retry batch function.
|
||||||
;;;
|
;;;
|
||||||
(defun retry-batch (table-name columns batch batch-rows condition
|
(defun retry-batch (table-name columns batch batch-rows condition
|
||||||
&optional (current-batch-pos 0))
|
&optional (current-batch-pos 0)
|
||||||
|
&aux (nb-errors 0))
|
||||||
"Batch is a list of rows containing at least one bad row, the first such
|
"Batch is a list of rows containing at least one bad row, the first such
|
||||||
row is known to be located at FIRST-ERROR index in the BATCH array."
|
row is known to be located at FIRST-ERROR index in the BATCH array."
|
||||||
|
|
||||||
@ -261,9 +262,10 @@ details about the format, and format specs."
|
|||||||
(progn ; indenting helper
|
(progn ; indenting helper
|
||||||
(when (= current-batch-pos next-error)
|
(when (= current-batch-pos next-error)
|
||||||
(log-message :info "error recovery at ~d/~d, processing bad row"
|
(log-message :info "error recovery at ~d/~d, processing bad row"
|
||||||
next-error batch-rows)
|
(+ 1 next-error) batch-rows)
|
||||||
(process-bad-row table-name condition (aref batch current-batch-pos))
|
(process-bad-row table-name condition (aref batch current-batch-pos))
|
||||||
(incf current-batch-pos))
|
(incf current-batch-pos)
|
||||||
|
(incf nb-errors))
|
||||||
|
|
||||||
(let* ((current-batch-rows
|
(let* ((current-batch-rows
|
||||||
(next-batch-rows batch-rows current-batch-pos next-error)))
|
(next-batch-rows batch-rows current-batch-pos next-error)))
|
||||||
@ -277,7 +279,7 @@ details about the format, and format specs."
|
|||||||
(if (< current-batch-pos next-error)
|
(if (< current-batch-pos next-error)
|
||||||
(log-message :info
|
(log-message :info
|
||||||
"error recovery at ~d/~d, next error at ~d, loading ~d row~:p"
|
"error recovery at ~d/~d, next error at ~d, loading ~d row~:p"
|
||||||
current-batch-pos batch-rows next-error current-batch-rows)
|
current-batch-pos batch-rows (+ 1 next-error) current-batch-rows)
|
||||||
(log-message :info
|
(log-message :info
|
||||||
"error recovery at ~d/~d, trying ~d row~:p"
|
"error recovery at ~d/~d, trying ~d row~:p"
|
||||||
current-batch-pos batch-rows current-batch-rows))
|
current-batch-pos batch-rows current-batch-rows))
|
||||||
@ -303,4 +305,7 @@ details about the format, and format specs."
|
|||||||
(parse-copy-error-context
|
(parse-copy-error-context
|
||||||
(cl-postgres::database-error-context condition))))))))))
|
(cl-postgres::database-error-context condition))))))))))
|
||||||
|
|
||||||
(log-message :info "Recovery done."))
|
(log-message :info "Recovery done.")
|
||||||
|
|
||||||
|
;; Implement the batch processing protocol
|
||||||
|
(cons :continue (- batch-rows nb-errors)))
|
||||||
|
50
test/data/retry.lisp
Normal file
50
test/data/retry.lisp
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
;;; Test cases for issue https://github.com/dimitri/pgloader/issues/22
|
||||||
|
;;;
|
||||||
|
;;;
|
||||||
|
|
||||||
|
#|
|
||||||
|
CREATE TABLE `retry` (
|
||||||
|
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
|
||||||
|
`content` text,
|
||||||
|
PRIMARY KEY (`id`)
|
||||||
|
);
|
||||||
|
|#
|
||||||
|
|
||||||
|
(defpackage #:pgloader.test.retry
|
||||||
|
(:use #:cl #:pgloader.params #:pgloader.mysql)
|
||||||
|
(:export #:produce-data))
|
||||||
|
|
||||||
|
(in-package #:pgloader.test.retry)
|
||||||
|
|
||||||
|
(defvar *inject-null-bytes*
|
||||||
|
(coerce (loop for previous = 0 then (+ previous offset)
|
||||||
|
for offset in '(15769 54 7 270 8752)
|
||||||
|
collect (+ previous offset)) 'vector)
|
||||||
|
"Line numbers in the batch where to inject erroneous data.")
|
||||||
|
|
||||||
|
(defvar *string-with-null-byte* (concatenate 'string "Hello" (list #\Nul) "World!"))
|
||||||
|
|
||||||
|
(defvar *random-string* (make-string (random 42) :initial-element #\a)
|
||||||
|
"A random string.")
|
||||||
|
|
||||||
|
(defvar *query* "INSERT INTO `~a`(`content`) VALUES ('~a')")
|
||||||
|
|
||||||
|
(defun produce-data (&key
|
||||||
|
(*myconn-host* *myconn-host*)
|
||||||
|
(*myconn-port* *myconn-port*)
|
||||||
|
(*myconn-user* *myconn-user*)
|
||||||
|
(*myconn-pass* *myconn-pass*)
|
||||||
|
(dbname "retry")
|
||||||
|
(table-name "retry")
|
||||||
|
(rows 150000))
|
||||||
|
"Produce a data set that looks like the one in issue #22."
|
||||||
|
(with-mysql-connection (dbname)
|
||||||
|
(let ((next-error-pos 0))
|
||||||
|
(loop for n from 1 to rows
|
||||||
|
for str = (if (and (< next-error-pos (length *inject-null-bytes*))
|
||||||
|
(= n (aref *inject-null-bytes* next-error-pos)))
|
||||||
|
(progn
|
||||||
|
(incf next-error-pos)
|
||||||
|
*string-with-null-byte*)
|
||||||
|
*random-string*)
|
||||||
|
do (pgloader.mysql::mysql-query (format nil *query* table-name str))))))
|
5
test/data/retry.load
Normal file
5
test/data/retry.load
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
LOAD DATABASE
|
||||||
|
FROM mysql://root@localhost:3306/retry
|
||||||
|
INTO postgresql:///retry
|
||||||
|
|
||||||
|
WITH include drop, create tables, create indexes, reset sequences, truncate;
|
Loading…
Reference in New Issue
Block a user