From bcf9cf9bf4606b3374b83f94ebae5eedc16bdf98 Mon Sep 17 00:00:00 2001
From: Dimitri Fontaine
Date: Wed, 23 May 2018 13:45:16 +0200
Subject: [PATCH] Redshift doesn't have support for the COPY format.

Instead, it needs to parse CSV files. On the other hand, as we don't have to
implement the COPY protocol from within pgloader for Redshift (because it's
using S3 as the data source, and not STDIN), we don't need the level of
control that we are using when talking to a normal PostgreSQL.
---
 src/pg-copy/copy-batch.lisp                 |  7 +++--
 .../copy-rows-in-batch-through-s3.lisp      | 29 +++++++++++++------
 src/pg-copy/copy-rows-in-batch.lisp         |  3 +-
 3 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/src/pg-copy/copy-batch.lisp b/src/pg-copy/copy-batch.lisp
index 4a599f6..7647ff0 100644
--- a/src/pg-copy/copy-batch.lisp
+++ b/src/pg-copy/copy-batch.lisp
@@ -61,7 +61,7 @@
       (push-row current-batch pg-vector-row bytes))))
 
 (defun add-row-to-current-batch (table columns copy nbcols batch row
-                                 send-batch-fn)
+                                 &key send-batch-fn format-row-fn)
   "Add another ROW we just received to CURRENT-BATCH, and prepare a new
    batch if needed. The current-batch (possibly a new one) is returned.
    When the batch is full, the function SEND-BATCH-FN is called with TABLE,
@@ -83,7 +83,10 @@
 
     ;; also add up the time it takes to format the rows
     (let ((start-time (get-internal-real-time)))
-      (format-row-in-batch copy nbcols row current-batch)
+      (multiple-value-bind (pg-vector-row bytes)
+          (funcall format-row-fn copy nbcols row)
+        (when pg-vector-row
+          (push-row current-batch pg-vector-row bytes)))
       (incf seconds (elapsed-time-since start-time)))
 
     (values current-batch seconds)))
diff --git a/src/pg-copy/copy-rows-in-batch-through-s3.lisp b/src/pg-copy/copy-rows-in-batch-through-s3.lisp
index a0c61e6..b6db4c0 100644
--- a/src/pg-copy/copy-rows-in-batch-through-s3.lisp
+++ b/src/pg-copy/copy-rows-in-batch-through-s3.lisp
@@ -19,7 +19,8 @@
         :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch)
                 (add-row-to-current-batch table columns copy nbcols
                                           current-batch row
-                                          (function send-batch-through-s3))
+                                          :send-batch-fn #'send-batch-through-s3
+                                          :format-row-fn #'prepare-and-format-row-for-s3)
               (setf current-batch maybe-new-batch)
               (incf seconds seconds-in-this-batch)))
 
@@ -30,6 +31,22 @@
     seconds))
 
 
+(defun prepare-and-format-row-for-s3 (copy nbcols row)
+  "Redshift doesn't know how to parse COPY format, we need to upload CSV
+   instead. That said, we don't have to be as careful with the data layout
+   and unicode representation when COPYing from a CSV file as we do when
+   implementing the data streaming ourselves."
+  (declare (ignore copy nbcols))
+  (let ((pg-vector-row (cl-csv:write-csv-row (coerce row 'list)
+                                             :separator #\,
+                                             :quote #\"
+                                             :escape #(#\" #\")
+                                             :newline #(#\Newline)
+                                             :always-quote t)))
+    (log-message :data "> ~s" pg-vector-row)
+    (values pg-vector-row (length pg-vector-row))))
+
+
 (defun send-batch-through-s3 (table columns batch &key (db pomo:*database*))
   "Copy current *writer-batch* into TABLE-NAME."
   (let ((batch-start-time (get-internal-real-time))
@@ -71,13 +88,7 @@
     ;;
     (handler-case
         (with-pgsql-transaction (:database db)
-          (let ((sql (format nil "~
-              COPY ~a
-              FROM 's3://~a/~a'
-              DELIMITER '\\t'
-              TIMEFORMAT 'auto'
-              REGION '~a'
-              ACCESS_KEY_ID '~a'"
+          (let ((sql (format nil "COPY ~a FROM 's3://~a/~a' FORMAT CSV TIMEFORMAT 'auto' REGION '~a' ACCESS_KEY_ID '~a'"
                              table-name
                              aws-s3-bucket
                              s3-filename
@@ -153,7 +164,7 @@ SECRET_ACCESS_KEY '~a'"
     ;; So now we now how many bytes we need to finalize this batch
     ;;
     (let* ((bytes  (batch-bytes batch))
-           (vector (make-array bytes :element-type '(unsigned-byte 8))))
+           (vector (make-array bytes :element-type 'character)))
       (loop :for count :below (batch-count batch)
             :for pos := 0 :then (+ pos (length row))
             :for row :across (batch-data batch)
diff --git a/src/pg-copy/copy-rows-in-batch.lisp b/src/pg-copy/copy-rows-in-batch.lisp
index b34dd9f..deb27d2 100644
--- a/src/pg-copy/copy-rows-in-batch.lisp
+++ b/src/pg-copy/copy-rows-in-batch.lisp
@@ -16,7 +16,8 @@
         :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch)
                 (add-row-to-current-batch table columns copy nbcols
                                           current-batch row
-                                          (function send-batch))
+                                          :send-batch-fn (function send-batch)
+                                          :format-row-fn #'prepare-and-format-row)
               (setf current-batch maybe-new-batch)
               (incf seconds seconds-in-this-batch)))
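
As a standalone illustration of the CSV convention the new :format-row-fn
hook relies on, here is a minimal sketch that formats one row the same way
prepare-and-format-row-for-s3 does, outside of pgloader. It assumes Quicklisp
and the cl-csv system are available; illustrate-csv-row is a hypothetical
helper written only for this example and is not part of the patch or of
pgloader.

;;; Minimal sketch, not part of the patch: format one row as an always-quoted
;;; CSV record, the layout Redshift's COPY ... FORMAT CSV expects, and return
;;; the record plus its length so batch bookkeeping could track sizes.
;;; Assumes Quicklisp; ILLUSTRATE-CSV-ROW is a hypothetical name.
(ql:quickload :cl-csv)

(defun illustrate-csv-row (row)
  "Turn ROW (a vector of field strings) into one CSV line and its length."
  (let ((line (cl-csv:write-csv-row (coerce row 'list)
                                    :separator #\,
                                    :quote #\"
                                    :escape #(#\" #\")    ; embedded quotes become ""
                                    :newline #(#\Newline)
                                    :always-quote t)))
    (values line (length line))))

;; Example call: embedded commas and quotes stay inside one CSV record, e.g.
;;   (illustrate-csv-row #("1" "O'Hara, Kim" "said \"hi\""))
;; returns a line like "1","O'Hara, Kim","said ""hi""" followed by a newline,
;; plus the line's length as a second value.

Keeping the row formatting behind a pluggable :format-row-fn is what lets the
regular PostgreSQL path keep its COPY text format while the Redshift path
emits CSV, without duplicating the batching logic.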