diff --git a/pgloader.asd b/pgloader.asd
index 0be756b..55468e4 100644
--- a/pgloader.asd
+++ b/pgloader.asd
@@ -38,6 +38,7 @@
                #:cl-mustache            ; Logic-less templates
                #:yason                  ; JSON routines
                #:closer-mop             ; introspection
+               #:zs3                    ; integration with AWS S3 for Redshift
                )
   :components ((:module "src"
@@ -197,6 +198,7 @@
                             (:file "copy-db-write")
                             (:file "copy-rows-in-stream")
                             (:file "copy-rows-in-batch")
+                            (:file "copy-rows-in-batch-through-s3")
                             (:file "copy-retry-batch")
                             (:file "copy-from-queue")))
diff --git a/src/load/migrate-database.lisp b/src/load/migrate-database.lisp
index 79be231..e4c39bf 100644
--- a/src/load/migrate-database.lisp
+++ b/src/load/migrate-database.lisp
@@ -167,11 +167,13 @@
     ;;
     ;; Add schemas that needs to be in the search_path to the database
-    ;; search_path
+    ;; search_path, when using PostgreSQL. Redshift doesn't know how to
+    ;; do that, unfortunately.
     ;;
-    (add-to-search-path catalog
-                        :section :post
-                        :label "Set Search Path")
+    (unless (eq :redshift (pgconn-variant (target-db copy)))
+      (add-to-search-path catalog
+                          :section :post
+                          :label "Set Search Path"))
 
     ;;
     ;; And now, comments on tables and columns.
@@ -263,11 +265,21 @@
          (create-tables   (and create-tables create-ddl))
          (create-schemas  (and create-schemas create-ddl))
          ;; foreign keys has a special meaning in data-only mode
-         (foreign-keys    foreign-keys)
-         (drop-indexes    (or reindex
-                              (and include-drop create-ddl)))
-         (create-indexes  (or reindex
-                              (and create-indexes drop-indexes create-ddl)))
+         (foreign-keys    (if (eq :redshift (pgconn-variant (target-db copy)))
+                              nil
+                              foreign-keys))
+         (drop-indexes    (if (eq :redshift (pgconn-variant (target-db copy)))
+                              nil
+                              (or reindex
+                                  (and include-drop create-ddl))))
+         (create-indexes  (if (eq :redshift (pgconn-variant (target-db copy)))
+                              nil
+                              (or reindex
+                                  (and create-indexes drop-indexes create-ddl))))
+
+         (reset-sequences (if (eq :redshift (pgconn-variant (target-db copy)))
+                              nil
+                              reset-sequences))
 
          (*preserve-index-names* (or (eq :preserve index-names)
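Every guarded binding above repeats the same (eq :redshift (pgconn-variant (target-db copy))) test. A minimal sketch of how that test could be factored into a single predicate; redshift-target-p is a hypothetical name, not something this patch defines:

(defun redshift-target-p (copy)
  "Hypothetical helper: the test the bindings above spell out inline."
  (eq :redshift (pgconn-variant (target-db copy))))

;; each binding would then read along the lines of:
;;   (foreign-keys (unless (redshift-target-p copy) foreign-keys))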
diff --git a/src/pg-copy/copy-batch.lisp b/src/pg-copy/copy-batch.lisp
index c20f8ad..4a599f6 100644
--- a/src/pg-copy/copy-batch.lisp
+++ b/src/pg-copy/copy-batch.lisp
@@ -48,3 +48,42 @@
     (setf (aref data count) row)
     (incf count)
     (incf bytes row-bytes)))
+
+
+;;;
+;;; Integration of batch with COPY row format
+;;;
+(defun format-row-in-batch (copy nbcols row current-batch)
+  "Given a row from the queue, prepare it for the next batch."
+  (multiple-value-bind (pg-vector-row bytes)
+      (prepare-and-format-row copy nbcols row)
+    (when pg-vector-row
+      (push-row current-batch pg-vector-row bytes))))
+
+(defun add-row-to-current-batch (table columns copy nbcols batch row
+                                 send-batch-fn)
+  "Add another ROW we just received to CURRENT-BATCH, and prepare a new
+   batch if needed. The current-batch (possibly a new one) is returned. When
+   the batch is full, the function SEND-BATCH-FN is called with TABLE,
+   COLUMNS and the full BATCH as parameters."
+  (let ((seconds 0)
+        (current-batch batch))
+    ;; if current-batch is full, send data to PostgreSQL
+    ;; and prepare a new batch
+    (when (batch-full-p current-batch)
+      (incf seconds (funcall send-batch-fn table columns current-batch))
+      (setf current-batch (make-batch))
+
+      ;; give a little help to our friend, now is a good time
+      ;; to garbage collect
+      #+sbcl
+      (let ((garbage-collect-start (get-internal-real-time)))
+        (sb-ext:gc :full t)
+        (incf seconds (elapsed-time-since garbage-collect-start))))
+
+    ;; also add up the time it takes to format the rows
+    (let ((start-time (get-internal-real-time)))
+      (format-row-in-batch copy nbcols row current-batch)
+      (incf seconds (elapsed-time-since start-time)))
+
+    (values current-batch seconds)))
diff --git a/src/pg-copy/copy-from-queue.lisp b/src/pg-copy/copy-from-queue.lisp
index 4a537f8..56b6def 100644
--- a/src/pg-copy/copy-from-queue.lisp
+++ b/src/pg-copy/copy-from-queue.lisp
@@ -38,23 +38,42 @@
                  (format-table-name table)
                  columns)
 
-    (if on-error-stop
-        ;;
-        ;; When on-error-stop is true, we don't need to handle batch
-        ;; processing, we can stop as soon as there's a failure.
-        ;;
-        (incf seconds
-              (stream-rows-to-copy table columns copy nbcols queue))
+    (let ((copy-fun
+           (cond ((eq :redshift (pgconn-variant pgconn))
+                  ;;
+                  ;; When using Redshift as the target, we lose the
+                  ;; COPY FROM STDIN feature, and we have to use S3 as
+                  ;; an intermediate step. We then upload content a
+                  ;; batch at a time, and don't follow the
+                  ;; on-error-stop setting.
+                  ;;
+                  (log-message :log "copy-rows-from-queue REDSHIFT")
+                  (function batch-rows-to-s3-then-copy))
 
-        ;;
-        ;; When on-error-stop is nil, we actually implement
-        ;; on-error-resume-next behavior, and for that we need to keep
-        ;; a batch of rows around in order to replay COPYing its
-        ;; content around, skipping rows that are rejected by
-        ;; PostgreSQL.
-        ;;
-        (incf seconds
-              (batch-rows-to-copy table columns copy nbcols queue)))))
+                 (on-error-stop
+                  ;;
+                  ;; When on-error-stop is true, we don't need to
+                  ;; handle batch processing, we can stop as soon as
+                  ;; there's a failure.
+                  ;;
+                  (function stream-rows-to-copy))
+
+                 (t
+                  ;;
+                  ;; When on-error-stop is nil, we actually implement
+                  ;; on-error-resume-next behavior, and for that we
+                  ;; need to keep a batch of rows around in order to
+                  ;; replay COPYing its content around, skipping rows
+                  ;; that are rejected by
+                  ;; PostgreSQL.
+                  ;;
+                  (function batch-rows-to-copy)))))
+
+      ;;
+      ;; As all our functions have the same API, we can just funcall
+      ;; the selected one here.
+      ;;
+      (incf seconds
+            (funcall copy-fun table columns copy nbcols queue)))))
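The funcall above relies on all three writers sharing one calling convention: take (table columns copy nbcols queue), consume rows from the lparallel queue until the :end-of-data sentinel shows up, and return the number of seconds spent writing. A sketch of that contract, assuming the lq nickname and elapsed-time-since helper used elsewhere in this patch; my-writer and handle-row are hypothetical names for illustration only:

(defun my-writer (table columns copy nbcols queue)
  "Hypothetical writer obeying the shared copy-fun contract."
  (declare (ignore table columns copy nbcols))
  (let ((start (get-internal-real-time)))
    (loop :for row := (lq:pop-queue queue)
          :until (eq :end-of-data row)
          :do (handle-row row))          ; hypothetical row handler
    ;; the contract: return seconds spent, for the caller's accounting
    (elapsed-time-since start)))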
diff --git a/src/pg-copy/copy-rows-in-batch-through-s3.lisp b/src/pg-copy/copy-rows-in-batch-through-s3.lisp
new file mode 100644
index 0000000..a0c61e6
--- /dev/null
+++ b/src/pg-copy/copy-rows-in-batch-through-s3.lisp
@@ -0,0 +1,219 @@
+;;;
+;;; The PostgreSQL COPY TO implementation, using S3 as an intermediate
+;;; location for the data.
+;;;
+;;; This file is only used for Redshift support at the moment.
+;;;
+(in-package :pgloader.pgcopy)
+
+(defun batch-rows-to-s3-then-copy (table columns copy nbcols queue)
+  "Add rows that we pop from QUEUE into a batch, that we then upload to S3
+   and COPY over to Redshift as soon as the batch is full. Unlike the
+   direct COPY code path, a rejected batch is not replayed row by row
+   here."
+  (let ((seconds 0)
+        (current-batch (make-batch)))
+    (loop
+       :for row := (lq:pop-queue queue)
+       :until (eq :end-of-data row)
+       :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch)
+               (add-row-to-current-batch table columns copy nbcols
+                                         current-batch row
+                                         (function send-batch-through-s3))
+             (setf current-batch maybe-new-batch)
+             (incf seconds seconds-in-this-batch)))
+
+    ;; the last batch might not be empty
+    (unless (= 0 (batch-count current-batch))
+      (incf seconds (send-batch-through-s3 table columns current-batch)))
+
+    seconds))
+
+
+(defun send-batch-through-s3 (table columns batch &key (db pomo:*database*))
+  "Upload BATCH to S3, then COPY its content from S3 into TABLE."
+  (let ((batch-start-time (get-internal-real-time))
+        (table-name (format-table-name table))
+        (pomo:*database* db))
+
+    ;;
+    ;; We first upload the batch of data we have to S3
+    ;;
+    (multiple-value-bind (aws-access-key-id
+                          aws-secret-access-key
+                          aws-region
+                          aws-s3-bucket)
+        ;; TODO: implement --aws-profile and use it here
+        (get-aws-credentials-and-setup)
+
+      (let ((s3-filename (format nil "~a.~a.~a"
+                                 (format-table-name table)
+                                 (lp:kernel-worker-index)
+                                 (batch-start batch)))
+            (vector (batch-as-single-vector batch)))
+
+        (log-message :info
+                     "Uploading a batch of ~a rows [~a] to s3://~a/~a"
+                     (batch-count batch)
+                     (pretty-print-bytes (batch-bytes batch))
+                     aws-s3-bucket
+                     s3-filename)
+
+        (zs3:put-vector vector
+                        aws-s3-bucket
+                        s3-filename
+                        :credentials (list aws-access-key-id
+                                           aws-secret-access-key))
+
+        ;; Now we COPY the data from S3 to Redshift:
+        ;;
+        ;; https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html
+        ;;
+        (handler-case
+            (with-pgsql-transaction (:database db)
+              (let ((sql (format nil "~
+COPY ~a
+     FROM 's3://~a/~a'
+     DELIMITER '\\t'
+     TIMEFORMAT 'auto'
+     REGION '~a'
+     ACCESS_KEY_ID '~a'"
+                                 table-name
+                                 aws-s3-bucket
+                                 s3-filename
+                                 aws-region
+                                 aws-access-key-id)))
+                (log-message :sql "~a" sql)
+                (let ((sql-with-access-key
+                       (format nil "~a
+SECRET_ACCESS_KEY '~a'"
+                               sql
+                               aws-secret-access-key)))
+                  (pomo:execute sql-with-access-key))))
+
+          ;; If Redshift rejects the batch, we don't have the row-by-row
+          ;; replay that the direct COPY path offers: we account the whole
+          ;; batch as erroneous.
+          (postgresql-retryable (condition)
+            (pomo:execute "ROLLBACK")
+            (log-message :error "PostgreSQL [~s] ~a" table-name condition)
+            (update-stats :data table :errs (batch-count batch)))
+
+          (postgresql-unavailable (condition)
+            (log-message :error "[PostgreSQL ~s] ~a" table-name condition)
+            (log-message :error "Copy Batch reconnecting to PostgreSQL")
+
+            ;; in order to avoid Socket error in "connect": ECONNREFUSED
+            ;; if we try again too soon, wait a little
+            (sleep 2)
+
+            (cl-postgres:reopen-database db)
+            (send-batch-through-s3 table columns batch :db db))
+
+          (copy-init-error (condition)
+            ;; Couldn't init the COPY protocol, process the condition up
+            ;; the stack
+            (update-stats :data table :errs 1)
+            (error condition))
+
+          (condition (c)
+            ;; non retryable failures
+            (log-message :error "Non-retryable error ~a" c)
+            (pomo:execute "ROLLBACK")))))
+
+    ;; now log about having sent a batch, and update our stats with the
+    ;; time that took
+    (let ((seconds (elapsed-time-since batch-start-time)))
+      (log-message :debug
+                   "send-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]"
+                   (lp:kernel-worker-index)
+                   (format-table-name table)
+                   (batch-count batch)
+                   (pretty-print-bytes (batch-bytes batch))
+                   seconds
+                   (batch-oversized-p batch))
+      (update-stats :data table
+                    :rows (batch-count batch)
+                    :bytes (batch-bytes batch))
+
+      ;; and return batch-seconds
+      seconds)))
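With hypothetical values plugged into the format string above (table public.mytable, the default pgloader bucket, region eu-west-1, worker 0, and a placeholder batch-start number), the statement sent to Redshift would read roughly as follows:

;;   COPY public.mytable
;;        FROM 's3://pgloader/public.mytable.0.378264312'
;;        DELIMITER '\t'
;;        TIMEFORMAT 'auto'
;;        REGION 'eu-west-1'
;;        ACCESS_KEY_ID 'AKIAXXXXXXXXXXXXXXXX'
;;   SECRET_ACCESS_KEY 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'

Note that log-message :sql runs before the SECRET_ACCESS_KEY line is appended, which keeps the secret out of the SQL log; and the S3 key naming (table.worker-index.batch-start) keeps concurrent workers from overwriting each other's uploads.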
+
+
+(defun batch-as-single-vector (batch)
+  "For communicating with AWS S3, we finalize our batch data into a single
+   vector."
+  (if (= 0 (batch-count batch))
+      nil
+
+      ;; non-empty batch
+      ;;
+      ;; first compute the total number of bytes we need to represent this
+      ;; batch, and then flatten it into a single vector of that size.
+      ;;
+      ;; So now we know how many bytes we need to finalize this batch
+      ;;
+      (let* ((bytes  (batch-bytes batch))
+             (vector (make-array bytes :element-type '(unsigned-byte 8))))
+        (loop :for count :below (batch-count batch)
+              :for pos := 0 :then (+ pos (length row))
+              :for row :across (batch-data batch)
+              :do (when row
+                    (replace vector row :start1 pos)))
+
+        vector)))
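A toy run of the flattening loop above, outside of pgloader's batch structure (two fake rows of octets, not the pgloader API):

(let* ((rows  (vector #(80 71) #(76 79 65 68)))  ; two fake rows of octets
       (bytes (reduce #'+ rows :key #'length))   ; 6 octets in total
       (out   (make-array bytes :element-type '(unsigned-byte 8))))
  ;; pos advances by the length of the previous row, exactly as above
  (loop :for pos := 0 :then (+ pos (length row))
        :for row :across rows
        :do (replace out row :start1 pos))
  out)
;; => #(80 71 76 79 65 68)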
+
+;;;
+;;; S3 support needs some AWS specific setup. We use the same configuration
+;;; files as the main AWS command line interface, as documented at the
+;;; following places:
+;;;
+;;;  https://docs.aws.amazon.com/cli/latest/userguide/cli-config-files.html
+;;;  https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html
+;;;  https://docs.aws.amazon.com/cli/latest/userguide/cli-environment.html
+;;;
+(defun get-aws-credentials-and-setup (&optional profile)
+  "Returns AWS access key id, secret access key, region and S3 bucket-name
+   from environment or ~/.aws/ configuration files, as multiple values."
+  (let* (aws-access-key-id
+         aws-secret-access-key
+         aws-region
+         aws-s3-bucket-name
+         (aws-directory (uiop:native-namestring
+                         (uiop:merge-pathnames* ".aws/"
+                                                (user-homedir-pathname))))
+         (aws-config-fn (make-pathname :name "config"
+                                       :directory aws-directory))
+         (aws-creds-fn  (make-pathname :name "credentials"
+                                       :directory aws-directory))
+         (aws-config  (ini:make-config))
+         (credentials (ini:make-config))
+
+         (conf-profile  (if profile (format nil "profile ~a" profile)
+                            "default"))
+         (creds-profile (or profile "default")))
+
+    ;; read config files
+    (ini:read-files aws-config (list aws-config-fn))
+    (ini:read-files credentials (list aws-creds-fn))
+
+    ;; get values from the environment, and if not in the env, from the
+    ;; configuration files.
+    (setf aws-access-key-id
+          (or (uiop:getenv "AWS_ACCESS_KEY_ID")
+              (ini:get-option credentials creds-profile "aws_access_key_id")))
+
+    (setf aws-secret-access-key
+          (or (uiop:getenv "AWS_SECRET_ACCESS_KEY")
+              (ini:get-option credentials creds-profile "aws_secret_access_key")))
+
+    (setf aws-region
+          (or (uiop:getenv "AWS_DEFAULT_REGION")
+              (ini:get-option aws-config conf-profile "region")))
+
+    (setf aws-s3-bucket-name (or (uiop:getenv "AWS_S3_BUCKET_NAME")
+                                 "pgloader"))
+
+    (values aws-access-key-id
+            aws-secret-access-key
+            aws-region
+            aws-s3-bucket-name)))
diff --git a/src/pg-copy/copy-rows-in-batch.lisp b/src/pg-copy/copy-rows-in-batch.lisp
index 73a175e..b34dd9f 100644
--- a/src/pg-copy/copy-rows-in-batch.lisp
+++ b/src/pg-copy/copy-rows-in-batch.lisp
@@ -15,7 +15,8 @@
        :until (eq :end-of-data row)
        :do (multiple-value-bind (maybe-new-batch seconds-in-this-batch)
               (add-row-to-current-batch table columns copy nbcols
-                                        current-batch row)
+                                        current-batch row
+                                        (function send-batch))
             (setf current-batch maybe-new-batch)
             (incf seconds seconds-in-this-batch)))
 
@@ -26,32 +27,6 @@
     seconds))
 
 
-(defun add-row-to-current-batch (table columns copy nbcols batch row)
-  "Add another ROW we just received to CURRENT-BATCH, and prepare a new
-   batch if needed. The current-batch (possibly a new one) is returned."
-  (let ((seconds 0)
-        (current-batch batch))
-    ;; if current-batch is full, send data to PostgreSQL
-    ;; and prepare a new batch
-    (when (batch-full-p current-batch)
-      (incf seconds (send-batch table columns current-batch))
-      (setf current-batch (make-batch))
-
-      ;; give a little help to our friend, now is a good time
-      ;; to garbage collect
-      #+sbcl
-      (let ((garbage-collect-start (get-internal-real-time)))
-        (sb-ext:gc :full t)
-        (incf seconds (elapsed-time-since garbage-collect-start))))
-
-    ;; also add up the time it takes to format the rows
-    (let ((start-time (get-internal-real-time)))
-      (format-row-in-batch copy nbcols row current-batch)
-      (incf seconds (elapsed-time-since start-time)))
-
-    (values current-batch seconds)))
-
-
 (defun send-batch (table columns batch &key (db pomo:*database*))
   "Copy current *writer-batch* into TABLE-NAME."
   ;; We need to keep a copy of the rows we send through the COPY
@@ -136,10 +111,3 @@
     ;; and return batch-seconds
     seconds)))
-
-(defun format-row-in-batch (copy nbcols row current-batch)
-  "Given a row from the queue, prepare it for the next batch."
-  (multiple-value-bind (pg-vector-row bytes)
-      (prepare-and-format-row copy nbcols row)
-    (when pg-vector-row
-      (push-row current-batch pg-vector-row bytes))))
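The get-aws-credentials-and-setup function added in the new file reads the stock AWS CLI files, so a setup that already works for the aws command should work here too. For instance, with placeholder values:

;;   ~/.aws/credentials                  ~/.aws/config
;;   [default]                           [default]
;;   aws_access_key_id = AKIA...         region = eu-west-1
;;   aws_secret_access_key = ...
;;
;; usage sketch: the bucket falls back to "pgloader" unless the
;; AWS_S3_BUCKET_NAME environment variable says otherwise
(multiple-value-bind (key-id secret region bucket)
    (get-aws-credentials-and-setup)
  (declare (ignore key-id secret))
  (log-message :info "uploading batches to s3://~a/ in region ~a"
               bucket region))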