mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-04 10:31:02 +02:00
Improve and fix COPY error handling, transactions, connections, and GUCs.
This commit is contained in:
parent
6fb3de7043
commit
d99b859c3f
@ -48,6 +48,7 @@
|
||||
(defpackage #:pgloader.pgsql
|
||||
(:use #:cl #:pgloader.params #:pgloader.utils)
|
||||
(:export #:with-pgsql-transaction
|
||||
#:with-pgsql-connection
|
||||
#:pgsql-execute
|
||||
#:pgsql-execute-with-timing
|
||||
#:truncate-table
|
||||
|
||||
@ -67,40 +67,43 @@
|
||||
transforms)
|
||||
"Fetch data from the QUEUE until we see :end-of-data. Update *state*"
|
||||
(when truncate
|
||||
(log-message :notice "TRUNCATE ~a.~a;" dbname table-name)
|
||||
(log-message :notice "TRUNCATE ~a;" table-name)
|
||||
(truncate-table dbname table-name))
|
||||
|
||||
(log-message :debug "pgsql:copy-from-queue: ~a ~a ~a" dbname table-name columns)
|
||||
(log-message :debug "pgsql:copy-from-queue: ~a ~a" table-name columns)
|
||||
|
||||
(with-pgsql-transaction (dbname)
|
||||
(with-pgsql-connection (dbname)
|
||||
(loop
|
||||
for retval =
|
||||
(let* ((copier
|
||||
(cl-postgres:open-db-writer pomo:*database* table-name columns))
|
||||
(*batch* nil)
|
||||
(let* ((*batch* nil)
|
||||
(*batch-size* 0))
|
||||
(log-message :debug "pgsql:copy-from-queue: starting new batch")
|
||||
(unwind-protect
|
||||
(let ((process-row-fn
|
||||
(make-copy-and-batch-fn copier :transforms transforms)))
|
||||
(catch 'next-batch
|
||||
(pgloader.queue:map-pop-queue dataq process-row-fn)))
|
||||
(handler-case
|
||||
(with-pgsql-transaction (dbname :database pomo:*database*)
|
||||
(let* ((copier (cl-postgres:open-db-writer pomo:*database*
|
||||
table-name
|
||||
columns)))
|
||||
(log-message :debug "pgsql:copy-from-queue: new batch")
|
||||
(unwind-protect
|
||||
(let ((process-row-fn
|
||||
(make-copy-and-batch-fn copier
|
||||
:transforms transforms)))
|
||||
(catch 'next-batch
|
||||
(pgloader.queue:map-pop-queue dataq process-row-fn)))
|
||||
|
||||
(log-message :debug "pgsql:copy-from-queue: batch done")
|
||||
(cl-postgres:close-db-writer copier))))
|
||||
|
||||
;; in case of data-exception, split the batch and try again
|
||||
(handler-case
|
||||
(progn
|
||||
(log-message :debug "pgsql:copy-from-queue: commit batch")
|
||||
(cl-postgres:close-db-writer copier))
|
||||
((or
|
||||
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
||||
CL-POSTGRES-ERROR:DATA-EXCEPTION) (e)
|
||||
(progn
|
||||
(log-message :debug "pgsql:copy-from-queue: ~a" e)
|
||||
(retry-batch dbname
|
||||
table-name
|
||||
(nreverse *batch*)
|
||||
*batch-size*
|
||||
:columns columns
|
||||
:transforms transforms))))))
|
||||
((or
|
||||
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
||||
CL-POSTGRES-ERROR:DATA-EXCEPTION) (e)
|
||||
(declare (ignore e)) ; already logged
|
||||
(retry-batch dbname
|
||||
table-name
|
||||
(nreverse *batch*)
|
||||
*batch-size*
|
||||
:columns columns
|
||||
:transforms transforms))))
|
||||
|
||||
;; fetch how many rows we just pushed through, update stats
|
||||
for rows = (if (consp retval) (cdr retval) retval)
|
||||
@ -132,8 +135,6 @@
|
||||
;; first, update the stats.
|
||||
(pgstate-incf *state* table-name :errs 1 :rows -1)
|
||||
|
||||
(log-message :error "Invalid input: ~{~s~^, ~}~%~a~%" row condition)
|
||||
|
||||
;; now, the bad row processing
|
||||
(let* ((table (pgstate-get-table *state* table-name))
|
||||
(data (pgtable-reject-data table))
|
||||
@ -175,44 +176,45 @@
|
||||
;;;
|
||||
(defun retry-batch (dbname table-name batch batch-size &key columns transforms)
|
||||
"Batch is a list of rows containing at least one bad row. Find it."
|
||||
(let* ((conspec (get-connection-spec dbname :with-port nil))
|
||||
(current-batch-pos batch)
|
||||
(log-message :debug "pgsql:retry-batch: splitting current batch [~d rows]" batch-size)
|
||||
(let* ((current-batch-pos batch)
|
||||
(processed-rows 0))
|
||||
(loop
|
||||
while (< processed-rows batch-size)
|
||||
do
|
||||
(log-message :debug "pgsql:retry-batch: splitting current batch")
|
||||
(let* ((current-batch current-batch-pos)
|
||||
(current-batch-size (smaller-batch-size batch-size
|
||||
processed-rows))
|
||||
(stream
|
||||
(cl-postgres:open-db-writer conspec table-name columns)))
|
||||
processed-rows)))
|
||||
(handler-case
|
||||
(with-pgsql-transaction (dbname :database pomo:*database*)
|
||||
(let* ((stream (cl-postgres:open-db-writer pomo:*database*
|
||||
table-name
|
||||
columns)))
|
||||
|
||||
(log-message :debug "pgsql:retry-batch: current-batch-size = ~d"
|
||||
current-batch-size)
|
||||
(log-message :debug "pgsql:retry-batch: current-batch-size = ~d"
|
||||
current-batch-size)
|
||||
|
||||
(unwind-protect
|
||||
(dotimes (i current-batch-size)
|
||||
;; rows in that batch have already been processed
|
||||
(cl-postgres:db-write-row stream (car current-batch-pos))
|
||||
(setf current-batch-pos (cdr current-batch-pos))
|
||||
(incf processed-rows))
|
||||
(unwind-protect
|
||||
(dotimes (i current-batch-size)
|
||||
;; rows in that batch have already been processed
|
||||
(cl-postgres:db-write-row stream (car current-batch-pos))
|
||||
(setf current-batch-pos (cdr current-batch-pos))
|
||||
(incf processed-rows))
|
||||
|
||||
(handler-case
|
||||
(cl-postgres:close-db-writer stream)
|
||||
(cl-postgres:close-db-writer stream))))
|
||||
|
||||
;; the batch didn't make it, recurse
|
||||
((or
|
||||
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
||||
CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition)
|
||||
;; process bad data
|
||||
(if (= 1 current-batch-size)
|
||||
(process-bad-row table-name condition (car current-batch)
|
||||
:transforms transforms)
|
||||
;; more than one line of bad data: recurse
|
||||
(retry-batch dbname
|
||||
table-name
|
||||
current-batch
|
||||
current-batch-size
|
||||
:columns columns
|
||||
:transforms transforms)))))))))
|
||||
;; the batch didn't make it, recurse
|
||||
((or
|
||||
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
||||
CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition)
|
||||
;; process bad data
|
||||
(if (= 1 current-batch-size)
|
||||
(process-bad-row table-name condition (car current-batch)
|
||||
:transforms transforms)
|
||||
;; more than one line of bad data: recurse
|
||||
(retry-batch dbname
|
||||
table-name
|
||||
current-batch
|
||||
current-batch-size
|
||||
:columns columns
|
||||
:transforms transforms))))))))
|
||||
|
||||
@ -6,33 +6,47 @@
|
||||
;;;
|
||||
;;; PostgreSQL Tools connecting to a database
|
||||
;;;
|
||||
(defmacro handling-pgsql-notices ((&key set-local) &body forms)
|
||||
(defmacro handling-pgsql-notices (&body forms)
|
||||
"The BODY is run within a PostgreSQL transaction where *pg-settings* have
|
||||
been applied. PostgreSQL warnings and errors are logged at the
|
||||
appropriate log level."
|
||||
`(pomo:with-transaction ()
|
||||
(handler-bind
|
||||
((cl-postgres:database-error
|
||||
#'(lambda (e)
|
||||
(log-message :error "~a" e)))
|
||||
(cl-postgres:postgresql-warning
|
||||
#'(lambda (w)
|
||||
(log-message :warning "~a" w)
|
||||
(muffle-warning))))
|
||||
(set-session-gucs *pg-settings* :transaction ,set-local)
|
||||
(progn ,@forms))))
|
||||
`(handler-bind
|
||||
((cl-postgres:database-error
|
||||
#'(lambda (e)
|
||||
(log-message :error "~a" e)))
|
||||
(cl-postgres:postgresql-warning
|
||||
#'(lambda (w)
|
||||
(log-message :warning "~a" w)
|
||||
(muffle-warning))))
|
||||
(progn ,@forms)))
|
||||
|
||||
(defmacro with-pgsql-transaction ((dbname &key database) &body forms)
|
||||
"Run FORMS within a PostgreSQL transaction to DBNAME, reusing DATABASE if
|
||||
given. To get the connection spec from the DBNAME, use `get-connection-spec'."
|
||||
(if database
|
||||
`(let ((pomo:*database* database))
|
||||
(handling-pgsql-notices (:set-local t)
|
||||
,@forms))
|
||||
`(let ((pomo:*database* ,database))
|
||||
(handling-pgsql-notices
|
||||
(pomo:with-transaction ()
|
||||
(log-message :debug "BEGIN")
|
||||
(set-session-gucs *pg-settings* :transaction t)
|
||||
,@forms)))
|
||||
;; no database given, create a new database connection
|
||||
`(pomo:with-connection (get-connection-spec ,dbname)
|
||||
(log-message :debug "CONNECT")
|
||||
(set-session-gucs *pg-settings*)
|
||||
(handling-pgsql-notices ()
|
||||
,@forms))))
|
||||
(pomo:with-transaction ()
|
||||
(log-message :debug "BEGIN")
|
||||
,@forms)))))
|
||||
|
||||
(defmacro with-pgsql-connection ((dbname) &body forms)
|
||||
"Run FROMS within a PostgreSQL connection to DBNAME. To get the connection
|
||||
spec from the DBNAME, use `get-connection-spec'."
|
||||
`(pomo:with-connection (get-connection-spec ,dbname)
|
||||
(log-message :debug "CONNECT")
|
||||
(set-session-gucs *pg-settings*)
|
||||
(handling-pgsql-notices ()
|
||||
,@forms)))
|
||||
|
||||
(defun get-connection-spec (dbname &key (with-port t))
|
||||
"pomo:with-connection and cl-postgres:open-database and open-db-writer are
|
||||
@ -47,7 +61,7 @@
|
||||
(let ((pomo:*database* (or database pomo:*database*)))
|
||||
(loop
|
||||
for (name . value) in alist
|
||||
for set = (format nil "SET~@[ LOCAL~] ~a TO '~a'" transaction name value)
|
||||
for set = (format nil "SET~:[~; LOCAL~] ~a TO '~a'" transaction name value)
|
||||
do
|
||||
(log-message :debug set)
|
||||
(pomo:execute set))))
|
||||
|
||||
@ -21,11 +21,13 @@ LOAD CSV
|
||||
|
||||
SET client_encoding to 'latin1',
|
||||
work_mem to '12MB',
|
||||
standard_conforming_strings to 'on'
|
||||
standard_conforming_strings to 'on',
|
||||
search_path to 'err' -- test GUC settings in retry path
|
||||
|
||||
BEFORE LOAD DO
|
||||
$$ drop table if exists errors; $$,
|
||||
$$ create table errors (
|
||||
$$ create schema if not exists err; $$,
|
||||
$$ drop table if exists err.errors; $$,
|
||||
$$ create table err.errors (
|
||||
a integer primary key,
|
||||
b date,
|
||||
c text
|
||||
@ -35,10 +37,10 @@ LOAD CSV
|
||||
|
||||
|
||||
|
||||
1|some first row text|2006-13-11|
|
||||
2|some second row text|2006-11-11|
|
||||
3|some third row text|2006-10-12|
|
||||
1|expected error, month 13|2006-13-11|
|
||||
2|nov. the 11th should go|2006-11-11|
|
||||
3|12th of oct. should go|2006-10-12|
|
||||
4|\ |2006-16-4|
|
||||
5|some fifth row text|2006-5-12|
|
||||
6|some sixth row text|2006-13-10|
|
||||
5|month should be may, ok|2006-5-12|
|
||||
6|another month 13, stress retry path|2006-13-10|
|
||||
7|some null date to play with||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user