From 8ee799070a8c8a4d7d7d1e957ebfbd21f9a1d975 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Wed, 24 Jan 2018 00:10:40 +0100 Subject: [PATCH] Simplify format-vector-row a lot. Copy some code over from cl-postgres-trivial-utf-8 and add the support for PostgreSQL COPY escaping right at the same place, allowing to allocate our formatted utf-8 buffer only once, with the escaping already installed. This patch was expected to be more about perfs, but it's actually only about code cleaning it seems, as it doesn't make a big difference in the testing I could do here. That said, getting rid of one intermediate buffer should be nice in terms of memory management. --- pgloader.asd | 2 +- src/package.lisp | 3 + src/pg-copy/copy-batch.lisp | 5 +- src/pg-copy/copy-format.lisp | 335 +++++++++--------- src/pg-copy/copy-from-queue.lisp | 119 +++---- src/pg-copy/copy-retry-batch.lisp | 22 +- ...y-write-row.lisp => copy-write-batch.lisp} | 8 +- src/sources/common/api.lisp | 7 +- 8 files changed, 241 insertions(+), 260 deletions(-) rename src/pg-copy/{copy-write-row.lisp => copy-write-batch.lisp} (84%) diff --git a/pgloader.asd b/pgloader.asd index 36d9cf5..3aec192 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -193,7 +193,7 @@ :components ((:file "copy-batch") (:file "copy-format") - (:file "copy-write-row") + (:file "copy-write-batch") (:file "copy-from-queue") (:file "copy-retry-batch"))) diff --git a/src/package.lisp b/src/package.lisp index 496d3a8..25efc22 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -516,6 +516,9 @@ #:postgresql-retryable) (:import-from #:cl-postgres #:database-error-context) + (:import-from #:cl-postgres-trivial-utf-8 + #:utf-8-byte-length + #:string-to-utf-8-bytes) (:export #:copy-rows-from-queue #:format-vector-row)) diff --git a/src/pg-copy/copy-batch.lisp b/src/pg-copy/copy-batch.lisp index 07fb093..68a30d7 100644 --- a/src/pg-copy/copy-batch.lisp +++ b/src/pg-copy/copy-batch.lisp @@ -43,9 +43,8 @@ (or (= (batch-count batch) (batch-max-count batch)) (batch-oversized-p batch))) -(defun push-row (batch row &optional row-bytes) +(defun push-row (batch row row-bytes) (with-slots (data count bytes) batch (setf (aref data count) row) (incf count) - (when row-bytes - (incf bytes row-bytes)))) + (incf bytes row-bytes))) diff --git a/src/pg-copy/copy-format.lisp b/src/pg-copy/copy-format.lisp index 260a665..5f8ca6a 100644 --- a/src/pg-copy/copy-format.lisp +++ b/src/pg-copy/copy-format.lisp @@ -13,189 +13,174 @@ ;;; call here. ;;; -(defun format-vector-row (nbcols row pre-formatted pg-escape-safe-array) - "Add a ROW in the STREAM, formating ROW in PostgreSQL COPY TEXT format. +(defun format-vector-row (nb-cols row) + (declare (optimize + (speed 3) + #-ecl(safety 0) #+ecl(safety 1) + (space 0) + (debug 1) + (compilation-speed 0))) + (let* ((lens (map 'vector + (lambda (col) + (if (col-null-p col) 2 (copy-utf-8-byte-length col))) + row)) + (len (+ nb-cols (reduce #'+ lens))) + (buf (make-array (the fixnum len) :element-type '(unsigned-byte 8)))) + (loop :for col :across row + :for i :from 1 + :for position := 0 :then (+ position col-len 1) + :for col-len :across lens + :do (if (col-null-p col) + (insert-copy-null buf position) + (string-to-copy-utf-8-bytes col buf position)) + :do (insert-copy-separator buf (+ position col-len) i nb-cols)) + ;; return our pg vector of escaped utf8 bytes + (values buf len))) -See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609 for -details about the format, and format specs." - (declare (type simple-array row)) +(defun format-escaped-vector-row (nb-cols row) + "We've read data in the COPY format, so already escaped." + (declare (optimize + (speed 3) + #-ecl(safety 0) #+ecl(safety 1) + (space 0) + (debug 1) + (compilation-speed 0))) + (let* ((lens (map 'vector + (lambda (col) + (if (col-null-p col) 2 (utf-8-byte-length col))) + row)) + (len (+ nb-cols (reduce #'+ lens))) + (buf (make-array (the fixnum len) :element-type '(unsigned-byte 8)))) + (loop :for col :across row + :for i :from 1 + :for position := 0 :then (+ position col-len 1) + :for col-len :across lens + :do (if (col-null-p col) + (insert-copy-null buf position) - ;; first prepare an array of transformed and properly encoded columns - (row-to-utf8-bytes row) + (let ((utf-8-bytes (string-to-utf-8-bytes col))) + (replace buf utf-8-bytes :start1 position))) - ;; now escape bytes for the COPY text format - (if pre-formatted - ;; pre-formatted means no escaping needs to be done - (format-pre-formatted-vector-row nbcols row) + :do (insert-copy-separator buf (+ position col-len) i nb-cols)) + ;; return our pg vector of escaped utf8 bytes + (values buf len))) - ;; in the general case we need to take into account PostgreSQL - ;; escaping rules for the COPY format - (format-and-escape-vector-row nbcols row pg-escape-safe-array))) +(declaim (inline insert-copy-separator insert-copy-null col-null-p)) +(defun col-null-p (col) + (or (null col) (eq :NULL col))) -(defun row-to-utf8-bytes (row) - "Apply transformation functions to ROW fields, and convert text to UTF-8 - bytes arrays. Return an array of array-of-bytes ready to be sent to - PostgreSQL." - (loop :for col :across row - :for i fixnum :from 0 - :do (setf (aref row i) - (if (or (null col) (eq :NULL col)) - nil - (cl-postgres-trivial-utf-8:string-to-utf-8-bytes col))))) +(defun insert-copy-null (buffer position) + "NULL is \\N in COPY format (that's 2 bytes)" + (setf (aref buffer position) #. (char-code #\\)) + (setf (aref buffer (+ 1 position)) #. (char-code #\N))) -(defun apply-transforms (nbcols row transform-fns) - (loop :for i fixnum :below nbcols - :for col :across row - :for fun :in transform-fns - :do (setf (aref row i) - (if fun (funcall fun col) col)))) +(defun insert-copy-separator (buffer position col nb-cols) + (if (< col nb-cols) + (setf (aref buffer position) #. (char-code #\Tab)) + (setf (aref buffer position) #. (char-code #\Newline)))) -(defvar *pg-escape-safe-types* - (list "integer" - "serial" - "bigserial" - "smallint" - "bigint" - "numeric" - "decimal" - "real" - "float" - "timestamp" - "timestamptz" - "bit" - "boolean" - "cidr" - "inet" - "date" - "interval" - "uuid") - "List of PostgreSQL data types with non-backslash, ascii-only input - values: values that we don't need to escape.") +(defun apply-transforms (copy nbcols row transform-fns) + (handler-case + (loop :for i fixnum :below nbcols + :for col :across row + :for fun :in transform-fns + :do (setf (aref row i) + (if fun (funcall fun col) col)) + :finally (return row)) + (condition (e) + (log-message :error "Error while formating a row from ~s:" + (format-table-name (target copy))) + (log-message :error "~a" e) + (update-stats :data (target copy) :errs 1) + nil))) -(defun pg-escape-safe-array (copy) - "Given a PostgreSQL type name, return non-nil when this type is known to - represent a plain ASCII value: a value that we don't need to convert into - UTF-8, such as numbers and dates." - (let* ((nbcols (length (pgloader.sources::columns copy))) - (result (make-array nbcols :element-type '(integer 0 2))) - (pgtypes (loop :for pgcol :in (table-column-list - (pgloader.sources::target copy)) - ;; column might miss in pg table definition, but we - ;; want to let PostgreSQL handle this kind of error. - :collect (when pgcol (column-type-name pgcol))))) - (loop :for pgtype :in pgtypes - :for i fixnum :from 0 - :do (setf (aref result i) - (cond - ;; bytea representation begins with a backslash, which - ;; needs special processing. - ((string= "bytea" pgtype) 2) - ((member pgtype *pg-escape-safe-types* :test #'string=) 1) - (t 0)))) - (log-message :debug "pg escape safe types for ~a: ~a ~a" - (format-table-name (pgloader.sources::target copy)) - pgtypes - result) - result)) +;;; +;;; Low Level UTF-8 handling + PostgreSQL COPY format escaping +;;; +;;; Main bits stolen from Postmodern:cl-postgres/trivial-utf-8.lisp +;;; +;;; We add PostgreSQL COPY escaping right at the same time as we do the +;;; UTF-8 preparation dance. +;;; +(defun copy-utf-8-byte-length (string) + "Calculate the amount of bytes needed to encode a string." + (declare (type string string) + (optimize + (speed 3) + #-ecl(safety 0) #+ecl(safety 1) + (space 0) + (debug 1) + (compilation-speed 0))) + (let ((length (length string)) + (string (coerce string 'simple-string))) + (loop :for char :across string + :do (let ((code (char-code char))) + (case char + ((#\\ #\Newline #\Return #\Tab #\Backspace #\Page) + (incf length 1)) + (otherwise + (when (> code 127) + (incf length + (cond ((< code 2048) 1) + ((< code 65536) 2) + (t 3)))))))) + length)) -(defun format-and-escape-vector-row (nbcols pgrow pg-escape-safe-array) - "Prepare the PGROW to be send as a PostgreSQL COPY buffer." - (flet ((escaped-length (escape-safe string) - (if (null string) - ;; NULL is \N (2 bytes) in COPY format - 2 - (case escape-safe - (2 (+ 1 (length string))) ; bytea begins with a \ - (1 (length string)) - (0 (loop :for byte :across string - :sum (case byte - (#. (char-code #\\) 2) - (#. (char-code #\Newline) 2) - (#. (char-code #\Return) 2) - (#. (char-code #\Tab) 2) - (#. (char-code #\Backspace) 2) - (#. (char-code #\Page) 2) - (t 1)))))))) - (let* ((bytes (+ nbcols - (loop :for col :across pgrow - :for escape-safe :across pg-escape-safe-array - :sum (escaped-length escape-safe col)))) - (result (make-array bytes :element-type '(unsigned-byte 8))) - (pos 0)) - (macrolet ((byte-out (byte) - `(progn (setf (aref result pos) ,byte) - (incf pos))) - (esc-byte-out (byte) - `(progn (setf (aref result pos) #. (char-code #\\)) - (setf (aref result (1+ pos)) ,byte) - (incf pos 2)))) - (loop :for col :across pgrow - :for escape-safe :across pg-escape-safe-array - :do (if (null col) - (esc-byte-out #. (char-code #\N)) - (case escape-safe - (0 (loop :for byte :across col :do - (case byte - (#. (char-code #\\) (esc-byte-out byte)) - (#. (char-code #\Newline) (esc-byte-out byte)) - (#. (char-code #\Return) (esc-byte-out byte)) - (#. (char-code #\Tab) (esc-byte-out byte)) - (#. (char-code #\Backspace) (esc-byte-out byte)) - (#. (char-code #\Page) (esc-byte-out byte)) - (t (byte-out byte))))) +(defmacro as-copy-utf-8-bytes (char writer) + "Given a character, calls the writer function for every byte in the +encoded form of that character." + (let ((char-code (gensym))) + `(let ((,char-code (char-code ,char))) + (declare (type fixnum ,char-code)) + (cond ((= ,char-code #. (char-code #\\)) + (progn (,writer #. (char-code #\\)) + (,writer ,char-code))) + ((= ,char-code #. (char-code #\Newline)) + (progn (,writer #. (char-code #\\)) + (,writer ,char-code))) + ((= ,char-code #. (char-code #\Return)) + (progn (,writer #. (char-code #\\)) + (,writer ,char-code))) + ((= ,char-code #. (char-code #\Tab)) + (progn (,writer #. (char-code #\\)) + (,writer ,char-code))) + ((= ,char-code #. (char-code #\Backspace)) + (progn (,writer #. (char-code #\\)) + (,writer ,char-code))) + ((= ,char-code #. (char-code #\Page)) + (progn (,writer #. (char-code #\\)) + (,writer ,char-code))) + ((< ,char-code 128) + (,writer ,char-code)) + ((< ,char-code 2048) + (,writer (logior #b11000000 (ldb (byte 5 6) ,char-code))) + (,writer (logior #b10000000 (ldb (byte 6 0) ,char-code)))) + ((< ,char-code 65536) + (,writer (logior #b11100000 (ldb (byte 4 12) ,char-code))) + (,writer (logior #b10000000 (ldb (byte 6 6) ,char-code))) + (,writer (logior #b10000000 (ldb (byte 6 0) ,char-code)))) + (t + (,writer (logior #b11110000 (ldb (byte 3 18) ,char-code))) + (,writer (logior #b10000000 (ldb (byte 6 12) ,char-code))) + (,writer (logior #b10000000 (ldb (byte 6 6) ,char-code))) + (,writer (logior #b10000000 (ldb (byte 6 0) ,char-code)))))))) - (1 (replace result - (the (simple-array (unsigned-byte 8) (*)) col) - :start1 pos) - (incf pos (length col))) - - (2 ;; Here we know we deal with bytea, and that starts - ;; with a backslach char that we need to escape... - ;; it's the almost-escape-safe case, because every - ;; other character in the stream is going to be an - ;; hexadecimal char. - (esc-byte-out (aref col 0)) - (replace result - (the (simple-array (unsigned-byte 8) (*)) col) - :start1 pos - :start2 1) - (incf pos (+ -1 (length col)))))) - - ;; either column separator, Tab, or end-of-record with Newline - (if (= bytes (+ 1 pos)) - (byte-out #. (char-code #\Newline)) - (byte-out #. (char-code #\Tab))))) - - ;; return the result and how many bytes it represents - (values result bytes)))) - -(defun format-pre-formatted-vector-row (nbcols pgrow) - "Prepare the PGROW for being sent as a PostgreSQL COPY buffer, knowing - that the data in the PGROW is pre-formatted -- no extra escaping is - needed." - (flet ((col-length (col) - ;; NULL is \N (2 bytes) in COPY format - (if col (length col) 2))) - (let* ((bytes (+ nbcols (reduce #'+ pgrow :key #'col-length))) - (result (make-array bytes :element-type '(unsigned-byte 8)))) - (loop :for start := 0 :then (+ start len 1) - :for col :across pgrow - :for len := (if col (length col) 2) - :do (progn - (if col - (replace result - (the (simple-array (unsigned-byte 8) (*)) col) - :start1 start) - ;; insert \N for a null value - (setf (aref result start) #. (char-code #\\) - (aref result (+ 1 start)) #. (char-code #\N))) - - ;; either column separator, Tab, or Newline (end of record) - (setf (aref result (+ start len)) - (if (= bytes (+ start len 1)) - #. (char-code #\Newline) - #. (char-code #\Tab))))) - - ;; return the result and how many bytes it represents - (values result bytes)))) +(defun string-to-copy-utf-8-bytes (string buffer &optional (position 0)) + "Convert a string into an array of unsigned bytes containing its +utf-8 representation." + (declare (type string string) + (optimize + (speed 3) + #-ecl(safety 0) #+ecl(safety 1) + (space 0) + (debug 1) + (compilation-speed 0))) + (let ((string (coerce string 'simple-string))) + (declare (type (array (unsigned-byte 8)) buffer) + (type fixnum position)) + (macrolet ((add-byte (byte) + `(progn (setf (aref buffer position) ,byte) + (incf position)))) + (loop :for char :across string + :do (as-copy-utf-8-bytes char add-byte))))) diff --git a/src/pg-copy/copy-from-queue.lisp b/src/pg-copy/copy-from-queue.lisp index ed880d8..499fc31 100644 --- a/src/pg-copy/copy-from-queue.lisp +++ b/src/pg-copy/copy-from-queue.lisp @@ -3,7 +3,7 @@ ;;; (in-package :pgloader.copy) -(defun copy-batch (table columns batch batch-rows +(defun send-batch (table columns batch &key (db pomo:*database*) on-error-stop) @@ -33,13 +33,9 @@ columns c) (update-stats :data table :errs 1) - (return-from copy-batch 0))))) + (return-from send-batch 0))))) (unwind-protect - (loop :for i :below batch-rows - :for data := (aref batch i) - :do (when data - (db-write-row copier data)) - :finally (return batch-rows)) + (db-write-batch copier batch) (cl-postgres:close-db-writer copier) (pomo:execute "COMMIT")))) @@ -56,7 +52,7 @@ ;; normal behavior, on-error-stop being nil ;; clean the current transaction before retrying new ones (let ((errors - (retry-batch table columns batch batch-rows condition))) + (retry-batch table columns batch condition))) (log-message :debug "retry-batch found ~d errors" errors) (update-stats :data table :rows (- errors))))) @@ -65,14 +61,12 @@ (log-message :error "[PostgreSQL ~s] ~a" table-name condition) (log-message :error "Copy Batch reconnecting to PostgreSQL") - ;; in order to avoid Socket error in "connect": ECONNREFUSED if we - ;; try just too soon, wait a little - (sleep 2) + ;; in order to avoid Socket error in "connect": ECONNREFUSED if we + ;; try just too soon, wait a little + (sleep 2) (cl-postgres:reopen-database db) - (copy-batch table columns batch batch-rows - :db db - :on-error-stop on-error-stop)) + (send-batch table columns batch :db db :on-error-stop on-error-stop)) (condition (c) ;; non retryable failures @@ -95,29 +89,38 @@ (table (pgloader.sources:target copy))) "Fetch rows from the QUEUE, prepare them in batches and send them down to PostgreSQL, and when that's done update stats." - (let ((preprocessor (pgloader.sources::preprocess-row copy)) - (pre-formatted (pgloader.sources:data-is-preformatted-p copy)) - (esc-safe-arr (pg-escape-safe-array copy)) - (nbcols (length - (table-column-list (pgloader.sources::target copy)))) - (transform-fns (let ((funs (pgloader.sources::transforms copy))) - (unless (every #'null funs) - funs))) - (current-batch (make-batch)) - (seconds 0)) + (let* ((nbcols (length + (table-column-list (pgloader.sources::target copy)))) + (current-batch (make-batch)) + (seconds 0)) + + ;; add some COPY activity related bits to our COPY object. + (setf (transforms copy) + (let ((funs (transforms copy))) + (unless (every #'null funs) + funs)) + + ;; FIXME: we should change the API around preprocess-row, someday. + (preprocessor copy) + (pgloader.sources::preprocess-row copy) + + ;; FIXME: we could change the API around data-is-preformatted-p, + ;; but that's a bigger change than duplicating the information in + ;; the object. + (copy-format copy) + (if (data-is-preformatted-p copy) :escaped :raw)) (flet ((send-current-batch (unqualified-table-name) ;; we close over the whole lexical environment or almost... (let ((batch-start-time (get-internal-real-time))) - (copy-batch table + (send-batch table columns - (batch-data current-batch) - (batch-count current-batch) + current-batch :on-error-stop on-error-stop) (let ((batch-seconds (elapsed-time-since batch-start-time))) (log-message :debug - "copy-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]" + "send-batch[~a] ~a ~d row~:p [~a] in ~6$s~@[ [oversized]~]" (lp:kernel-worker-index) unqualified-table-name (batch-count current-batch) @@ -158,9 +161,7 @@ ;; also add up the time it takes to format the rows (let ((start-time (get-internal-real-time))) - (format-row-in-batch copy nbcols row current-batch - preprocessor pre-formatted - esc-safe-arr transform-fns) + (format-row-in-batch copy nbcols row current-batch) (incf seconds (elapsed-time-since start-time))))) ;; the last batch might not be empty @@ -177,40 +178,28 @@ (list :writer table seconds))) -(defun format-row-in-batch (copy nbcols row current-batch - preprocessor - pre-formatted - escape-safe-array - transform-fns) +(defun format-row-in-batch (copy nbcols row current-batch) "Given a row from the queue, prepare it for the next batch." - (declare (ignore copy)) - (metabang.bind:bind - ((row (if preprocessor (funcall preprocessor row) row)) - (transformed-row (or (when (and (not pre-formatted) - transform-fns) - (apply-transforms nbcols - row - transform-fns)) - row)) + (let* ((row (if (preprocessor copy) + (funcall (preprocessor copy) row) + row)) + (transformed-row (cond ((eq :escaped (copy-format copy)) row) + ((null (transforms copy)) row) + (t + (apply-transforms copy + nbcols + row + (transforms copy)))))) + (multiple-value-bind (pg-vector-row bytes) + (if transformed-row + (ecase (copy-format copy) + (:raw (format-vector-row nbcols transformed-row)) + (:escaped (format-escaped-vector-row nbcols transformed-row))) + (values nil 0)) - ((:values copy-data bytes) - (format-vector-row nbcols transformed-row pre-formatted escape-safe-array) - ;; (handler-case - ;; (format-vector-row row - ;; pre-formatted - ;; transform-fns - ;; ascii-types) - ;; (condition (e) - ;; (log-message :error "Error while formating a row from ~s:" - ;; (format-table-name (pgloader.sources:target copy))) - ;; (log-message :error "~a" e) - ;; (update-stats :data (pgloader.sources:target copy) :errs 1) - ;; (values nil 0))) - )) - ;; we might have to debug - (when copy-data - (log-message :data "> ~s" - (map 'string #'code-char copy-data))) + ;; we might have to debug + (when pg-vector-row + (log-message :data "> ~s" (map 'string #'code-char pg-vector-row)) - ;; now add copy-data to current-batch - (push-row current-batch copy-data bytes))) + ;; now add copy-data to current-batch + (push-row current-batch pg-vector-row bytes))))) diff --git a/src/pg-copy/copy-retry-batch.lisp b/src/pg-copy/copy-retry-batch.lisp index 8f1a655..9420c1f 100644 --- a/src/pg-copy/copy-retry-batch.lisp +++ b/src/pg-copy/copy-retry-batch.lisp @@ -54,7 +54,7 @@ ;;; ;;; The main retry batch function. ;;; -(defun retry-batch (table columns batch batch-rows condition +(defun retry-batch (table columns batch condition &optional (current-batch-pos 0) &aux (nb-errors 0)) "Batch is a list of rows containing at least one bad row, the first such @@ -67,32 +67,36 @@ :with next-error := (parse-copy-error-context (database-error-context condition)) - :while (< current-batch-pos batch-rows) + :while (< current-batch-pos (batch-count batch)) :do (progn ; indenting helper (log-message :debug "pos: ~s ; err: ~a" current-batch-pos next-error) (when (= current-batch-pos next-error) (log-message :info "error recovery at ~d/~d, processing bad row" - current-batch-pos batch-rows) - (process-bad-row table condition (aref batch current-batch-pos)) + current-batch-pos (batch-count batch)) + (process-bad-row table + condition + (aref (batch-data batch) current-batch-pos)) (incf current-batch-pos) (incf nb-errors)) (let* ((current-batch-rows - (next-batch-rows batch-rows current-batch-pos next-error))) + (next-batch-rows (batch-count batch) current-batch-pos next-error))) (when (< 0 current-batch-rows) (if (< current-batch-pos next-error) (log-message :info "error recovery at ~d/~d, next error at ~d, ~ loading ~d row~:p" current-batch-pos - batch-rows + (batch-count batch) next-error current-batch-rows) (log-message :info "error recovery at ~d/~d, trying ~d row~:p" - current-batch-pos batch-rows current-batch-rows)) + current-batch-pos + (batch-count batch) + current-batch-rows)) (handler-case (incf current-batch-pos @@ -116,7 +120,7 @@ next-error (+ current-batch-pos next-error-relative))))))))) (log-message :info "Recovery found ~d errors in ~d row~:p" - nb-errors batch-rows) + nb-errors (batch-count batch)) ;; Return how many rows where erroneous, for statistics purposes nb-errors) @@ -131,7 +135,7 @@ (unwind-protect (loop :repeat current-batch-rows :for pos :from current-batch-pos - :do (db-write-row stream (aref batch pos))) + :do (db-write-row stream (aref (batch-data batch) pos))) ;; close-db-writer is the one signaling cl-postgres-errors (progn diff --git a/src/pg-copy/copy-write-row.lisp b/src/pg-copy/copy-write-batch.lisp similarity index 84% rename from src/pg-copy/copy-write-row.lisp rename to src/pg-copy/copy-write-batch.lisp index 929303d..8cdf4da 100644 --- a/src/pg-copy/copy-write-row.lisp +++ b/src/pg-copy/copy-write-batch.lisp @@ -10,6 +10,13 @@ ;;; COPY protocol, and retry the batch avoiding known bad rows (from parsing ;;; COPY error messages) in case some data related conditions are signaled. ;;; +(defun db-write-batch (copier batch) + (loop :for count :below (batch-count batch) + :for data :across (batch-data batch) + :do (when data + (db-write-row copier data)) + :finally (return (batch-count batch)))) + (defun db-write-row (copier data) "Copy cl-postgres:db-write-row guts to avoid computing utf-8 bytes all over again, as we reproduced the data formating in pgloader code. The @@ -26,4 +33,3 @@ :do (write-byte byte cl-postgres::socket)))))) (incf (cl-postgres::copier-count copier))) - diff --git a/src/sources/common/api.lisp b/src/sources/common/api.lisp index 9fc2d69..d23d0fe 100644 --- a/src/sources/common/api.lisp +++ b/src/sources/common/api.lisp @@ -27,12 +27,7 @@ :initarg :process-fn) ; (format :accessor copy-format ; format can be :COPY :initarg :format ; in which case no escaping - :initform :raw) ; has to be done - ;; and this one is an array of magic numbers for per-column escaping - ;; rules, which depend on the PostgreSQL data type we are targetting, and - ;; allow to reduce the work to be done at COPY time. - (columns-escape-mode :accessor columns-escape-mode - :initarg :columns-escape-mode)) + :initform :raw)) ; has to be done (:documentation "pgloader Generic Data Source")) (defmethod initialize-instance :after ((source copy) &key)