diff --git a/src/pgsql/copy-format.lisp b/src/pgsql/copy-format.lisp index dbfa491..e071c23 100644 --- a/src/pgsql/copy-format.lisp +++ b/src/pgsql/copy-format.lisp @@ -13,10 +13,7 @@ ;;; call here. ;;; -(defun format-vector-row (row - &optional - (transforms (make-list (length row))) - pre-formatted) +(defun format-vector-row (nbcols row pre-formatted pg-escape-safe-array) "Add a ROW in the STREAM, formating ROW in PostgreSQL COPY TEXT format. See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609 for @@ -24,90 +21,181 @@ details about the format, and format specs." (declare (type simple-array row)) ;; first prepare an array of transformed and properly encoded columns - (let* ((nbcols (length row)) - (pgrow (make-array nbcols :element-type 'array))) - (loop :for raw-col :across row - :for i :from 0 - :for fn :in transforms - :for col := (if pre-formatted raw-col - (if fn (funcall fn raw-col) raw-col)) - :do (setf (aref pgrow i) - (if (or (null col) (eq :NULL col)) - nil - (cl-postgres-trivial-utf-8:string-to-utf-8-bytes col)))) + (row-to-utf8-bytes row) - ;; now that we have all the columns, make a simple array out of them - (if pre-formatted - ;; pre-formatted data means we can return it as it already is - (flet ((col-length (col) - ;; NULL is \N (2 bytes) in COPY format - (if col (length col) 2))) - (let* ((bytes (+ nbcols (reduce '+ (map 'list #'col-length pgrow)))) - (result (make-array bytes :element-type '(unsigned-byte 8)))) - (loop :for start := 0 :then (+ start len 1) - :for col :across pgrow - :for len := (if col (length col) 2) - :do (progn - (if col - (replace result - (the (simple-array (unsigned-byte 8) (*)) col) - :start1 start) - ;; insert \N for a null value - (setf (aref result start) #. (char-code #\\) - (aref result (+ 1 start)) #. (char-code #\N))) + ;; now escape bytes for the COPY text format + (if pre-formatted + ;; pre-formatted means no escaping needs to be done + (format-pre-formatted-vector-row nbcols row) - ;; either column separator, Tab, or Newline (end of record) - (setf (aref result (+ start len)) - (if (= bytes (+ start len 1)) - #. (char-code #\Newline) - #. (char-code #\Tab))))) + ;; in the general case we need to take into account PostgreSQL + ;; escaping rules for the COPY format + (format-and-escape-vector-row nbcols row pg-escape-safe-array))) - ;; return the result and how many bytes it represents - (values result bytes))) - ;; in the general case we need to take into account PostgreSQL - ;; escaping rules for the COPY format - (flet ((escaped-length (string) - (if (null string) - ;; NULL is \N (2 bytes) in COPY format - 2 - (loop :for byte :across string - :sum (case byte - (#. (char-code #\\) 2) - (#. (char-code #\Newline) 2) - (#. (char-code #\Return) 2) - (#. (char-code #\Tab) 2) - (#. (char-code #\Backspace) 2) - (#. (char-code #\Page) 2) - (t 1)))))) - (let* ((bytes (+ nbcols - (reduce '+ (map 'list #'escaped-length pgrow)))) - (result (make-array bytes :element-type '(unsigned-byte 8))) - (pos 0)) - (macrolet ((byte-out (byte) - `(progn (setf (aref result pos) ,byte) - (incf pos))) - (esc-byte-out (byte) - `(progn (setf (aref result pos) #. (char-code #\\)) - (setf (aref result (1+ pos)) ,byte) - (incf pos 2)))) - (loop :for col :across pgrow :do - (if (null col) - (esc-byte-out #. (char-code #\N)) - (loop :for byte :across col :do - (case byte - (#. (char-code #\\) (esc-byte-out byte)) - (#. (char-code #\Newline) (esc-byte-out byte)) - (#. (char-code #\Return) (esc-byte-out byte)) - (#. (char-code #\Tab) (esc-byte-out byte)) - (#. (char-code #\Backspace) (esc-byte-out byte)) - (#. (char-code #\Page) (esc-byte-out byte)) - (t (byte-out byte))))) +(defun row-to-utf8-bytes (row) + "Apply transformation functions to ROW fields, and convert text to UTF-8 + bytes arrays. Return an array of array-of-bytes ready to be sent to + PostgreSQL." + (loop :for col :across row + :for i fixnum :from 0 + :do (setf (aref row i) + (if (or (null col) (eq :NULL col)) + nil + (cl-postgres-trivial-utf-8:string-to-utf-8-bytes col))))) - ;; either column separator, Tab, or end-of-record with Newline - (if (= bytes (+ 1 pos)) - (byte-out #. (char-code #\Newline)) - (byte-out #. (char-code #\Tab))))) +(defun apply-transforms (nbcols row transform-fns) + (loop :for i fixnum :below nbcols + :for col :across row + :for fun :in transform-fns + :do (setf (aref row i) + (if fun (funcall fun col) col)))) - ;; return the result and how many bytes it represents - (values result bytes)))))) +(defvar *pg-escape-safe-types* + (list "integer" + "serial" + "bigserial" + "smallint" + "bigint" + "numeric" + "decimal" + "real" + "float" + "timestamp" + "timestamptz" + "bit" + "boolean" + "cidr" + "inet" + "date" + "interval" + "uuid") + "List of PostgreSQL data types with non-backslash, ascii-only input + values: values that we don't need to escape.") + +(defun pg-escape-safe-array (copy) + "Given a PostgreSQL type name, return non-nil when this type is known to + represent a plain ASCII value: a value that we don't need to convert into + UTF-8, such as numbers and dates." + (let* ((nbcols (length (pgloader.sources::columns copy))) + (result (make-array nbcols :element-type '(integer 0 2))) + (pgtypes (loop :for pgcol :in (table-column-list + (pgloader.sources::target copy)) + ;; column might miss in pg table definition, but we + ;; want to let PostgreSQL handle this kind of error. + :collect (when pgcol (column-type-name pgcol))))) + (loop :for pgtype :in pgtypes + :for i fixnum :from 0 + :do (setf (aref result i) + (cond + ;; bytea representation begins with a backslash, which + ;; needs special processing. + ((string= "bytea" pgtype) 2) + ((member pgtype *pg-escape-safe-types* :test #'string=) 1) + (t 0)))) + (log-message :debug "pg escape safe types for ~a: ~a ~a" + (format-table-name (pgloader.sources::target copy)) + pgtypes + result) + result)) + +(defun format-and-escape-vector-row (nbcols pgrow pg-escape-safe-array) + "Prepare the PGROW to be send as a PostgreSQL COPY buffer." + (flet ((escaped-length (escape-safe string) + (if (null string) + ;; NULL is \N (2 bytes) in COPY format + 2 + (case escape-safe + (2 (+ 1 (length string))) ; bytea begins with a \ + (1 (length string)) + (0 (loop :for byte :across string + :sum (case byte + (#. (char-code #\\) 2) + (#. (char-code #\Newline) 2) + (#. (char-code #\Return) 2) + (#. (char-code #\Tab) 2) + (#. (char-code #\Backspace) 2) + (#. (char-code #\Page) 2) + (t 1)))))))) + (let* ((bytes (+ nbcols + (loop :for col :across pgrow + :for escape-safe :across pg-escape-safe-array + :sum (escaped-length escape-safe col)))) + (result (make-array bytes :element-type '(unsigned-byte 8))) + (pos 0)) + (macrolet ((byte-out (byte) + `(progn (setf (aref result pos) ,byte) + (incf pos))) + (esc-byte-out (byte) + `(progn (setf (aref result pos) #. (char-code #\\)) + (setf (aref result (1+ pos)) ,byte) + (incf pos 2)))) + (loop :for col :across pgrow + :for escape-safe :across pg-escape-safe-array + :do (if (null col) + (esc-byte-out #. (char-code #\N)) + (case escape-safe + (0 (loop :for byte :across col :do + (case byte + (#. (char-code #\\) (esc-byte-out byte)) + (#. (char-code #\Newline) (esc-byte-out byte)) + (#. (char-code #\Return) (esc-byte-out byte)) + (#. (char-code #\Tab) (esc-byte-out byte)) + (#. (char-code #\Backspace) (esc-byte-out byte)) + (#. (char-code #\Page) (esc-byte-out byte)) + (t (byte-out byte))))) + + (1 (replace result + (the (simple-array (unsigned-byte 8) (*)) col) + :start1 pos) + (incf pos (length col))) + + (2 ;; Here we know we deal with bytea, and that starts + ;; with a backslach char that we need to escape... + ;; it's the almost-escape-safe case, because every + ;; other character in the stream is going to be an + ;; hexadecimal char. + (esc-byte-out (aref col 0)) + (replace result + (the (simple-array (unsigned-byte 8) (*)) col) + :start1 pos + :start2 1) + (incf pos (+ -1 (length col)))))) + + ;; either column separator, Tab, or end-of-record with Newline + (if (= bytes (+ 1 pos)) + (byte-out #. (char-code #\Newline)) + (byte-out #. (char-code #\Tab))))) + + ;; return the result and how many bytes it represents + (values result bytes)))) + +(defun format-pre-formatted-vector-row (nbcols pgrow) + "Prepare the PGROW for being sent as a PostgreSQL COPY buffer, knowing + that the data in the PGROW is pre-formatted -- no extra escaping is + needed." + (flet ((col-length (col) + ;; NULL is \N (2 bytes) in COPY format + (if col (length col) 2))) + (let* ((bytes (+ nbcols (reduce #'+ pgrow :key #'col-length))) + (result (make-array bytes :element-type '(unsigned-byte 8)))) + (loop :for start := 0 :then (+ start len 1) + :for col :across pgrow + :for len := (if col (length col) 2) + :do (progn + (if col + (replace result + (the (simple-array (unsigned-byte 8) (*)) col) + :start1 start) + ;; insert \N for a null value + (setf (aref result start) #. (char-code #\\) + (aref result (+ 1 start)) #. (char-code #\N))) + + ;; either column separator, Tab, or Newline (end of record) + (setf (aref result (+ start len)) + (if (= bytes (+ start len 1)) + #. (char-code #\Newline) + #. (char-code #\Tab))))) + + ;; return the result and how many bytes it represents + (values result bytes)))) diff --git a/src/pgsql/copy-from-queue.lisp b/src/pgsql/copy-from-queue.lisp index 36ac472..2e0e8fe 100644 --- a/src/pgsql/copy-from-queue.lisp +++ b/src/pgsql/copy-from-queue.lisp @@ -118,6 +118,12 @@ PostgreSQL, and when that's done update stats." (let ((preprocessor (pgloader.sources::preprocess-row copy)) (pre-formatted (pgloader.sources:data-is-preformatted-p copy)) + (esc-safe-arr (pg-escape-safe-array copy)) + (nbcols (length + (table-column-list (pgloader.sources::target copy)))) + (transform-fns (let ((funs (pgloader.sources::transforms copy))) + (unless (every #'null funs) + funs))) (current-batch (make-batch)) (seconds 0)) @@ -173,8 +179,9 @@ ;; also add up the time it takes to format the rows (let ((start-time (get-internal-real-time))) - (format-row-in-batch copy row current-batch - preprocessor pre-formatted) + (format-row-in-batch copy nbcols row current-batch + preprocessor pre-formatted + esc-safe-arr transform-fns) (incf seconds (elapsed-time-since start-time))))) ;; the last batch might not be empty @@ -191,22 +198,36 @@ (list :writer table seconds))) -(defun format-row-in-batch (copy row current-batch preprocessor pre-formatted) +(defun format-row-in-batch (copy nbcols row current-batch + preprocessor + pre-formatted + escape-safe-array + transform-fns) "Given a row from the queue, prepare it for the next batch." + (declare (ignore copy)) (metabang.bind:bind - ((row (if preprocessor (funcall preprocessor row) row)) + ((row (if preprocessor (funcall preprocessor row) row)) + (transformed-row (or (when (and (not pre-formatted) + transform-fns) + (apply-transforms nbcols + row + transform-fns)) + row)) ((:values copy-data bytes) - (handler-case - (format-vector-row row - (pgloader.sources::transforms copy) - pre-formatted) - (condition (e) - (log-message :error "Error while formating a row from ~s:" - (format-table-name (pgloader.sources:target copy))) - (log-message :error "~a" e) - (update-stats :data (pgloader.sources:target copy) :errs 1) - (values nil 0))))) + (format-vector-row nbcols transformed-row pre-formatted escape-safe-array) + ;; (handler-case + ;; (format-vector-row row + ;; pre-formatted + ;; transform-fns + ;; ascii-types) + ;; (condition (e) + ;; (log-message :error "Error while formating a row from ~s:" + ;; (format-table-name (pgloader.sources:target copy))) + ;; (log-message :error "~a" e) + ;; (update-stats :data (pgloader.sources:target copy) :errs 1) + ;; (values nil 0))) + )) ;; we might have to debug (when copy-data (log-message :data "> ~s" diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index 0ac4b42..37aaaeb 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -136,6 +136,18 @@ (log-message :fatal "~a" e) (return-from copy-database))) + ;; Keep the PostgreSQL table target around in the copy instance, + ;; with the following subtleties to deal with: + ;; 1. the catalog fetching did fill-in PostgreSQL columns as fields + ;; 2. we might target fewer pg columns than the table actually has + (let ((table (first (table-list pgsql-catalog)))) + (setf (table-column-list table) + (loop :for column-name :in (mapcar #'first (columns copy)) + :collect (find column-name (table-field-list table) + :key #'column-name + :test #'string=))) + (setf (target copy) table)) + ;; expand the specs of our source, we might have to care about several ;; files actually. (let* ((lp:*kernel* (make-kernel worker-count))