Redshift doesn't support the COPY data format.

Instead, it has to parse CSV files. On the other hand, since we don't have to
implement the COPY protocol from within pgloader for Redshift (it reads its
data from S3 rather than from STDIN), we don't need the level of control we
rely on when talking to a normal PostgreSQL server.
Dimitri Fontaine 2018-05-23 13:45:16 +02:00
parent 3db3ecf81b
commit bcf9cf9bf4
3 changed files with 27 additions and 12 deletions
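
In practice this means the Redshift load is driven by a single SQL statement
pointing at the uploaded S3 object, rather than by streaming rows over the
COPY protocol. Here is a minimal sketch of that statement as this commit
builds it, with made-up placeholder values; the real call in
send-batch-through-s3 (in the diff below) also appends SECRET_ACCESS_KEY and
runs inside with-pgsql-transaction:

    ;; Sketch only: illustrative values, not pgloader's actual call site.
    ;; The real statement also carries SECRET_ACCESS_KEY and runs inside
    ;; with-pgsql-transaction, as the diff below shows.
    (let ((sql (format nil
                       "COPY ~a FROM 's3://~a/~a' FORMAT CSV TIMEFORMAT 'auto' REGION '~a' ACCESS_KEY_ID '~a'"
                       "public.mytable"      ; table-name
                       "my-bucket"           ; aws-s3-bucket
                       "batch-0001.csv"      ; s3-filename
                       "eu-west-1"           ; region
                       "<access-key-id>")))  ; placeholder credential
      ;; Executing it with Postmodern (the pomo package the code already
      ;; uses) would look roughly like this.
      (pomo:execute sql))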

View File

@@ -61,7 +61,7 @@
           (push-row current-batch pg-vector-row bytes))))
 
 (defun add-row-to-current-batch (table columns copy nbcols batch row
-                                 send-batch-fn)
+                                 &key send-batch-fn format-row-fn)
   "Add another ROW we just received to CURRENT-BATCH, and prepare a new
    batch if needed. The current-batch (possibly a new one) is returned. When
    the batch is full, the function SEND-BATCH-FN is called with TABLE,
@@ -83,7 +83,10 @@
     ;; also add up the time it takes to format the rows
     (let ((start-time (get-internal-real-time)))
-      (format-row-in-batch copy nbcols row current-batch)
+      (multiple-value-bind (pg-vector-row bytes)
+          (funcall format-row-fn copy nbcols row)
+        (when pg-vector-row
+          (push-row current-batch pg-vector-row bytes)))
       (incf seconds (elapsed-time-since start-time)))
     (values current-batch seconds)))
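
With this change add-row-to-current-batch no longer hard-codes the row
format: the caller supplies a FORMAT-ROW-FN that returns the formatted row
and its size as two values, or NIL to have the row skipped. A minimal sketch
of a formatter honouring that contract (dummy-format-row is a hypothetical
name, not part of pgloader):

    ;; Sketch: the contract expected by add-row-to-current-batch above.
    ;; dummy-format-row is a made-up example, not pgloader code.
    (defun dummy-format-row (copy nbcols row)
      (declare (ignore copy nbcols))
      (let ((formatted (format nil "~{~a~^,~}~%" (coerce row 'list))))
        ;; Return the row representation and its length; returning NIL
        ;; instead would make the caller skip the row.
        (values formatted (length formatted))))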

View File

@@ -19,7 +19,8 @@
         :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch)
                 (add-row-to-current-batch table columns copy nbcols
                                           current-batch row
-                                          (function send-batch-through-s3))
+                                          :send-batch-fn #'send-batch-through-s3
+                                          :format-row-fn #'prepare-and-format-row-for-s3)
               (setf current-batch maybe-new-batch)
               (incf seconds seconds-in-this-batch)))
@@ -30,6 +31,22 @@
     seconds))
 
+(defun prepare-and-format-row-for-s3 (copy nbcols row)
+  "Redshift doesn't know how to parse COPY format, we need to upload CSV
+   instead. That said, we don't have to be as careful with the data layout
+   and unicode representation when COPYing from a CSV file as we do when
+   implementing the data streaming outselves."
+  (declare (ignore copy nbcols))
+  (let ((pg-vector-row (cl-csv:write-csv-row (coerce row 'list)
+                                             :separator #\,
+                                             :quote #\"
+                                             :escape #(#\" #\")
+                                             :newline #(#\Newline)
+                                             :always-quote t)))
+    (log-message :data "> ~s" pg-vector-row)
+    (values pg-vector-row (length pg-vector-row))))
+
 (defun send-batch-through-s3 (table columns batch &key (db pomo:*database*))
   "Copy current *writer-batch* into TABLE-NAME."
   (let ((batch-start-time (get-internal-real-time))
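
For reference, with the options used above cl-csv should emit every field
quoted, doubling embedded quotes, and terminate the row with a newline. A
rough sketch of the expected shape, with illustrative values:

    ;; Sketch: illustrative input and the kind of CSV line it should produce.
    (cl-csv:write-csv-row (list 1 "O'Hara" "says \"hi\"")
                          :separator #\,
                          :quote #\"
                          :escape #(#\" #\")
                          :newline #(#\Newline)
                          :always-quote t)
    ;; => a string along the lines of:  "1","O'Hara","says ""hi"""
    ;;    followed by a trailing newline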
@@ -71,13 +88,7 @@
           ;;
           (handler-case
               (with-pgsql-transaction (:database db)
-                (let ((sql (format nil "~
-COPY ~a
-FROM 's3://~a/~a'
-DELIMITER '\\t'
-TIMEFORMAT 'auto'
-REGION '~a'
-ACCESS_KEY_ID '~a'"
+                (let ((sql (format nil "COPY ~a FROM 's3://~a/~a' FORMAT CSV TIMEFORMAT 'auto' REGION '~a' ACCESS_KEY_ID '~a'"
                                        table-name
                                        aws-s3-bucket
                                        s3-filename
@@ -153,7 +164,7 @@ SECRET_ACCESS_KEY '~a'"
     ;; So now we now how many bytes we need to finalize this batch
     ;;
     (let* ((bytes (batch-bytes batch))
-           (vector (make-array bytes :element-type '(unsigned-byte 8))))
+           (vector (make-array bytes :element-type 'character)))
       (loop :for count :below (batch-count batch)
             :for pos := 0 :then (+ pos (length row))
             :for row :across (batch-data batch)
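
Since the rows pushed into the batch are now strings (cl-csv output) rather
than octet vectors, the buffer that finalizes the batch becomes a character
array. A hedged sketch of how such rows can be concatenated into that buffer,
in the spirit of the loop above; the real loop body continues beyond the
context shown, so the REPLACE call here is an assumption:

    ;; Sketch only, not pgloader's exact code: copy each string row into a
    ;; single character buffer sized from the sum of the row lengths.
    (let* ((rows   (list (format nil "\"1\",\"a\"~%") (format nil "\"2\",\"b\"~%")))
           (total  (reduce #'+ rows :key #'length))
           (vector (make-array total :element-type 'character)))
      (loop :for pos := 0 :then (+ pos (length row))
            :for row :in rows
            :do (replace vector row :start1 pos))
      vector)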

View File

@@ -16,7 +16,8 @@
         :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch)
                 (add-row-to-current-batch table columns copy nbcols
                                           current-batch row
-                                          (function send-batch))
+                                          :send-batch-fn (function send-batch)
+                                          :format-row-fn #'prepare-and-format-row)
               (setf current-batch maybe-new-batch)
               (incf seconds seconds-in-this-batch)))