From 9ac400b62355066cbff4b3e33b6ccd20b7cb71fc Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Mon, 21 May 2018 21:22:15 +0200 Subject: [PATCH] Implement copying data through S3 for Redshift. Also add some schema-level support by disabling our usual index and constraint support when the target is Redshift, because it doesn't support those parts of SQL. The S3 parameters are read form either the process environment variables or from the AWS configuration files in ~/.aws. --- pgloader.asd | 2 + src/load/migrate-database.lisp | 30 ++- src/pg-copy/copy-batch.lisp | 39 ++++ src/pg-copy/copy-from-queue.lisp | 51 ++-- .../copy-rows-in-batch-through-s3.lisp | 219 ++++++++++++++++++ src/pg-copy/copy-rows-in-batch.lisp | 36 +-- 6 files changed, 318 insertions(+), 59 deletions(-) create mode 100644 src/pg-copy/copy-rows-in-batch-through-s3.lisp diff --git a/pgloader.asd b/pgloader.asd index 0be756b..55468e4 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -38,6 +38,7 @@ #:cl-mustache ; Logic-less templates #:yason ; JSON routines #:closer-mop ; introspection + #:zs3 ; integration with AWS S3 for Redshift ) :components ((:module "src" @@ -197,6 +198,7 @@ (:file "copy-db-write") (:file "copy-rows-in-stream") (:file "copy-rows-in-batch") + (:file "copy-rows-in-batch-through-s3") (:file "copy-retry-batch") (:file "copy-from-queue"))) diff --git a/src/load/migrate-database.lisp b/src/load/migrate-database.lisp index 79be231..e4c39bf 100644 --- a/src/load/migrate-database.lisp +++ b/src/load/migrate-database.lisp @@ -167,11 +167,13 @@ ;; ;; Add schemas that needs to be in the search_path to the database - ;; search_path + ;; search_path, when using PostgreSQL. Redshift doesn't know how to + ;; do that, unfortunately. ;; - (add-to-search-path catalog - :section :post - :label "Set Search Path") + (unless (eq :redshift (pgconn-variant (target-db copy))) + (add-to-search-path catalog + :section :post + :label "Set Search Path")) ;; ;; And now, comments on tables and columns. @@ -263,11 +265,21 @@ (create-tables (and create-tables create-ddl)) (create-schemas (and create-schemas create-ddl)) ;; foreign keys has a special meaning in data-only mode - (foreign-keys foreign-keys) - (drop-indexes (or reindex - (and include-drop create-ddl))) - (create-indexes (or reindex - (and create-indexes drop-indexes create-ddl))) + (foreign-keys (if (eq :redshift (pgconn-variant (target-db copy))) + nil + foreign-keys)) + (drop-indexes (if (eq :redshift (pgconn-variant (target-db copy))) + nil + (or reindex + (and include-drop create-ddl)))) + (create-indexes (if (eq :redshift (pgconn-variant (target-db copy))) + nil + (or reindex + (and create-indexes drop-indexes create-ddl)))) + + (reset-sequences (if (eq :redshift (pgconn-variant (target-db copy))) + nil + reset-sequences)) (*preserve-index-names* (or (eq :preserve index-names) diff --git a/src/pg-copy/copy-batch.lisp b/src/pg-copy/copy-batch.lisp index c20f8ad..4a599f6 100644 --- a/src/pg-copy/copy-batch.lisp +++ b/src/pg-copy/copy-batch.lisp @@ -48,3 +48,42 @@ (setf (aref data count) row) (incf count) (incf bytes row-bytes))) + + +;;; +;;; Integration of batch with COPY row format +;;; +(defun format-row-in-batch (copy nbcols row current-batch) + "Given a row from the queue, prepare it for the next batch." + (multiple-value-bind (pg-vector-row bytes) + (prepare-and-format-row copy nbcols row) + (when pg-vector-row + (push-row current-batch pg-vector-row bytes)))) + +(defun add-row-to-current-batch (table columns copy nbcols batch row + send-batch-fn) + "Add another ROW we just received to CURRENT-BATCH, and prepare a new + batch if needed. The current-batch (possibly a new one) is returned. When + the batch is full, the function SEND-BATCH-FN is called with TABLE, + COLUMNS and the full BATCH as parameters." + (let ((seconds 0) + (current-batch batch)) + ;; if current-batch is full, send data to PostgreSQL + ;; and prepare a new batch + (when (batch-full-p current-batch) + (incf seconds (funcall send-batch-fn table columns current-batch)) + (setf current-batch (make-batch)) + + ;; give a little help to our friend, now is a good time + ;; to garbage collect + #+sbcl + (let ((garbage-collect-start (get-internal-real-time))) + (sb-ext:gc :full t) + (incf seconds (elapsed-time-since garbage-collect-start)))) + + ;; also add up the time it takes to format the rows + (let ((start-time (get-internal-real-time))) + (format-row-in-batch copy nbcols row current-batch) + (incf seconds (elapsed-time-since start-time))) + + (values current-batch seconds))) diff --git a/src/pg-copy/copy-from-queue.lisp b/src/pg-copy/copy-from-queue.lisp index 4a537f8..56b6def 100644 --- a/src/pg-copy/copy-from-queue.lisp +++ b/src/pg-copy/copy-from-queue.lisp @@ -38,23 +38,42 @@ (format-table-name table) columns) - (if on-error-stop - ;; - ;; When on-error-stop is true, we don't need to handle batch - ;; processing, we can stop as soon as there's a failure. - ;; - (incf seconds - (stream-rows-to-copy table columns copy nbcols queue)) + (let ((copy-fun + (cond ((eq :redshift (pgconn-variant pgconn)) + ;; + ;; When using Redshift as the target, we lose the + ;; COPY FROM STDIN feature, and we have to use S3 as + ;; an intermediate step. We then upload content a + ;; batch at a time, and don't follow the + ;; on-error-stop setting. + ;; + (log-message :log "copy-rows-from-queue REDSHIFT") + (function batch-rows-to-s3-then-copy)) - ;; - ;; When on-error-stop is nil, we actually implement - ;; on-error-resume-next behavior, and for that we need to keep - ;; a batch of rows around in order to replay COPYing its - ;; content around, skipping rows that are rejected by - ;; PostgreSQL. - ;; - (incf seconds - (batch-rows-to-copy table columns copy nbcols queue)))))) + (on-error-stop + ;; + ;; When on-error-stop is true, we don't need to + ;; handle batch processing, we can stop as soon as + ;; there's a failure. + ;; + (function stream-rows-to-copy)) + + (t + ;; + ;; When on-error-stop is nil, we actually implement + ;; on-error-resume-next behavior, and for that we + ;; need to keep a batch of rows around in order to + ;; replay COPYing its content around, skipping rows + ;; that are rejected by PostgreSQL. + ;; + (function batch-rows-to-copy))))) + + ;; + ;; As all our function have the same API. we can just funcall + ;; the selected one here. + ;; + (incf seconds + (funcall copy-fun table columns copy nbcols queue)))))) ;; each writer thread sends its own stop timestamp and the monitor keeps ;; only the latest entry diff --git a/src/pg-copy/copy-rows-in-batch-through-s3.lisp b/src/pg-copy/copy-rows-in-batch-through-s3.lisp new file mode 100644 index 0000000..a0c61e6 --- /dev/null +++ b/src/pg-copy/copy-rows-in-batch-through-s3.lisp @@ -0,0 +1,219 @@ +;;; +;;; The PostgreSQL COPY TO implementation, using S3 as an intermediate +;;; location for the data. +;;; +;;; This file is only used for Redshift support at the moment. +;;; +(in-package :pgloader.pgcopy) + +(defun batch-rows-to-s3-then-copy (table columns copy nbcols queue) + "Add rows that we pop from QUEUE into a batch, that we then COPY over to + PostgreSQL as soon as the batch is full. This allows sophisticated error + handling and recovery, where we can retry rows that are not rejected by + PostgreSQL." + (let ((seconds 0) + (current-batch (make-batch))) + (loop + :for row := (lq:pop-queue queue) + :until (eq :end-of-data row) + :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch) + (add-row-to-current-batch table columns copy nbcols + current-batch row + (function send-batch-through-s3)) + (setf current-batch maybe-new-batch) + (incf seconds seconds-in-this-batch))) + + ;; the last batch might not be empty + (unless (= 0 (batch-count current-batch)) + (incf seconds (send-batch-through-s3 table columns current-batch))) + + seconds)) + + +(defun send-batch-through-s3 (table columns batch &key (db pomo:*database*)) + "Copy current *writer-batch* into TABLE-NAME." + (let ((batch-start-time (get-internal-real-time)) + (table-name (format-table-name table)) + (pomo:*database* db)) + + ;; + ;; We first upload the batch of data we have to S3 + ;; + (multiple-value-bind (aws-access-key-id + aws-secret-access-key + aws-region + aws-s3-bucket) + ;; TODO: implement --aws--profile and use it here + (get-aws-credentials-and-setup) + + (let ((s3-filename (format nil "~a.~a.~a" + (format-table-name table) + (lp:kernel-worker-index) + (batch-start batch))) + (vector (batch-as-single-vector batch))) + + (log-message :info + "Uploading a batch of ~a rows [~a] to s3://~a/~a" + (batch-count batch) + (pretty-print-bytes (batch-bytes batch)) + aws-s3-bucket + s3-filename) + + (zs3:put-vector vector + aws-s3-bucket + s3-filename + :credentials (list aws-access-key-id + aws-secret-access-key)) + + ;; Now we COPY the data from S3 to Redshift: + ;; + ;; https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html + ;; + (handler-case + (with-pgsql-transaction (:database db) + (let ((sql (format nil "~ + COPY ~a + FROM 's3://~a/~a' + DELIMITER '\\t' + TIMEFORMAT 'auto' + REGION '~a' + ACCESS_KEY_ID '~a'" + table-name + aws-s3-bucket + s3-filename + aws-region + aws-access-key-id))) + (log-message :sql "~a" sql) + (let ((sql-with-access-key + (format nil "~a +SECRET_ACCESS_KEY '~a'" + sql + aws-secret-access-key))) + (pomo:execute sql-with-access-key)))) + + ;; If PostgreSQL signals a data error, process the batch by isolating + ;; erroneous data away and retrying the rest. + (postgresql-retryable (condition) + (pomo:execute "ROLLBACK") + (log-message :error "PostgreSQL [~s] ~a" table-name condition) + (update-stats :data table :errs (batch-count batch))) + + (postgresql-unavailable (condition) + (log-message :error "[PostgreSQL ~s] ~a" table-name condition) + (log-message :error "Copy Batch reconnecting to PostgreSQL") + + ;; in order to avoid Socket error in "connect": ECONNREFUSED if we + ;; try just too soon, wait a little + (sleep 2) + + (cl-postgres:reopen-database db) + (send-batch-through-s3 table columns batch :db db)) + + (copy-init-error (condition) + ;; Couldn't init the COPY protocol, process the condition up the + ;; stack + (update-stats :data table :errs 1) + (error condition)) + + (condition (c) + ;; non retryable failures + (log-message :error "Non-retryable error ~a" c) + (pomo:execute "ROLLBACK"))))) + + ;; now log about having send a batch, and update our stats with the + ;; time that took + (let ((seconds (elapsed-time-since batch-start-time))) + (log-message :debug + "send-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]" + (lp:kernel-worker-index) + (format-table-name table) + (batch-count batch) + (pretty-print-bytes (batch-bytes batch)) + seconds + (batch-oversized-p batch)) + (update-stats :data table + :rows (batch-count batch) + :bytes (batch-bytes batch)) + + ;; and return batch-seconds + seconds))) + + +(defun batch-as-single-vector (batch) + "For communicating with AWS S3, we finalize our batch data into a single + vector." + (if (= 0 (batch-count batch)) + nil + + ;; non-empty batch + ;; + ;; first compute the total number of bytes we need to represent this + ;; batch, and then flatten it into a single vector of that size. + ;; + ;; So now we now how many bytes we need to finalize this batch + ;; + (let* ((bytes (batch-bytes batch)) + (vector (make-array bytes :element-type '(unsigned-byte 8)))) + (loop :for count :below (batch-count batch) + :for pos := 0 :then (+ pos (length row)) + :for row :across (batch-data batch) + :do (when row + (replace vector row :start1 pos))) + + vector))) + +;;; +;;; S3 support needs some AWS specific setup. We use the same configuration +;;; files as the main AWS command line interface, as documented at the +;;; following places: +;;; +;;; https://docs.aws.amazon.com/cli/latest/userguide/cli-config-files.html +;;; https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html +;;; https://docs.aws.amazon.com/cli/latest/userguide/cli-environment.html +;;; +(defun get-aws-credentials-and-setup (&optional profile) + "Returns AWS access key id, secret access key, region and S3 bucket-name + from environment or ~/.aws/ configuration files, as multiple values." + (let* (aws-access-key-id + aws-secret-access-key + aws-region + aws-s3-bucket-name + (aws-directory (uiop:native-namestring + (uiop:merge-pathnames* ".aws/" + (user-homedir-pathname)))) + (aws-config-fn (make-pathname :name "config" + :directory aws-directory)) + (aws-creds-fn (make-pathname :name "credentials" + :directory aws-directory)) + (aws-config (ini:make-config)) + (credentials (ini:make-config)) + + (conf-profile (if profile (format nil "profile ~a" profile) + "default")) + (creds-profile (or profile "default"))) + + ;; read config files + (ini:read-files aws-config (list aws-config-fn)) + (ini:read-files credentials (list aws-creds-fn)) + + ;; get values from the environment, and if not in the env, from the + ;; configuration files. + (setf aws-access-key-id + (or (uiop:getenv "AWS_ACCESS_KEY_ID") + (ini:get-option credentials creds-profile "aws_access_key_id"))) + + (setf aws-secret-access-key + (or (uiop:getenv "AWS_SECRET_ACCESS_KEY") + (ini:get-option credentials creds-profile "aws_secret_access_key"))) + + (setf aws-region + (or (uiop:getenv "AWS_DEFAULT_REGION") + (ini:get-option aws-config conf-profile "region"))) + + (setf aws-s3-bucket-name (or (uiop:getenv "AWS_S3_BUCKET_NAME") + "pgloader")) + + (values aws-access-key-id + aws-secret-access-key + aws-region + aws-s3-bucket-name))) diff --git a/src/pg-copy/copy-rows-in-batch.lisp b/src/pg-copy/copy-rows-in-batch.lisp index 73a175e..b34dd9f 100644 --- a/src/pg-copy/copy-rows-in-batch.lisp +++ b/src/pg-copy/copy-rows-in-batch.lisp @@ -15,7 +15,8 @@ :until (eq :end-of-data row) :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch) (add-row-to-current-batch table columns copy nbcols - current-batch row) + current-batch row + (function send-batch)) (setf current-batch maybe-new-batch) (incf seconds seconds-in-this-batch))) @@ -26,32 +27,6 @@ seconds)) -(defun add-row-to-current-batch (table columns copy nbcols batch row) - "Add another ROW we just received to CURRENT-BATCH, and prepare a new - batch if needed. The current-batch (possibly a new one) is returned." - (let ((seconds 0) - (current-batch batch)) - ;; if current-batch is full, send data to PostgreSQL - ;; and prepare a new batch - (when (batch-full-p current-batch) - (incf seconds (send-batch table columns current-batch)) - (setf current-batch (make-batch)) - - ;; give a little help to our friend, now is a good time - ;; to garbage collect - #+sbcl - (let ((garbage-collect-start (get-internal-real-time))) - (sb-ext:gc :full t) - (incf seconds (elapsed-time-since garbage-collect-start)))) - - ;; also add up the time it takes to format the rows - (let ((start-time (get-internal-real-time))) - (format-row-in-batch copy nbcols row current-batch) - (incf seconds (elapsed-time-since start-time))) - - (values current-batch seconds))) - - (defun send-batch (table columns batch &key (db pomo:*database*)) "Copy current *writer-batch* into TABLE-NAME." ;; We need to keep a copy of the rows we send through the COPY @@ -136,10 +111,3 @@ ;; and return batch-seconds seconds))) - -(defun format-row-in-batch (copy nbcols row current-batch) - "Given a row from the queue, prepare it for the next batch." - (multiple-value-bind (pg-vector-row bytes) - (prepare-and-format-row copy nbcols row) - (when pg-vector-row - (push-row current-batch pg-vector-row bytes))))