diff --git a/src/pg-copy/copy-batch.lisp b/src/pg-copy/copy-batch.lisp
index 4a599f6..7647ff0 100644
--- a/src/pg-copy/copy-batch.lisp
+++ b/src/pg-copy/copy-batch.lisp
@@ -61,7 +61,7 @@
         (push-row current-batch pg-vector-row bytes))))
 
 (defun add-row-to-current-batch (table columns copy nbcols batch row
-                                 send-batch-fn)
+                                 &key send-batch-fn format-row-fn)
   "Add another ROW we just received to CURRENT-BATCH, and prepare a new
    batch if needed. The current-batch (possibly a new one) is returned.
    When the batch is full, the function SEND-BATCH-FN is called with TABLE,
@@ -83,7 +83,10 @@
     ;; also add up the time it takes to format the rows
     (let ((start-time (get-internal-real-time)))
-      (format-row-in-batch copy nbcols row current-batch)
+      (multiple-value-bind (pg-vector-row bytes)
+          (funcall format-row-fn copy nbcols row)
+        (when pg-vector-row
+          (push-row current-batch pg-vector-row bytes)))
       (incf seconds (elapsed-time-since start-time)))
     (values current-batch seconds)))
diff --git a/src/pg-copy/copy-rows-in-batch-through-s3.lisp b/src/pg-copy/copy-rows-in-batch-through-s3.lisp
index a0c61e6..b6db4c0 100644
--- a/src/pg-copy/copy-rows-in-batch-through-s3.lisp
+++ b/src/pg-copy/copy-rows-in-batch-through-s3.lisp
@@ -19,7 +19,8 @@
        :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch)
                (add-row-to-current-batch table columns copy nbcols
                                          current-batch row
-                                         (function send-batch-through-s3))
+                                         :send-batch-fn #'send-batch-through-s3
+                                         :format-row-fn #'prepare-and-format-row-for-s3)
              (setf current-batch maybe-new-batch)
              (incf seconds seconds-in-this-batch)))
@@ -30,6 +31,22 @@
     seconds))
 
+(defun prepare-and-format-row-for-s3 (copy nbcols row)
+  "Redshift doesn't know how to parse COPY format, so we need to upload CSV
+   instead. That said, we don't have to be as careful with the data layout
+   and unicode representation when COPYing from a CSV file as we do when
+   implementing the data streaming ourselves."
+  (declare (ignore copy nbcols))
+  (let ((pg-vector-row (cl-csv:write-csv-row (coerce row 'list)
+                                             :separator #\,
+                                             :quote #\"
+                                             :escape #(#\" #\")
+                                             :newline #(#\Newline)
+                                             :always-quote t)))
+    (log-message :data "> ~s" pg-vector-row)
+    (values pg-vector-row (length pg-vector-row))))
+
+
 (defun send-batch-through-s3 (table columns batch &key (db pomo:*database*))
   "Copy current *writer-batch* into TABLE-NAME."
   (let ((batch-start-time (get-internal-real-time))
@@ -71,13 +88,7 @@
     ;;
     (handler-case
         (with-pgsql-transaction (:database db)
-          (let ((sql (format nil "~
-                             COPY ~a
-                             FROM 's3://~a/~a'
-                             DELIMITER '\\t'
-                             TIMEFORMAT 'auto'
-                             REGION '~a'
-                             ACCESS_KEY_ID '~a'"
+          (let ((sql (format nil "COPY ~a FROM 's3://~a/~a' FORMAT CSV TIMEFORMAT 'auto' REGION '~a' ACCESS_KEY_ID '~a'"
                              table-name
                              aws-s3-bucket
                              s3-filename
@@ -153,7 +164,7 @@ SECRET_ACCESS_KEY '~a'"
   ;; So now we know how many bytes we need to finalize this batch
   ;;
   (let* ((bytes  (batch-bytes batch))
-         (vector (make-array bytes :element-type '(unsigned-byte 8))))
+         (vector (make-array bytes :element-type 'character)))
     (loop :for count :below (batch-count batch)
        :for pos := 0 :then (+ pos (length row))
        :for row :across (batch-data batch)
diff --git a/src/pg-copy/copy-rows-in-batch.lisp b/src/pg-copy/copy-rows-in-batch.lisp
index b34dd9f..deb27d2 100644
--- a/src/pg-copy/copy-rows-in-batch.lisp
+++ b/src/pg-copy/copy-rows-in-batch.lisp
@@ -16,7 +16,8 @@
        :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch)
                (add-row-to-current-batch table columns copy nbcols
                                          current-batch row
-                                         (function send-batch))
+                                         :send-batch-fn (function send-batch)
+                                         :format-row-fn #'prepare-and-format-row)
              (setf current-batch maybe-new-batch)
              (incf seconds seconds-in-this-batch)))
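
Reviewer note on the new calling convention: this patch makes row formatting pluggable. A FORMAT-ROW-FN receives COPY, NBCOLS and the raw ROW, and must return two values: the formatted row (or nil, in which case the row is skipped) and its length, which PUSH-ROW adds to the batch size counter. The sketch below is a minimal conforming formatter, not part of the patch; the name format-row-as-tabs is hypothetical.

    ;; Hypothetical sketch of a FORMAT-ROW-FN honouring the two-values
    ;; contract introduced by this patch: return (values formatted-row length),
    ;; or nil as the first value to have the row dropped.
    (defun format-row-as-tabs (copy nbcols row)
      (declare (ignore copy nbcols))
      (let ((line (with-output-to-string (s)
                    (loop :for field :across row
                          :for first := t :then nil
                          :do (unless first (write-char #\Tab s))
                              (princ field s))
                    (write-char #\Newline s))))
        (values line (length line))))

    ;; It would be wired in the same way as the call sites above:
    ;; (add-row-to-current-batch table columns copy nbcols batch row
    ;;                           :send-batch-fn #'send-batch
    ;;                           :format-row-fn #'format-row-as-tabs)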
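Reviewer note on the Redshift COPY statement: the multi-line format string (which relied on ~ line continuations) is collapsed into a single line, and DELIMITER '\t' gives way to FORMAT CSV, since the batch now holds cl-csv output (strings) rather than PostgreSQL COPY text; that is also why the concatenation buffer's element type moves from (unsigned-byte 8) to character. As a quick illustration of what the new one-line format string produces, here is a REPL sketch with made-up values; the real call site fills these in from the target table and AWS settings, and the full statement continues with SECRET_ACCESS_KEY, as the later hunk header shows.

    ;; Made-up values, only to show the shape of the generated SQL:
    (format nil "COPY ~a FROM 's3://~a/~a' FORMAT CSV TIMEFORMAT 'auto' REGION '~a' ACCESS_KEY_ID '~a'"
            "public.sales" "my-bucket" "batch-0001.csv" "us-east-1" "AKIA...")
    ;; => "COPY public.sales FROM 's3://my-bucket/batch-0001.csv' FORMAT CSV TIMEFORMAT 'auto' REGION 'us-east-1' ACCESS_KEY_ID 'AKIA...'"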