mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-05 02:46:10 +02:00
Review format-vector-row.
This function prepares the data to be sent down to PostgreSQL as clean COPY text, with Unicode handled correctly. This commit is mainly a clean-up of the function, and also adds some smarts to try and make it faster. In testing, the function is now only marginally faster than before. The hope here is that it's now easier to optimize it.
This commit is contained in:
parent
ba2d8669c3
commit
3bb128c5db
@ -13,10 +13,7 @@
|
||||
;;; call here.
|
||||
;;;
|
||||
|
||||
(defun format-vector-row (row
|
||||
&optional
|
||||
(transforms (make-list (length row)))
|
||||
pre-formatted)
|
||||
(defun format-vector-row (nbcols row pre-formatted pg-escape-safe-array)
|
||||
"Add a ROW in the STREAM, formating ROW in PostgreSQL COPY TEXT format.
|
||||
|
||||
See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609 for
|
||||
@ -24,90 +21,181 @@ details about the format, and format specs."
|
||||
(declare (type simple-array row))
|
||||
|
||||
;; first prepare an array of transformed and properly encoded columns
|
||||
(let* ((nbcols (length row))
|
||||
(pgrow (make-array nbcols :element-type 'array)))
|
||||
(loop :for raw-col :across row
|
||||
:for i :from 0
|
||||
:for fn :in transforms
|
||||
:for col := (if pre-formatted raw-col
|
||||
(if fn (funcall fn raw-col) raw-col))
|
||||
:do (setf (aref pgrow i)
|
||||
(if (or (null col) (eq :NULL col))
|
||||
nil
|
||||
(cl-postgres-trivial-utf-8:string-to-utf-8-bytes col))))
|
||||
(row-to-utf8-bytes row)
|
||||
|
||||
;; now that we have all the columns, make a simple array out of them
|
||||
(if pre-formatted
|
||||
;; pre-formatted data means we can return it as it already is
|
||||
(flet ((col-length (col)
|
||||
;; NULL is \N (2 bytes) in COPY format
|
||||
(if col (length col) 2)))
|
||||
(let* ((bytes (+ nbcols (reduce '+ (map 'list #'col-length pgrow))))
|
||||
(result (make-array bytes :element-type '(unsigned-byte 8))))
|
||||
(loop :for start := 0 :then (+ start len 1)
|
||||
:for col :across pgrow
|
||||
:for len := (if col (length col) 2)
|
||||
:do (progn
|
||||
(if col
|
||||
(replace result
|
||||
(the (simple-array (unsigned-byte 8) (*)) col)
|
||||
:start1 start)
|
||||
;; insert \N for a null value
|
||||
(setf (aref result start) #. (char-code #\\)
|
||||
(aref result (+ 1 start)) #. (char-code #\N)))
|
||||
;; now escape bytes for the COPY text format
|
||||
(if pre-formatted
|
||||
;; pre-formatted means no escaping needs to be done
|
||||
(format-pre-formatted-vector-row nbcols row)
|
||||
|
||||
;; either column separator, Tab, or Newline (end of record)
|
||||
(setf (aref result (+ start len))
|
||||
(if (= bytes (+ start len 1))
|
||||
#. (char-code #\Newline)
|
||||
#. (char-code #\Tab)))))
|
||||
;; in the general case we need to take into account PostgreSQL
|
||||
;; escaping rules for the COPY format
|
||||
(format-and-escape-vector-row nbcols row pg-escape-safe-array)))
|
||||
|
||||
;; return the result and how many bytes it represents
|
||||
(values result bytes)))
|
||||
|
||||
;; in the general case we need to take into account PostgreSQL
|
||||
;; escaping rules for the COPY format
|
||||
(flet ((escaped-length (string)
|
||||
(if (null string)
|
||||
;; NULL is \N (2 bytes) in COPY format
|
||||
2
|
||||
(loop :for byte :across string
|
||||
:sum (case byte
|
||||
(#. (char-code #\\) 2)
|
||||
(#. (char-code #\Newline) 2)
|
||||
(#. (char-code #\Return) 2)
|
||||
(#. (char-code #\Tab) 2)
|
||||
(#. (char-code #\Backspace) 2)
|
||||
(#. (char-code #\Page) 2)
|
||||
(t 1))))))
|
||||
(let* ((bytes (+ nbcols
|
||||
(reduce '+ (map 'list #'escaped-length pgrow))))
|
||||
(result (make-array bytes :element-type '(unsigned-byte 8)))
|
||||
(pos 0))
|
||||
(macrolet ((byte-out (byte)
|
||||
`(progn (setf (aref result pos) ,byte)
|
||||
(incf pos)))
|
||||
(esc-byte-out (byte)
|
||||
`(progn (setf (aref result pos) #. (char-code #\\))
|
||||
(setf (aref result (1+ pos)) ,byte)
|
||||
(incf pos 2))))
|
||||
(loop :for col :across pgrow :do
|
||||
(if (null col)
|
||||
(esc-byte-out #. (char-code #\N))
|
||||
(loop :for byte :across col :do
|
||||
(case byte
|
||||
(#. (char-code #\\) (esc-byte-out byte))
|
||||
(#. (char-code #\Newline) (esc-byte-out byte))
|
||||
(#. (char-code #\Return) (esc-byte-out byte))
|
||||
(#. (char-code #\Tab) (esc-byte-out byte))
|
||||
(#. (char-code #\Backspace) (esc-byte-out byte))
|
||||
(#. (char-code #\Page) (esc-byte-out byte))
|
||||
(t (byte-out byte)))))
|
||||
(defun row-to-utf8-bytes (row)
  "Convert the fields of ROW to UTF-8 byte arrays, in place.

   Every element of ROW that is NIL or the keyword :NULL is replaced with
   NIL; every other element is replaced with the result of calling
   CL-POSTGRES-TRIVIAL-UTF-8:STRING-TO-UTF-8-BYTES on it. No transformation
   function is applied here.

   Returns ROW, the same array that was given, now holding byte arrays."
  (loop :for col :across row
        :for i fixnum :from 0
        :do (setf (aref row i)
                  (if (or (null col) (eq :NULL col))
                      ;; NULL values stay NIL, the formatting code turns
                      ;; them into \N later
                      nil
                      (cl-postgres-trivial-utf-8:string-to-utf-8-bytes col)))
        ;; a bare LOOP ... :do returns NIL, hand back the converted row
        :finally (return row)))
|
||||
|
||||
;; either column separator, Tab, or end-of-record with Newline
|
||||
(if (= bytes (+ 1 pos))
|
||||
(byte-out #. (char-code #\Newline))
|
||||
(byte-out #. (char-code #\Tab)))))
|
||||
(defun apply-transforms (nbcols row transform-fns)
  "Apply TRANSFORM-FNS to the fields of ROW, in place.

   TRANSFORM-FNS is a list walked in parallel with ROW: a non-NIL entry is
   called with the field value and its result replaces the field, a NIL
   entry leaves the field as it is. Processing stops after NBCOLS fields,
   or as soon as either ROW or TRANSFORM-FNS runs out.

   Returns ROW, the same array that was given."
  (loop :for i fixnum :below nbcols
        :for col :across row
        :for fun :in transform-fns
        :when fun
          :do (setf (aref row i) (funcall fun col))
        ;; return the row itself rather than NIL so that callers can use
        ;; the value directly
        :finally (return row)))
|
||||
|
||||
;; return the result and how many bytes it represents
|
||||
(values result bytes))))))
|
||||
(defvar *pg-escape-safe-types*
  (list
   ;; integers, including the auto-incrementing flavors
   "integer" "serial" "bigserial" "smallint" "bigint"
   ;; exact and approximate numerics
   "numeric" "decimal" "real" "float"
   ;; points in time
   "timestamp" "timestamptz"
   ;; bits and booleans
   "bit" "boolean"
   ;; network addresses
   "cidr" "inet"
   ;; calendar values
   "date" "interval"
   ;; identifiers
   "uuid")
  "PostgreSQL data type names whose input values are plain ASCII and never
   contain a backslash: values of those types can be sent without any
   escaping.")
|
||||
|
||||
(defun pg-escape-safe-array (copy)
  "Compute the escaping strategy to use for each column loaded by COPY.

   Returns an array of (integer 0 2) with one entry per column of COPY,
   filled from the type names of the target table column list:

     0  the value needs the full COPY text escaping; also used when the
        column has no PostgreSQL type name, see below
     1  the type is listed in *PG-ESCAPE-SAFE-TYPES*, the value can be
        copied as-is
     2  the type is bytea, whose representation begins with a backslash

   The computed array is also logged at the :debug level."
  (let* ((nbcols  (length (pgloader.sources::columns copy)))
         ;; default every entry to 0, the always-correct full escaping, so
         ;; that no entry is ever left with unspecified contents
         (result  (make-array nbcols
                              :element-type '(integer 0 2)
                              :initial-element 0))
         (pgtypes (loop :for pgcol :in (table-column-list
                                        (pgloader.sources::target copy))
                     ;; column might miss in pg table definition, but we
                     ;; want to let PostgreSQL handle this kind of error.
                     :collect (when pgcol (column-type-name pgcol)))))
    (loop :for pgtype :in pgtypes
       :for i fixnum :from 0
       :do (setf (aref result i)
                 (cond
                   ;; no type name available: be explicit about it rather
                   ;; than relying on STRING= reading NIL as "NIL"
                   ((null pgtype) 0)

                   ;; bytea representation begins with a backslash, which
                   ;; needs special processing.
                   ((string= "bytea" pgtype) 2)

                   ((member pgtype *pg-escape-safe-types* :test #'string=) 1)

                   (t 0))))
    (log-message :debug "pg escape safe types for ~a: ~a ~a"
                 (format-table-name (pgloader.sources::target copy))
                 pgtypes
                 result)
    result))
|
||||
|
||||
(defun format-and-escape-vector-row (nbcols pgrow pg-escape-safe-array)
  "Prepare the PGROW to be sent as a PostgreSQL COPY buffer.

   PGROW is an array where each column is either NIL, for a NULL value, or
   an array of (unsigned-byte 8). PG-ESCAPE-SAFE-ARRAY is walked in
   parallel with PGROW and tells how to process each column:

     0  backslash-escape every backslash, newline, carriage return, tab,
        backspace and form feed byte
     1  copy the bytes as they are
     2  bytea: escape the leading backslash, copy the rest as it is

   A column flagged 2 that is empty or does not begin with a backslash is
   processed as in case 0, so that it can neither signal an out-of-bounds
   error nor produce a corrupted value. Any other flag value is processed
   as in case 0 too.

   NULL is sent as \\N, columns are separated by a Tab and the row ends
   with a Newline. NBCOLS is the number of separator bytes to account for
   and is expected to be the number of columns in PGROW.

   Returns two values: the array of bytes and its length."
  (labels ((escape-byte-p (byte)
             ;; single source of truth for the bytes needing a backslash,
             ;; shared by the length computation and the output loop
             (case byte
               ((#.(char-code #\\)
                 #.(char-code #\Newline)
                 #.(char-code #\Return)
                 #.(char-code #\Tab)
                 #.(char-code #\Backspace)
                 #.(char-code #\Page))
                t)
               (t nil)))

           (escape-mode (escape-safe col)
             ;; the processing that actually applies to a non-null COL
             (case escape-safe
               (1 1)
               (2 (if (and (plusp (length col))
                           (= (aref col 0) #.(char-code #\\)))
                      2
                      ;; not shaped like a bytea value: escape it fully
                      0))
               (t 0)))

           (escaped-length (escape-safe col)
             (if (null col)
                 ;; NULL is \N (2 bytes) in COPY format
                 2
                 (ecase (escape-mode escape-safe col)
                   (2 (+ 1 (length col))) ; bytea begins with a \
                   (1 (length col))
                   (0 (loop :for byte :across col
                         :sum (if (escape-byte-p byte) 2 1)))))))

    (let* ((bytes  (+ nbcols
                      (loop :for col :across pgrow
                         :for escape-safe :across pg-escape-safe-array
                         :sum (escaped-length escape-safe col))))
           (result (make-array bytes :element-type '(unsigned-byte 8)))
           (pos    0))
      (macrolet ((byte-out (byte)
                   `(progn (setf (aref result pos) ,byte)
                           (incf pos)))
                 (esc-byte-out (byte)
                   `(progn (setf (aref result pos) #.(char-code #\\))
                           (setf (aref result (1+ pos)) ,byte)
                           (incf pos 2))))
        (loop :for col :across pgrow
           :for escape-safe :across pg-escape-safe-array
           :do (if (null col)
                   (esc-byte-out #.(char-code #\N))
                   (ecase (escape-mode escape-safe col)
                     (0 (loop :for byte :across col
                           :do (if (escape-byte-p byte)
                                   (esc-byte-out byte)
                                   (byte-out byte))))

                     (1 (replace result
                                 (the (simple-array (unsigned-byte 8) (*)) col)
                                 :start1 pos)
                        (incf pos (length col)))

                     (2 ;; Here we know we deal with bytea, and that starts
                        ;; with a backslash char that we need to escape...
                        ;; it's the almost-escape-safe case, because every
                        ;; other character in the stream is going to be an
                        ;; hexadecimal char.
                        (esc-byte-out (aref col 0))
                        (replace result
                                 (the (simple-array (unsigned-byte 8) (*)) col)
                                 :start1 pos
                                 :start2 1)
                        (incf pos (+ -1 (length col))))))

               ;; either column separator, Tab, or end-of-record with Newline
               (if (= bytes (+ 1 pos))
                   (byte-out #.(char-code #\Newline))
                   (byte-out #.(char-code #\Tab)))))

      ;; return the result and how many bytes it represents
      (values result bytes))))
|
||||
|
||||
(defun format-pre-formatted-vector-row (nbcols pgrow)
  "Build the PostgreSQL COPY buffer for PGROW when its columns are already
   formatted, which means that no escaping is done here.

   Each column of PGROW is either NIL, sent as \\N, or an array of
   (unsigned-byte 8) that is copied as it is. Columns are separated by a
   Tab and the row ends with a Newline; NBCOLS is the number of those
   separator bytes to make room for.

   Returns two values: the array of bytes and its length."
  (let* ((bytes  (+ nbcols
                    ;; NULL is \N (2 bytes) in COPY format
                    (loop :for col :across pgrow
                       :sum (if col (length col) 2))))
         (result (make-array bytes :element-type '(unsigned-byte 8)))
         (pos    0))
    (loop :for col :across pgrow
       :do (cond (col
                  (replace result
                           (the (simple-array (unsigned-byte 8) (*)) col)
                           :start1 pos)
                  (incf pos (length col)))

                 (t
                  ;; insert \N for a null value
                  (setf (aref result pos)      #.(char-code #\\)
                        (aref result (1+ pos)) #.(char-code #\N))
                  (incf pos 2)))

           ;; the column is followed by a Tab, unless it is the last one,
           ;; which is followed by the end-of-record Newline
           (setf (aref result pos)
                 (if (= bytes (1+ pos))
                     #.(char-code #\Newline)
                     #.(char-code #\Tab)))
           (incf pos))

    ;; the buffer, and how many bytes it contains
    (values result bytes)))
|
||||
|
||||
@ -118,6 +118,12 @@
|
||||
PostgreSQL, and when that's done update stats."
|
||||
(let ((preprocessor (pgloader.sources::preprocess-row copy))
|
||||
(pre-formatted (pgloader.sources:data-is-preformatted-p copy))
|
||||
(esc-safe-arr (pg-escape-safe-array copy))
|
||||
(nbcols (length
|
||||
(table-column-list (pgloader.sources::target copy))))
|
||||
(transform-fns (let ((funs (pgloader.sources::transforms copy)))
|
||||
(unless (every #'null funs)
|
||||
funs)))
|
||||
(current-batch (make-batch))
|
||||
(seconds 0))
|
||||
|
||||
@ -173,8 +179,9 @@
|
||||
|
||||
;; also add up the time it takes to format the rows
|
||||
(let ((start-time (get-internal-real-time)))
|
||||
(format-row-in-batch copy row current-batch
|
||||
preprocessor pre-formatted)
|
||||
(format-row-in-batch copy nbcols row current-batch
|
||||
preprocessor pre-formatted
|
||||
esc-safe-arr transform-fns)
|
||||
(incf seconds (elapsed-time-since start-time)))))
|
||||
|
||||
;; the last batch might not be empty
|
||||
@ -191,22 +198,36 @@
|
||||
(list :writer table seconds)))
|
||||
|
||||
|
||||
(defun format-row-in-batch (copy row current-batch preprocessor pre-formatted)
|
||||
(defun format-row-in-batch (copy nbcols row current-batch
|
||||
preprocessor
|
||||
pre-formatted
|
||||
escape-safe-array
|
||||
transform-fns)
|
||||
"Given a row from the queue, prepare it for the next batch."
|
||||
(declare (ignore copy))
|
||||
(metabang.bind:bind
|
||||
((row (if preprocessor (funcall preprocessor row) row))
|
||||
((row (if preprocessor (funcall preprocessor row) row))
|
||||
(transformed-row (or (when (and (not pre-formatted)
|
||||
transform-fns)
|
||||
(apply-transforms nbcols
|
||||
row
|
||||
transform-fns))
|
||||
row))
|
||||
|
||||
((:values copy-data bytes)
|
||||
(handler-case
|
||||
(format-vector-row row
|
||||
(pgloader.sources::transforms copy)
|
||||
pre-formatted)
|
||||
(condition (e)
|
||||
(log-message :error "Error while formating a row from ~s:"
|
||||
(format-table-name (pgloader.sources:target copy)))
|
||||
(log-message :error "~a" e)
|
||||
(update-stats :data (pgloader.sources:target copy) :errs 1)
|
||||
(values nil 0)))))
|
||||
(format-vector-row nbcols transformed-row pre-formatted escape-safe-array)
|
||||
;; (handler-case
|
||||
;; (format-vector-row row
|
||||
;; pre-formatted
|
||||
;; transform-fns
|
||||
;; ascii-types)
|
||||
;; (condition (e)
|
||||
;; (log-message :error "Error while formating a row from ~s:"
|
||||
;; (format-table-name (pgloader.sources:target copy)))
|
||||
;; (log-message :error "~a" e)
|
||||
;; (update-stats :data (pgloader.sources:target copy) :errs 1)
|
||||
;; (values nil 0)))
|
||||
))
|
||||
;; we might have to debug
|
||||
(when copy-data
|
||||
(log-message :data "> ~s"
|
||||
|
||||
@ -136,6 +136,18 @@
|
||||
(log-message :fatal "~a" e)
|
||||
(return-from copy-database)))
|
||||
|
||||
;; Keep the PostgreSQL table target around in the copy instance,
|
||||
;; with the following subtleties to deal with:
|
||||
;; 1. the catalog fetching did fill-in PostgreSQL columns as fields
|
||||
;; 2. we might target fewer pg columns than the table actually has
|
||||
(let ((table (first (table-list pgsql-catalog))))
|
||||
(setf (table-column-list table)
|
||||
(loop :for column-name :in (mapcar #'first (columns copy))
|
||||
:collect (find column-name (table-field-list table)
|
||||
:key #'column-name
|
||||
:test #'string=)))
|
||||
(setf (target copy) table))
|
||||
|
||||
;; expand the specs of our source, we might have to care about several
|
||||
;; files actually.
|
||||
(let* ((lp:*kernel* (make-kernel worker-count))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user