Fix handling of COPY data, fix #222.

When given a file in the COPY format, we should expect that its content
is already properly escaped as expected by PostgreSQL. Rather than
unescape the data then escape it again, add a new mode of operation to
format-vector-row in which it won't even try to reformat the data.

In passing, fix an off-by-one bug in dealing with non-ascii characters.
This commit is contained in:
Dimitri Fontaine 2015-04-30 13:17:02 +02:00
parent 5759ae50bb
commit 53dcdfd8ef
7 changed files with 3573 additions and 33 deletions

View File

@ -13,7 +13,9 @@
;;; call here.
;;;
(defun format-vector-row (stream row
&optional (transforms (make-list (length row))))
&optional
(transforms (make-list (length row)))
pre-formatted)
"Add a ROW in the STREAM, formating ROW in PostgreSQL COPY TEXT format.
See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609 for
@ -35,7 +37,8 @@ details about the format, and format specs."
for i from 1
for more? = (< i nbcols)
for fn in transforms
for preprocessed-col = (if fn (funcall fn col) col)
for preprocessed-col = (if pre-formatted col
(if fn (funcall fn col) col))
do
(if (or (null preprocessed-col)
;; still accept postmodern :NULL in "preprocessed" data
@ -43,32 +46,39 @@ details about the format, and format specs."
(progn
;; NULL is expected as \N, two chars
(write-bytes #\\) (write-bytes #\N))
(loop
;; From PostgreSQL docs:
;;
;; In particular, the following characters must be preceded
;; by a backslash if they appear as part of a column value:
;; backslash itself, newline, carriage return, and the
;; current delimiter character.
for byte across (cl-postgres-trivial-utf-8:string-to-utf-8-bytes preprocessed-col)
do (case (code-char byte)
(#\\ (progn (write-bytes #\\)
(write-bytes #\\)))
(#\Space (write-bytes #\Space))
(#\Newline (progn (write-bytes #\\)
(write-bytes #\n)))
(#\Return (progn (write-bytes #\\)
(write-bytes #\r)))
(#\Tab (progn (write-bytes #\\)
(write-bytes #\t)))
(#\Backspace (progn (write-bytes #\\)
(write-bytes #\b)))
(#\Page (progn (write-bytes #\\)
(write-bytes #\f)))
(t (if (< 32 byte 127)
(write-bytes (code-char byte))
(write-bytes (format nil "\\~o" byte)))))))
(if pre-formatted
(map nil
(lambda (byte)
(if (<= 32 byte 127)
(write-bytes (code-char byte))
(write-bytes (format nil "\\~o" byte))))
(cl-postgres-trivial-utf-8:string-to-utf-8-bytes col))
(loop
;; From PostgreSQL docs:
;;
;; In particular, the following characters must be preceded
;; by a backslash if they appear as part of a column value:
;; backslash itself, newline, carriage return, and the
;; current delimiter character.
for byte across (cl-postgres-trivial-utf-8:string-to-utf-8-bytes preprocessed-col)
do (case (code-char byte)
(#\\ (progn (write-bytes #\\)
(write-bytes #\\)))
(#\Space (write-bytes #\Space))
(#\Newline (progn (write-bytes #\\)
(write-bytes #\n)))
(#\Return (progn (write-bytes #\\)
(write-bytes #\r)))
(#\Tab (progn (write-bytes #\\)
(write-bytes #\t)))
(#\Backspace (progn (write-bytes #\\)
(write-bytes #\b)))
(#\Page (progn (write-bytes #\\)
(write-bytes #\f)))
(t
(if (<= 32 byte 127)
(write-bytes (code-char byte))
(write-bytes (format nil "\\~o" byte))))))))
when more? do (write-bytes #\Tab)
finally (progn (write-bytes #\Newline)
(return bytes))))))

View File

@ -23,7 +23,7 @@
(and *copy-batch-size* ; defaults to nil
(<= *copy-batch-size* (batch-bytes batch))))
(defun batch-row (row copy queue)
(defun batch-row (row copy queue &optional pre-formatted)
"Add ROW to the reader batch. When the batch is full, provide it to the
writer."
(when (or (eq :data *log-min-messages*)
@ -45,7 +45,9 @@
(with-slots (data count bytes) *current-batch*
(let ((copy-string
(with-output-to-string (s)
(let ((c-s-bytes (format-vector-row s row (transforms copy))))
(let ((c-s-bytes (format-vector-row s row
(transforms copy)
pre-formatted)))
(when *copy-batch-size* ; running under memory watch
(incf bytes c-s-bytes))))))
(setf (aref data count) copy-string)
@ -54,19 +56,22 @@
(condition (e)
(log-message :error "~a" e))))
(defun map-push-queue (copy queue)
(defun map-push-queue (copy queue &optional pre-formatted)
"Apply MAP-ROWS on the COPY instance and a function of ROW that will push
the row into the QUEUE. When MAP-ROWS returns, push :end-of-data in the
queue."
(unwind-protect
(let ((*current-batch* (make-batch)))
(map-rows copy :process-row-fn (lambda (row)
(batch-row row copy queue)))
(batch-row row copy queue
pre-formatted)))
;; we might have the last batch to send over now
(with-slots (data count) *current-batch*
(when (< 0 count)
(log-message :debug "Sending last batch (~d rows)" count)
(lq:push-queue (list :batch data count nil) queue))))
;; signal we're done
(log-message :debug "End of data.")
(lq:push-queue (list :end-of-data nil nil nil) queue)))

View File

@ -96,7 +96,7 @@
(defmethod copy-to-queue ((copy copy-copy) queue)
"Copy data from given COPY definition into lparallel.queue DATAQ"
(pgloader.queue:map-push-queue copy queue))
(pgloader.queue:map-push-queue copy queue 'pre-formatted))
(defmethod copy-from ((copy copy-copy) &key truncate disable-triggers)
"Copy data from given COPY file definition into its PostgreSQL target table."

View File

@ -14,6 +14,8 @@ REGRESS= allcols.load \
csv-nulls.load \
csv-trim-extra-blanks.load \
csv.load \
copy.load \
copy-hex.load \
dbf.load \
errors.load \
fixed.load \

15
test/copy-hex.load Normal file
View File

@ -0,0 +1,15 @@
LOAD COPY
FROM inline (id, text)
INTO postgresql:///pgloader?copyhex
WITH truncate
BEFORE LOAD DO
$$ drop table if exists copyhex; $$,
$$ create table copyhex(id int, text varchar(4)); $$;
1 a
2 aa
3 \x1a
4 a\x1a
5 \N

View File

@ -0,0 +1,5 @@
1 a
2 aa
3 
4 a
5 \N

File diff suppressed because it is too large Load Diff