mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-04 10:31:02 +02:00
Simplify format-vector-row a lot.
Copy some code over from cl-postgres-trivial-utf-8 and add the support for PostgreSQL COPY escaping right at the same place, allowing to allocate our formatted utf-8 buffer only once, with the escaping already installed. This patch was expected to be more about perfs, but it's actually only about code cleaning it seems, as it doesn't make a big difference in the testing I could do here. That said, getting rid of one intermediate buffer should be nice in terms of memory management.
This commit is contained in:
parent
adf03c47ad
commit
8ee799070a
@ -193,7 +193,7 @@
|
||||
:components
|
||||
((:file "copy-batch")
|
||||
(:file "copy-format")
|
||||
(:file "copy-write-row")
|
||||
(:file "copy-write-batch")
|
||||
(:file "copy-from-queue")
|
||||
(:file "copy-retry-batch")))
|
||||
|
||||
|
||||
@ -516,6 +516,9 @@
|
||||
#:postgresql-retryable)
|
||||
(:import-from #:cl-postgres
|
||||
#:database-error-context)
|
||||
(:import-from #:cl-postgres-trivial-utf-8
|
||||
#:utf-8-byte-length
|
||||
#:string-to-utf-8-bytes)
|
||||
(:export #:copy-rows-from-queue
|
||||
#:format-vector-row))
|
||||
|
||||
|
||||
@ -43,9 +43,8 @@
|
||||
(or (= (batch-count batch) (batch-max-count batch))
|
||||
(batch-oversized-p batch)))
|
||||
|
||||
(defun push-row (batch row &optional row-bytes)
|
||||
(defun push-row (batch row row-bytes)
|
||||
(with-slots (data count bytes) batch
|
||||
(setf (aref data count) row)
|
||||
(incf count)
|
||||
(when row-bytes
|
||||
(incf bytes row-bytes))))
|
||||
(incf bytes row-bytes)))
|
||||
|
||||
@ -13,189 +13,174 @@
|
||||
;;; call here.
|
||||
;;;
|
||||
|
||||
(defun format-vector-row (nbcols row pre-formatted pg-escape-safe-array)
|
||||
"Add a ROW in the STREAM, formating ROW in PostgreSQL COPY TEXT format.
|
||||
(defun format-vector-row (nb-cols row)
|
||||
(declare (optimize
|
||||
(speed 3)
|
||||
#-ecl(safety 0) #+ecl(safety 1)
|
||||
(space 0)
|
||||
(debug 1)
|
||||
(compilation-speed 0)))
|
||||
(let* ((lens (map 'vector
|
||||
(lambda (col)
|
||||
(if (col-null-p col) 2 (copy-utf-8-byte-length col)))
|
||||
row))
|
||||
(len (+ nb-cols (reduce #'+ lens)))
|
||||
(buf (make-array (the fixnum len) :element-type '(unsigned-byte 8))))
|
||||
(loop :for col :across row
|
||||
:for i :from 1
|
||||
:for position := 0 :then (+ position col-len 1)
|
||||
:for col-len :across lens
|
||||
:do (if (col-null-p col)
|
||||
(insert-copy-null buf position)
|
||||
(string-to-copy-utf-8-bytes col buf position))
|
||||
:do (insert-copy-separator buf (+ position col-len) i nb-cols))
|
||||
;; return our pg vector of escaped utf8 bytes
|
||||
(values buf len)))
|
||||
|
||||
See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609 for
|
||||
details about the format, and format specs."
|
||||
(declare (type simple-array row))
|
||||
(defun format-escaped-vector-row (nb-cols row)
|
||||
"We've read data in the COPY format, so already escaped."
|
||||
(declare (optimize
|
||||
(speed 3)
|
||||
#-ecl(safety 0) #+ecl(safety 1)
|
||||
(space 0)
|
||||
(debug 1)
|
||||
(compilation-speed 0)))
|
||||
(let* ((lens (map 'vector
|
||||
(lambda (col)
|
||||
(if (col-null-p col) 2 (utf-8-byte-length col)))
|
||||
row))
|
||||
(len (+ nb-cols (reduce #'+ lens)))
|
||||
(buf (make-array (the fixnum len) :element-type '(unsigned-byte 8))))
|
||||
(loop :for col :across row
|
||||
:for i :from 1
|
||||
:for position := 0 :then (+ position col-len 1)
|
||||
:for col-len :across lens
|
||||
:do (if (col-null-p col)
|
||||
(insert-copy-null buf position)
|
||||
|
||||
;; first prepare an array of transformed and properly encoded columns
|
||||
(row-to-utf8-bytes row)
|
||||
(let ((utf-8-bytes (string-to-utf-8-bytes col)))
|
||||
(replace buf utf-8-bytes :start1 position)))
|
||||
|
||||
;; now escape bytes for the COPY text format
|
||||
(if pre-formatted
|
||||
;; pre-formatted means no escaping needs to be done
|
||||
(format-pre-formatted-vector-row nbcols row)
|
||||
:do (insert-copy-separator buf (+ position col-len) i nb-cols))
|
||||
;; return our pg vector of escaped utf8 bytes
|
||||
(values buf len)))
|
||||
|
||||
;; in the general case we need to take into account PostgreSQL
|
||||
;; escaping rules for the COPY format
|
||||
(format-and-escape-vector-row nbcols row pg-escape-safe-array)))
|
||||
(declaim (inline insert-copy-separator insert-copy-null col-null-p))
|
||||
|
||||
(defun col-null-p (col)
|
||||
(or (null col) (eq :NULL col)))
|
||||
|
||||
(defun row-to-utf8-bytes (row)
|
||||
"Apply transformation functions to ROW fields, and convert text to UTF-8
|
||||
bytes arrays. Return an array of array-of-bytes ready to be sent to
|
||||
PostgreSQL."
|
||||
(loop :for col :across row
|
||||
:for i fixnum :from 0
|
||||
:do (setf (aref row i)
|
||||
(if (or (null col) (eq :NULL col))
|
||||
nil
|
||||
(cl-postgres-trivial-utf-8:string-to-utf-8-bytes col)))))
|
||||
(defun insert-copy-null (buffer position)
|
||||
"NULL is \\N in COPY format (that's 2 bytes)"
|
||||
(setf (aref buffer position) #. (char-code #\\))
|
||||
(setf (aref buffer (+ 1 position)) #. (char-code #\N)))
|
||||
|
||||
(defun apply-transforms (nbcols row transform-fns)
|
||||
(loop :for i fixnum :below nbcols
|
||||
:for col :across row
|
||||
:for fun :in transform-fns
|
||||
:do (setf (aref row i)
|
||||
(if fun (funcall fun col) col))))
|
||||
(defun insert-copy-separator (buffer position col nb-cols)
|
||||
(if (< col nb-cols)
|
||||
(setf (aref buffer position) #. (char-code #\Tab))
|
||||
(setf (aref buffer position) #. (char-code #\Newline))))
|
||||
|
||||
(defvar *pg-escape-safe-types*
|
||||
(list "integer"
|
||||
"serial"
|
||||
"bigserial"
|
||||
"smallint"
|
||||
"bigint"
|
||||
"numeric"
|
||||
"decimal"
|
||||
"real"
|
||||
"float"
|
||||
"timestamp"
|
||||
"timestamptz"
|
||||
"bit"
|
||||
"boolean"
|
||||
"cidr"
|
||||
"inet"
|
||||
"date"
|
||||
"interval"
|
||||
"uuid")
|
||||
"List of PostgreSQL data types with non-backslash, ascii-only input
|
||||
values: values that we don't need to escape.")
|
||||
(defun apply-transforms (copy nbcols row transform-fns)
|
||||
(handler-case
|
||||
(loop :for i fixnum :below nbcols
|
||||
:for col :across row
|
||||
:for fun :in transform-fns
|
||||
:do (setf (aref row i)
|
||||
(if fun (funcall fun col) col))
|
||||
:finally (return row))
|
||||
(condition (e)
|
||||
(log-message :error "Error while formating a row from ~s:"
|
||||
(format-table-name (target copy)))
|
||||
(log-message :error "~a" e)
|
||||
(update-stats :data (target copy) :errs 1)
|
||||
nil)))
|
||||
|
||||
(defun pg-escape-safe-array (copy)
|
||||
"Given a PostgreSQL type name, return non-nil when this type is known to
|
||||
represent a plain ASCII value: a value that we don't need to convert into
|
||||
UTF-8, such as numbers and dates."
|
||||
(let* ((nbcols (length (pgloader.sources::columns copy)))
|
||||
(result (make-array nbcols :element-type '(integer 0 2)))
|
||||
(pgtypes (loop :for pgcol :in (table-column-list
|
||||
(pgloader.sources::target copy))
|
||||
;; column might miss in pg table definition, but we
|
||||
;; want to let PostgreSQL handle this kind of error.
|
||||
:collect (when pgcol (column-type-name pgcol)))))
|
||||
(loop :for pgtype :in pgtypes
|
||||
:for i fixnum :from 0
|
||||
:do (setf (aref result i)
|
||||
(cond
|
||||
;; bytea representation begins with a backslash, which
|
||||
;; needs special processing.
|
||||
((string= "bytea" pgtype) 2)
|
||||
((member pgtype *pg-escape-safe-types* :test #'string=) 1)
|
||||
(t 0))))
|
||||
(log-message :debug "pg escape safe types for ~a: ~a ~a"
|
||||
(format-table-name (pgloader.sources::target copy))
|
||||
pgtypes
|
||||
result)
|
||||
result))
|
||||
;;;
|
||||
;;; Low Level UTF-8 handling + PostgreSQL COPY format escaping
|
||||
;;;
|
||||
;;; Main bits stolen from Postmodern:cl-postgres/trivial-utf-8.lisp
|
||||
;;;
|
||||
;;; We add PostgreSQL COPY escaping right at the same time as we do the
|
||||
;;; UTF-8 preparation dance.
|
||||
;;;
|
||||
(defun copy-utf-8-byte-length (string)
|
||||
"Calculate the amount of bytes needed to encode a string."
|
||||
(declare (type string string)
|
||||
(optimize
|
||||
(speed 3)
|
||||
#-ecl(safety 0) #+ecl(safety 1)
|
||||
(space 0)
|
||||
(debug 1)
|
||||
(compilation-speed 0)))
|
||||
(let ((length (length string))
|
||||
(string (coerce string 'simple-string)))
|
||||
(loop :for char :across string
|
||||
:do (let ((code (char-code char)))
|
||||
(case char
|
||||
((#\\ #\Newline #\Return #\Tab #\Backspace #\Page)
|
||||
(incf length 1))
|
||||
(otherwise
|
||||
(when (> code 127)
|
||||
(incf length
|
||||
(cond ((< code 2048) 1)
|
||||
((< code 65536) 2)
|
||||
(t 3))))))))
|
||||
length))
|
||||
|
||||
(defun format-and-escape-vector-row (nbcols pgrow pg-escape-safe-array)
|
||||
"Prepare the PGROW to be send as a PostgreSQL COPY buffer."
|
||||
(flet ((escaped-length (escape-safe string)
|
||||
(if (null string)
|
||||
;; NULL is \N (2 bytes) in COPY format
|
||||
2
|
||||
(case escape-safe
|
||||
(2 (+ 1 (length string))) ; bytea begins with a \
|
||||
(1 (length string))
|
||||
(0 (loop :for byte :across string
|
||||
:sum (case byte
|
||||
(#. (char-code #\\) 2)
|
||||
(#. (char-code #\Newline) 2)
|
||||
(#. (char-code #\Return) 2)
|
||||
(#. (char-code #\Tab) 2)
|
||||
(#. (char-code #\Backspace) 2)
|
||||
(#. (char-code #\Page) 2)
|
||||
(t 1))))))))
|
||||
(let* ((bytes (+ nbcols
|
||||
(loop :for col :across pgrow
|
||||
:for escape-safe :across pg-escape-safe-array
|
||||
:sum (escaped-length escape-safe col))))
|
||||
(result (make-array bytes :element-type '(unsigned-byte 8)))
|
||||
(pos 0))
|
||||
(macrolet ((byte-out (byte)
|
||||
`(progn (setf (aref result pos) ,byte)
|
||||
(incf pos)))
|
||||
(esc-byte-out (byte)
|
||||
`(progn (setf (aref result pos) #. (char-code #\\))
|
||||
(setf (aref result (1+ pos)) ,byte)
|
||||
(incf pos 2))))
|
||||
(loop :for col :across pgrow
|
||||
:for escape-safe :across pg-escape-safe-array
|
||||
:do (if (null col)
|
||||
(esc-byte-out #. (char-code #\N))
|
||||
(case escape-safe
|
||||
(0 (loop :for byte :across col :do
|
||||
(case byte
|
||||
(#. (char-code #\\) (esc-byte-out byte))
|
||||
(#. (char-code #\Newline) (esc-byte-out byte))
|
||||
(#. (char-code #\Return) (esc-byte-out byte))
|
||||
(#. (char-code #\Tab) (esc-byte-out byte))
|
||||
(#. (char-code #\Backspace) (esc-byte-out byte))
|
||||
(#. (char-code #\Page) (esc-byte-out byte))
|
||||
(t (byte-out byte)))))
|
||||
(defmacro as-copy-utf-8-bytes (char writer)
|
||||
"Given a character, calls the writer function for every byte in the
|
||||
encoded form of that character."
|
||||
(let ((char-code (gensym)))
|
||||
`(let ((,char-code (char-code ,char)))
|
||||
(declare (type fixnum ,char-code))
|
||||
(cond ((= ,char-code #. (char-code #\\))
|
||||
(progn (,writer #. (char-code #\\))
|
||||
(,writer ,char-code)))
|
||||
((= ,char-code #. (char-code #\Newline))
|
||||
(progn (,writer #. (char-code #\\))
|
||||
(,writer ,char-code)))
|
||||
((= ,char-code #. (char-code #\Return))
|
||||
(progn (,writer #. (char-code #\\))
|
||||
(,writer ,char-code)))
|
||||
((= ,char-code #. (char-code #\Tab))
|
||||
(progn (,writer #. (char-code #\\))
|
||||
(,writer ,char-code)))
|
||||
((= ,char-code #. (char-code #\Backspace))
|
||||
(progn (,writer #. (char-code #\\))
|
||||
(,writer ,char-code)))
|
||||
((= ,char-code #. (char-code #\Page))
|
||||
(progn (,writer #. (char-code #\\))
|
||||
(,writer ,char-code)))
|
||||
((< ,char-code 128)
|
||||
(,writer ,char-code))
|
||||
((< ,char-code 2048)
|
||||
(,writer (logior #b11000000 (ldb (byte 5 6) ,char-code)))
|
||||
(,writer (logior #b10000000 (ldb (byte 6 0) ,char-code))))
|
||||
((< ,char-code 65536)
|
||||
(,writer (logior #b11100000 (ldb (byte 4 12) ,char-code)))
|
||||
(,writer (logior #b10000000 (ldb (byte 6 6) ,char-code)))
|
||||
(,writer (logior #b10000000 (ldb (byte 6 0) ,char-code))))
|
||||
(t
|
||||
(,writer (logior #b11110000 (ldb (byte 3 18) ,char-code)))
|
||||
(,writer (logior #b10000000 (ldb (byte 6 12) ,char-code)))
|
||||
(,writer (logior #b10000000 (ldb (byte 6 6) ,char-code)))
|
||||
(,writer (logior #b10000000 (ldb (byte 6 0) ,char-code))))))))
|
||||
|
||||
(1 (replace result
|
||||
(the (simple-array (unsigned-byte 8) (*)) col)
|
||||
:start1 pos)
|
||||
(incf pos (length col)))
|
||||
|
||||
(2 ;; Here we know we deal with bytea, and that starts
|
||||
;; with a backslach char that we need to escape...
|
||||
;; it's the almost-escape-safe case, because every
|
||||
;; other character in the stream is going to be an
|
||||
;; hexadecimal char.
|
||||
(esc-byte-out (aref col 0))
|
||||
(replace result
|
||||
(the (simple-array (unsigned-byte 8) (*)) col)
|
||||
:start1 pos
|
||||
:start2 1)
|
||||
(incf pos (+ -1 (length col))))))
|
||||
|
||||
;; either column separator, Tab, or end-of-record with Newline
|
||||
(if (= bytes (+ 1 pos))
|
||||
(byte-out #. (char-code #\Newline))
|
||||
(byte-out #. (char-code #\Tab)))))
|
||||
|
||||
;; return the result and how many bytes it represents
|
||||
(values result bytes))))
|
||||
|
||||
(defun format-pre-formatted-vector-row (nbcols pgrow)
|
||||
"Prepare the PGROW for being sent as a PostgreSQL COPY buffer, knowing
|
||||
that the data in the PGROW is pre-formatted -- no extra escaping is
|
||||
needed."
|
||||
(flet ((col-length (col)
|
||||
;; NULL is \N (2 bytes) in COPY format
|
||||
(if col (length col) 2)))
|
||||
(let* ((bytes (+ nbcols (reduce #'+ pgrow :key #'col-length)))
|
||||
(result (make-array bytes :element-type '(unsigned-byte 8))))
|
||||
(loop :for start := 0 :then (+ start len 1)
|
||||
:for col :across pgrow
|
||||
:for len := (if col (length col) 2)
|
||||
:do (progn
|
||||
(if col
|
||||
(replace result
|
||||
(the (simple-array (unsigned-byte 8) (*)) col)
|
||||
:start1 start)
|
||||
;; insert \N for a null value
|
||||
(setf (aref result start) #. (char-code #\\)
|
||||
(aref result (+ 1 start)) #. (char-code #\N)))
|
||||
|
||||
;; either column separator, Tab, or Newline (end of record)
|
||||
(setf (aref result (+ start len))
|
||||
(if (= bytes (+ start len 1))
|
||||
#. (char-code #\Newline)
|
||||
#. (char-code #\Tab)))))
|
||||
|
||||
;; return the result and how many bytes it represents
|
||||
(values result bytes))))
|
||||
(defun string-to-copy-utf-8-bytes (string buffer &optional (position 0))
|
||||
"Convert a string into an array of unsigned bytes containing its
|
||||
utf-8 representation."
|
||||
(declare (type string string)
|
||||
(optimize
|
||||
(speed 3)
|
||||
#-ecl(safety 0) #+ecl(safety 1)
|
||||
(space 0)
|
||||
(debug 1)
|
||||
(compilation-speed 0)))
|
||||
(let ((string (coerce string 'simple-string)))
|
||||
(declare (type (array (unsigned-byte 8)) buffer)
|
||||
(type fixnum position))
|
||||
(macrolet ((add-byte (byte)
|
||||
`(progn (setf (aref buffer position) ,byte)
|
||||
(incf position))))
|
||||
(loop :for char :across string
|
||||
:do (as-copy-utf-8-bytes char add-byte)))))
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
;;;
|
||||
(in-package :pgloader.copy)
|
||||
|
||||
(defun copy-batch (table columns batch batch-rows
|
||||
(defun send-batch (table columns batch
|
||||
&key
|
||||
(db pomo:*database*)
|
||||
on-error-stop)
|
||||
@ -33,13 +33,9 @@
|
||||
columns
|
||||
c)
|
||||
(update-stats :data table :errs 1)
|
||||
(return-from copy-batch 0)))))
|
||||
(return-from send-batch 0)))))
|
||||
(unwind-protect
|
||||
(loop :for i :below batch-rows
|
||||
:for data := (aref batch i)
|
||||
:do (when data
|
||||
(db-write-row copier data))
|
||||
:finally (return batch-rows))
|
||||
(db-write-batch copier batch)
|
||||
(cl-postgres:close-db-writer copier)
|
||||
(pomo:execute "COMMIT"))))
|
||||
|
||||
@ -56,7 +52,7 @@
|
||||
;; normal behavior, on-error-stop being nil
|
||||
;; clean the current transaction before retrying new ones
|
||||
(let ((errors
|
||||
(retry-batch table columns batch batch-rows condition)))
|
||||
(retry-batch table columns batch condition)))
|
||||
(log-message :debug "retry-batch found ~d errors" errors)
|
||||
(update-stats :data table :rows (- errors)))))
|
||||
|
||||
@ -65,14 +61,12 @@
|
||||
(log-message :error "[PostgreSQL ~s] ~a" table-name condition)
|
||||
(log-message :error "Copy Batch reconnecting to PostgreSQL")
|
||||
|
||||
;; in order to avoid Socket error in "connect": ECONNREFUSED if we
|
||||
;; try just too soon, wait a little
|
||||
(sleep 2)
|
||||
;; in order to avoid Socket error in "connect": ECONNREFUSED if we
|
||||
;; try just too soon, wait a little
|
||||
(sleep 2)
|
||||
|
||||
(cl-postgres:reopen-database db)
|
||||
(copy-batch table columns batch batch-rows
|
||||
:db db
|
||||
:on-error-stop on-error-stop))
|
||||
(send-batch table columns batch :db db :on-error-stop on-error-stop))
|
||||
|
||||
(condition (c)
|
||||
;; non retryable failures
|
||||
@ -95,29 +89,38 @@
|
||||
(table (pgloader.sources:target copy)))
|
||||
"Fetch rows from the QUEUE, prepare them in batches and send them down to
|
||||
PostgreSQL, and when that's done update stats."
|
||||
(let ((preprocessor (pgloader.sources::preprocess-row copy))
|
||||
(pre-formatted (pgloader.sources:data-is-preformatted-p copy))
|
||||
(esc-safe-arr (pg-escape-safe-array copy))
|
||||
(nbcols (length
|
||||
(table-column-list (pgloader.sources::target copy))))
|
||||
(transform-fns (let ((funs (pgloader.sources::transforms copy)))
|
||||
(unless (every #'null funs)
|
||||
funs)))
|
||||
(current-batch (make-batch))
|
||||
(seconds 0))
|
||||
(let* ((nbcols (length
|
||||
(table-column-list (pgloader.sources::target copy))))
|
||||
(current-batch (make-batch))
|
||||
(seconds 0))
|
||||
|
||||
;; add some COPY activity related bits to our COPY object.
|
||||
(setf (transforms copy)
|
||||
(let ((funs (transforms copy)))
|
||||
(unless (every #'null funs)
|
||||
funs))
|
||||
|
||||
;; FIXME: we should change the API around preprocess-row, someday.
|
||||
(preprocessor copy)
|
||||
(pgloader.sources::preprocess-row copy)
|
||||
|
||||
;; FIXME: we could change the API around data-is-preformatted-p,
|
||||
;; but that's a bigger change than duplicating the information in
|
||||
;; the object.
|
||||
(copy-format copy)
|
||||
(if (data-is-preformatted-p copy) :escaped :raw))
|
||||
|
||||
(flet ((send-current-batch (unqualified-table-name)
|
||||
;; we close over the whole lexical environment or almost...
|
||||
(let ((batch-start-time (get-internal-real-time)))
|
||||
(copy-batch table
|
||||
(send-batch table
|
||||
columns
|
||||
(batch-data current-batch)
|
||||
(batch-count current-batch)
|
||||
current-batch
|
||||
:on-error-stop on-error-stop)
|
||||
|
||||
(let ((batch-seconds (elapsed-time-since batch-start-time)))
|
||||
(log-message :debug
|
||||
"copy-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]"
|
||||
"send-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]"
|
||||
(lp:kernel-worker-index)
|
||||
unqualified-table-name
|
||||
(batch-count current-batch)
|
||||
@ -158,9 +161,7 @@
|
||||
|
||||
;; also add up the time it takes to format the rows
|
||||
(let ((start-time (get-internal-real-time)))
|
||||
(format-row-in-batch copy nbcols row current-batch
|
||||
preprocessor pre-formatted
|
||||
esc-safe-arr transform-fns)
|
||||
(format-row-in-batch copy nbcols row current-batch)
|
||||
(incf seconds (elapsed-time-since start-time)))))
|
||||
|
||||
;; the last batch might not be empty
|
||||
@ -177,40 +178,28 @@
|
||||
(list :writer table seconds)))
|
||||
|
||||
|
||||
(defun format-row-in-batch (copy nbcols row current-batch
|
||||
preprocessor
|
||||
pre-formatted
|
||||
escape-safe-array
|
||||
transform-fns)
|
||||
(defun format-row-in-batch (copy nbcols row current-batch)
|
||||
"Given a row from the queue, prepare it for the next batch."
|
||||
(declare (ignore copy))
|
||||
(metabang.bind:bind
|
||||
((row (if preprocessor (funcall preprocessor row) row))
|
||||
(transformed-row (or (when (and (not pre-formatted)
|
||||
transform-fns)
|
||||
(apply-transforms nbcols
|
||||
row
|
||||
transform-fns))
|
||||
row))
|
||||
(let* ((row (if (preprocessor copy)
|
||||
(funcall (preprocessor copy) row)
|
||||
row))
|
||||
(transformed-row (cond ((eq :escaped (copy-format copy)) row)
|
||||
((null (transforms copy)) row)
|
||||
(t
|
||||
(apply-transforms copy
|
||||
nbcols
|
||||
row
|
||||
(transforms copy))))))
|
||||
(multiple-value-bind (pg-vector-row bytes)
|
||||
(if transformed-row
|
||||
(ecase (copy-format copy)
|
||||
(:raw (format-vector-row nbcols transformed-row))
|
||||
(:escaped (format-escaped-vector-row nbcols transformed-row)))
|
||||
(values nil 0))
|
||||
|
||||
((:values copy-data bytes)
|
||||
(format-vector-row nbcols transformed-row pre-formatted escape-safe-array)
|
||||
;; (handler-case
|
||||
;; (format-vector-row row
|
||||
;; pre-formatted
|
||||
;; transform-fns
|
||||
;; ascii-types)
|
||||
;; (condition (e)
|
||||
;; (log-message :error "Error while formating a row from ~s:"
|
||||
;; (format-table-name (pgloader.sources:target copy)))
|
||||
;; (log-message :error "~a" e)
|
||||
;; (update-stats :data (pgloader.sources:target copy) :errs 1)
|
||||
;; (values nil 0)))
|
||||
))
|
||||
;; we might have to debug
|
||||
(when copy-data
|
||||
(log-message :data "> ~s"
|
||||
(map 'string #'code-char copy-data)))
|
||||
;; we might have to debug
|
||||
(when pg-vector-row
|
||||
(log-message :data "> ~s" (map 'string #'code-char pg-vector-row))
|
||||
|
||||
;; now add copy-data to current-batch
|
||||
(push-row current-batch copy-data bytes)))
|
||||
;; now add copy-data to current-batch
|
||||
(push-row current-batch pg-vector-row bytes)))))
|
||||
|
||||
@ -54,7 +54,7 @@
|
||||
;;;
|
||||
;;; The main retry batch function.
|
||||
;;;
|
||||
(defun retry-batch (table columns batch batch-rows condition
|
||||
(defun retry-batch (table columns batch condition
|
||||
&optional (current-batch-pos 0)
|
||||
&aux (nb-errors 0))
|
||||
"Batch is a list of rows containing at least one bad row, the first such
|
||||
@ -67,32 +67,36 @@
|
||||
:with next-error := (parse-copy-error-context
|
||||
(database-error-context condition))
|
||||
|
||||
:while (< current-batch-pos batch-rows)
|
||||
:while (< current-batch-pos (batch-count batch))
|
||||
|
||||
:do
|
||||
(progn ; indenting helper
|
||||
(log-message :debug "pos: ~s ; err: ~a" current-batch-pos next-error)
|
||||
(when (= current-batch-pos next-error)
|
||||
(log-message :info "error recovery at ~d/~d, processing bad row"
|
||||
current-batch-pos batch-rows)
|
||||
(process-bad-row table condition (aref batch current-batch-pos))
|
||||
current-batch-pos (batch-count batch))
|
||||
(process-bad-row table
|
||||
condition
|
||||
(aref (batch-data batch) current-batch-pos))
|
||||
(incf current-batch-pos)
|
||||
(incf nb-errors))
|
||||
|
||||
(let* ((current-batch-rows
|
||||
(next-batch-rows batch-rows current-batch-pos next-error)))
|
||||
(next-batch-rows (batch-count batch) current-batch-pos next-error)))
|
||||
(when (< 0 current-batch-rows)
|
||||
(if (< current-batch-pos next-error)
|
||||
(log-message :info
|
||||
"error recovery at ~d/~d, next error at ~d, ~
|
||||
loading ~d row~:p"
|
||||
current-batch-pos
|
||||
batch-rows
|
||||
(batch-count batch)
|
||||
next-error
|
||||
current-batch-rows)
|
||||
(log-message :info
|
||||
"error recovery at ~d/~d, trying ~d row~:p"
|
||||
current-batch-pos batch-rows current-batch-rows))
|
||||
current-batch-pos
|
||||
(batch-count batch)
|
||||
current-batch-rows))
|
||||
|
||||
(handler-case
|
||||
(incf current-batch-pos
|
||||
@ -116,7 +120,7 @@
|
||||
next-error (+ current-batch-pos next-error-relative)))))))))
|
||||
|
||||
(log-message :info "Recovery found ~d errors in ~d row~:p"
|
||||
nb-errors batch-rows)
|
||||
nb-errors (batch-count batch))
|
||||
|
||||
;; Return how many rows where erroneous, for statistics purposes
|
||||
nb-errors)
|
||||
@ -131,7 +135,7 @@
|
||||
(unwind-protect
|
||||
(loop :repeat current-batch-rows
|
||||
:for pos :from current-batch-pos
|
||||
:do (db-write-row stream (aref batch pos)))
|
||||
:do (db-write-row stream (aref (batch-data batch) pos)))
|
||||
|
||||
;; close-db-writer is the one signaling cl-postgres-errors
|
||||
(progn
|
||||
|
||||
@ -10,6 +10,13 @@
|
||||
;;; COPY protocol, and retry the batch avoiding known bad rows (from parsing
|
||||
;;; COPY error messages) in case some data related conditions are signaled.
|
||||
;;;
|
||||
(defun db-write-batch (copier batch)
|
||||
(loop :for count :below (batch-count batch)
|
||||
:for data :across (batch-data batch)
|
||||
:do (when data
|
||||
(db-write-row copier data))
|
||||
:finally (return (batch-count batch))))
|
||||
|
||||
(defun db-write-row (copier data)
|
||||
"Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all
|
||||
over again, as we reproduced the data formating in pgloader code. The
|
||||
@ -26,4 +33,3 @@
|
||||
:do (write-byte byte cl-postgres::socket))))))
|
||||
(incf (cl-postgres::copier-count copier)))
|
||||
|
||||
|
||||
@ -27,12 +27,7 @@
|
||||
:initarg :process-fn) ;
|
||||
(format :accessor copy-format ; format can be :COPY
|
||||
:initarg :format ; in which case no escaping
|
||||
:initform :raw) ; has to be done
|
||||
;; and this one is an array of magic numbers for per-column escaping
|
||||
;; rules, which depend on the PostgreSQL data type we are targetting, and
|
||||
;; allow to reduce the work to be done at COPY time.
|
||||
(columns-escape-mode :accessor columns-escape-mode
|
||||
:initarg :columns-escape-mode))
|
||||
:initform :raw)) ; has to be done
|
||||
(:documentation "pgloader Generic Data Source"))
|
||||
|
||||
(defmethod initialize-instance :after ((source copy) &key)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user