From 7dd69a11e1bd521ea28092d1d480a2027f3a03cd Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Sat, 16 Jan 2016 22:53:55 +0100 Subject: [PATCH] Implement concurrency and workers for files sources. More than the syntax and API tweaks, this patch also make it so that a multi-file specification (using e.g. ALL FILENAMES IN DIRECTORY) can be loaded with several files in the group in parallel. To that effect, tweak again the md-connection and md-copy implementations. --- pgloader.1 | 2 +- pgloader.1.md | 3 +- src/main.lisp | 16 ++-- src/package.lisp | 3 +- src/parsers/command-copy.lisp | 38 ++++----- src/parsers/command-csv.lisp | 40 ++++----- src/parsers/command-fixed.lisp | 34 ++++---- src/pgsql/schema.lisp | 3 +- src/sources/common/api.lisp | 4 +- src/sources/common/files-and-pathnames.lisp | 68 +++++++-------- src/sources/common/md-methods.lisp | 94 +++++++++++++++------ src/sources/common/methods.lisp | 3 + src/sources/copy.lisp | 19 +++-- src/sources/csv/csv.lisp | 18 ++++ src/sources/fixed.lisp | 4 + src/utils/connection.lisp | 5 +- test/csv-filename-pattern.load | 5 +- 17 files changed, 216 insertions(+), 143 deletions(-) diff --git a/pgloader.1 b/pgloader.1 index b7f1c6c..bb65eb0 100644 --- a/pgloader.1 +++ b/pgloader.1 @@ -622,7 +622,7 @@ All data sources specific commands support the following options: See the section BATCH BEHAVIOUR OPTIONS for more details\. . .IP -In addition, the data sources \fImysql\fR, \fIsqlite\fR, \fImssql\fR, \fIixf\fR and \fIdbf\fR all support the following settings: +In addition, the following settings are available: . .IP "\(bu" 4 \fIworkers = W\fR diff --git a/pgloader.1.md b/pgloader.1.md index b47a1ea..7143635 100644 --- a/pgloader.1.md +++ b/pgloader.1.md @@ -550,8 +550,7 @@ Some clauses are common to all commands: See the section BATCH BEHAVIOUR OPTIONS for more details. - In addition, the data sources *mysql*, *sqlite*, *mssql*, *ixf* and - *dbf* all support the following settings: + In addition, the following settings are available: - *workers = W* - *concurrency = C* diff --git a/src/main.lisp b/src/main.lisp index d27db96..5fcb2b9 100644 --- a/src/main.lisp +++ b/src/main.lisp @@ -567,7 +567,7 @@ (lisp-code-for-loading-from-copy source fields target :encoding (or encoding :default) :gucs gucs - :copy-options options + :options options :before before :after after)) @@ -575,7 +575,7 @@ (lisp-code-for-loading-from-fixed source fields target :encoding encoding :gucs gucs - :fixed-options options + :options options :before before :after after)) @@ -583,21 +583,21 @@ (lisp-code-for-loading-from-csv source fields target :encoding encoding :gucs gucs - :csv-options options + :options options :before before :after after)) (dbf-connection (lisp-code-for-loading-from-dbf source target :gucs gucs - :dbf-options options + :options options :before before :after after)) (ixf-connection (lisp-code-for-loading-from-ixf source target :gucs gucs - :ixf-options options + :options options :before before :after after)) @@ -605,7 +605,7 @@ (lisp-code-for-loading-from-sqlite source target :gucs gucs :casts casts - :sqlite-options options + :options options :before before :after after)) @@ -613,7 +613,7 @@ (lisp-code-for-loading-from-mysql source target :gucs gucs :casts casts - :mysql-options options + :options options :before before :after after)) @@ -621,7 +621,7 @@ (lisp-code-for-loading-from-mssql source target :gucs gucs :casts casts - :mssql-options options + :options options :before before :after after)))) :start-logger nil))) diff --git a/src/package.lisp b/src/package.lisp index 5620500..664c25c 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -438,7 +438,7 @@ #:md-spec #:md-strm #:expand-spec - #:open-next-stream + #:clone-copy-for ;; the db-methods #:fetch-metadata @@ -451,7 +451,6 @@ ;; file based utils for CSV, fixed etc #:with-open-file-or-stream #:get-pathname - #:get-absolute-pathname #:project-fields #:reformat-then-process diff --git a/src/parsers/command-copy.lisp b/src/parsers/command-copy.lisp index 78ef37d..2ab5559 100644 --- a/src/parsers/command-copy.lisp +++ b/src/parsers/command-copy.lisp @@ -33,7 +33,9 @@ (defrule option-null (and kw-null quoted-string) (:destructure (kw null) (declare (ignore kw)) (cons :null-as null))) -(defrule copy-option (or option-batch-rows +(defrule copy-option (or option-workers + option-concurrency + option-batch-rows option-batch-size option-batch-concurrency option-truncate @@ -43,19 +45,9 @@ option-delimiter option-null)) -(defrule another-copy-option (and comma copy-option) - (:lambda (source) - (bind (((_ option) source)) option))) - -(defrule copy-option-list (and copy-option (* another-copy-option)) - (:lambda (source) - (destructuring-bind (opt1 opts) source - (alexandria:alist-plist `(,opt1 ,@opts))))) - -(defrule copy-options (and kw-with copy-option-list) - (:lambda (source) - (bind (((_ opts) source)) - (cons :copy-options opts)))) +(defrule copy-options (and kw-with + (and copy-option (* (and comma copy-option)))) + (:function flatten-option-list)) (defrule copy-uri (and "copy://" filename) (:lambda (source) @@ -112,8 +104,10 @@ &key (encoding :utf-8) columns - gucs before after - ((:copy-options options))) + gucs before after options + &aux + (worker-count (getf options :worker-count)) + (concurrency (getf options :concurrency))) `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) @@ -136,10 +130,16 @@ :fields ',fields :columns ',columns ,@(remove-batch-control-option - options :extras '(:truncate + options :extras '(:worker-count + :concurrency + :truncate :drop-indexes :disable-triggers))))) (pgloader.sources:copy-database source + ,@ (when worker-count + (list :worker-count worker-count)) + ,@ (when concurrency + (list :concurrency concurrency)) :truncate truncate :drop-indexes drop-indexes :disable-triggers disable-triggers)) @@ -149,7 +149,7 @@ (defrule load-copy-file load-copy-file-command (:lambda (command) (bind (((source encoding fields pg-db-uri columns - &key ((:copy-options options)) gucs before after) command)) + &key options gucs before after) command)) (cond (*dry-run* (lisp-code-for-csv-dry-run pg-db-uri)) (t @@ -159,4 +159,4 @@ :gucs gucs :before before :after after - :copy-options options)))))) + :options options)))))) diff --git a/src/parsers/command-csv.lisp b/src/parsers/command-csv.lisp index 0b95b4e..75271b9 100644 --- a/src/parsers/command-csv.lisp +++ b/src/parsers/command-csv.lisp @@ -103,7 +103,9 @@ (bind (((_ _ _ escape-mode) term)) (cons :escape-mode escape-mode)))) -(defrule csv-option (or option-batch-rows +(defrule csv-option (or option-workers + option-concurrency + option-batch-rows option-batch-size option-batch-concurrency option-truncate @@ -120,19 +122,9 @@ option-keep-unquoted-blanks option-csv-escape-mode)) -(defrule another-csv-option (and comma csv-option) - (:lambda (source) - (bind (((_ option) source)) option))) - -(defrule csv-option-list (and csv-option (* another-csv-option)) - (:lambda (source) - (destructuring-bind (opt1 opts) source - (alexandria:alist-plist `(,opt1 ,@opts))))) - -(defrule csv-options (and kw-with csv-option-list) - (:lambda (source) - (bind (((_ opts) source)) - (cons :csv-options opts)))) +(defrule csv-options (and kw-with + (and csv-option (* (and comma csv-option)))) + (:function flatten-option-list)) ;; ;; CSV per-field reading options @@ -414,8 +406,10 @@ &key (encoding :utf-8) columns - gucs before after - ((:csv-options options))) + gucs before after options + &aux + (worker-count (getf options :worker-count)) + (concurrency (getf options :concurrency))) `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) @@ -425,7 +419,7 @@ (progn ,(sql-code-block pg-db-conn :pre before "before load") - (let ((truncate (getf ',options :truncate)) + (let ((truncate (getf ',options :truncate)) (disable-triggers (getf ',options :disable-triggers)) (drop-indexes (getf ',options :drop-indexes)) (source @@ -438,10 +432,16 @@ :fields ',fields :columns ',columns ,@(remove-batch-control-option - options :extras '(:truncate + options :extras '(:worker-count + :concurrency + :truncate :drop-indexes :disable-triggers))))) (pgloader.sources:copy-database source + ,@ (when worker-count + (list :worker-count worker-count)) + ,@ (when concurrency + (list :concurrency concurrency)) :truncate truncate :drop-indexes drop-indexes :disable-triggers disable-triggers)) @@ -451,7 +451,7 @@ (defrule load-csv-file load-csv-file-command (:lambda (command) (bind (((source encoding fields pg-db-uri columns - &key ((:csv-options options)) gucs before after) command)) + &key options gucs before after) command)) (cond (*dry-run* (lisp-code-for-csv-dry-run pg-db-uri)) (t @@ -461,4 +461,4 @@ :gucs gucs :before before :after after - :csv-options options)))))) + :options options)))))) diff --git a/src/parsers/command-fixed.lisp b/src/parsers/command-fixed.lisp index 2aa817a..dc25809 100644 --- a/src/parsers/command-fixed.lisp +++ b/src/parsers/command-fixed.lisp @@ -43,7 +43,9 @@ (:lambda (source) (bind (((_ field-defs _) source)) field-defs))) -(defrule fixed-option (or option-batch-rows +(defrule fixed-option (or option-workers + option-concurrency + option-batch-rows option-batch-size option-batch-concurrency option-truncate @@ -51,19 +53,9 @@ option-disable-triggers option-skip-header)) -(defrule another-fixed-option (and comma fixed-option) - (:lambda (source) - (bind (((_ option) source)) option))) - -(defrule fixed-option-list (and fixed-option (* another-fixed-option)) - (:lambda (source) - (destructuring-bind (opt1 opts) source - (alexandria:alist-plist `(,opt1 ,@opts))))) - -(defrule fixed-options (and kw-with csv-option-list) - (:lambda (source) - (bind (((_ opts) source)) - (cons :fixed-options opts)))) +(defrule fixed-options (and kw-with + (and fixed-option (* (and comma fixed-option)))) + (:function flatten-option-list)) (defrule fixed-uri (and "fixed://" filename) (:lambda (source) @@ -120,8 +112,10 @@ &key (encoding :utf-8) columns - gucs before after - ((:fixed-options options))) + gucs before after options + &aux + (worker-count (getf options :worker-count)) + (concurrency (getf options :concurrency))) `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) @@ -146,6 +140,10 @@ :skip-lines ,(or (getf options :skip-line) 0)))) (pgloader.sources:copy-database source + ,@ (when worker-count + (list :worker-count worker-count)) + ,@ (when concurrency + (list :concurrency concurrency)) :truncate truncate :drop-indexes drop-indexes :disable-triggers disable-triggers)) @@ -155,7 +153,7 @@ (defrule load-fixed-cols-file load-fixed-cols-file-command (:lambda (command) (bind (((source encoding fields pg-db-uri columns - &key ((:fixed-options options)) gucs before after) command)) + &key options gucs before after) command)) (cond (*dry-run* (lisp-code-for-csv-dry-run pg-db-uri)) (t @@ -165,4 +163,4 @@ :gucs gucs :before before :after after - :fixed-options options)))))) + :options options)))))) diff --git a/src/pgsql/schema.lisp b/src/pgsql/schema.lisp index 8acea41..660b733 100644 --- a/src/pgsql/schema.lisp +++ b/src/pgsql/schema.lisp @@ -406,7 +406,8 @@ (with-stats-collection ("Index Build Completion" :section section) (loop :repeat (count-indexes table) - :do (lp:receive-result idx-channel))) + :do (lp:receive-result idx-channel)) + (lp:end-kernel :wait t)) ;; turn unique indexes into pkeys now (with-pgsql-connection (target) diff --git a/src/sources/common/api.lisp b/src/sources/common/api.lisp index 4edee6c..c990bb3 100644 --- a/src/sources/common/api.lisp +++ b/src/sources/common/api.lisp @@ -151,6 +151,8 @@ (defgeneric process-rows (md-copy stream process-fn) (:documentation "Process rows from a given input stream.")) +(defgeneric clone-copy-for (md-copy path-spec) + (:documentation "Create a new instance for copying PATH-SPEC data.")) ;;; @@ -188,4 +190,4 @@ (:documentation "Alter load duties for database sources copy support.")) (defgeneric instanciate-table-copy-object (db-copy table) - (:documentation "Create an new instance for copying TABLE data.")) + (:documentation "Create a new instance for copying TABLE data.")) diff --git a/src/sources/common/files-and-pathnames.lisp b/src/sources/common/files-and-pathnames.lisp index d01d515..687ea4b 100644 --- a/src/sources/common/files-and-pathnames.lisp +++ b/src/sources/common/files-and-pathnames.lisp @@ -12,7 +12,11 @@ (print-unreadable-object (c stream :type t :identity t) (with-slots (type spec) c (let ((path (when (slot-boundp c 'path) (slot-value c 'path)))) - (format stream "~a://~a:~{~a~^,~}" type (first spec) path))))) + (etypecase path + (string + (format stream "~a://~a:~a" type (first spec) path)) + (list + (format stream "~a://~a:~{~a~^,~}" type (first spec) path))))))) (defmethod expand :after ((md md-connection)) "Expand the archive for the MD connection." @@ -30,15 +34,12 @@ (defgeneric expand-spec (md-connection) (:documentation "Expand specification for an FD source.")) -(defgeneric open-next-stream (md-connection &rest args &key) - (:documentation "Open the next input stream from FD.")) - (defmethod expand-spec ((md md-connection)) "Given fd spec as a CONS of a source type and a tagged object, expand the tagged object depending on the source type and return a list of pathnames." (destructuring-bind (type &rest part) (md-spec md) (ecase type - (:inline (list (caar part))) + (:inline (list (md-spec md))) (:stdin (list *standard-input*)) (:regex (destructuring-bind (keep regex root) part (filter-directory regex @@ -51,39 +52,31 @@ (if (probe-file realname) (list realname) (error "File does not exists: '~a'." realname))))))) -(defmethod open-next-stream ((md md-connection) - &rest args - &key &allow-other-keys) - "Switch to the following file in the PATH list." - ;; first close current stream - (when (slot-boundp md 'strm) - (close (md-strm md))) - - ;; now open the new one +(defmethod open-connection ((md md-connection) + &rest args + &key &allow-other-keys) + "We know how to open several kinds of specs here, all that target a single + file as input. The multi-file specs must have been expanded before trying + to open the connection." (when (fd-path md) - (let ((current-path (pop (fd-path md)))) - (when current-path - (log-message :info "Open ~s" current-path) - (prog1 - (setf (md-strm md) - (typecase current-path - (stream current-path) - (t (apply #'open current-path args)))) - ;; - ;; The :inline md spec is a little special, it's a filename and a - ;; position where to skip to at opening the file. It allows for - ;; easier self-contained tests. - ;; - (when (eq :inline (car (md-spec md))) - (file-position (md-strm md) (cdadr (md-spec md))))))))) + (log-message :notice "Opening ~s" (fd-path md)) ; info + (cond ;; inline + ((and (listp (fd-path md)) + (eq :inline (first (fd-path md)))) + (destructuring-bind (filename . position) + (second (fd-path md)) + ;; open the filename with given extra args + (setf (md-strm md) (apply #'open filename args)) + ;; and position the stream as expected + (file-position (md-strm md) position))) -(defmethod open-connection ((md md-connection) &key) - "The fd connection supports several specs to open a connection: - - if we have a path, that's a single file to open - - otherwise spec is an CONS wherin - - car is the source type - - cdr is an object suitable for `get-absolute-pathname`" - (setf (fd-path md) (expand-spec md)) + ;; stdin + ((streamp (fd-path md)) + (setf (md-strm md) (fd-path md))) + + ;; other cases should be filenames + (t + (setf (md-strm md) (apply #'open (fd-path md) args))))) md) (defmethod close-connection ((md md-connection)) @@ -91,8 +84,7 @@ (when (and (slot-boundp md 'strm) (md-strm md)) (close (md-strm md))) - (setf (md-strm md) nil - (fd-path md) nil)) + (setf (md-strm md) nil)) (defun get-pathname (dbname table-name &key (fd-path-root *fd-path-root*)) "Return a pathname where to read or write the file data" diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index 956cac8..b69b570 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -18,26 +18,24 @@ Finally returns how many rows where read and processed." - (with-connection (cnx (source copy)) - (loop :for input := (open-next-stream cnx - :direction :input - :external-format (encoding copy) - :if-does-not-exist nil) - :while input - :do (progn - ;; we handle skipping more than one line here, as cl-copy only knows - ;; about skipping the first line - (loop repeat (skip-lines copy) do (read-line input nil nil)) + (with-connection (cnx (source copy) + :direction :input + :external-format (encoding copy) + :if-does-not-exist nil) + (let ((input (md-strm cnx))) + ;; we handle skipping more than one line here, as cl-copy only knows + ;; about skipping the first line + (loop :repeat (skip-lines copy) :do (read-line input nil nil)) - ;; we might now have to read the fields from the header line - (when (header copy) - (setf (fields copy) - (parse-header copy (read-line input nil nil))) + ;; we might now have to read the fields from the header line + (when (header copy) + (setf (fields copy) + (parse-header copy (read-line input nil nil))) - (log-message :debug "Parsed header columns ~s" (fields copy))) + (log-message :debug "Parsed header columns ~s" (fields copy))) - ;; read in the text file, split it into columns - (process-rows copy input process-row-fn))))) + ;; read in the text file, split it into columns + (process-rows copy input process-row-fn)))) (defmethod preprocess-row ((copy md-copy)) "The file based readers possibly have extra work to do with user defined @@ -54,6 +52,23 @@ (format nil "~s" (car col))) (columns copy))) +(defmethod clone-copy-for ((copy md-copy) path-spec) + "Create a copy of CSV for loading data from PATH-SPEC." + (make-instance (class-of copy) + ;; source-db is expected unbound here, so bypassed + :target-db (clone-connection (target-db copy)) + :source (make-instance (class-of (source copy)) + :spec (md-spec (source copy)) + :type (conn-type (source copy)) + :path path-spec) + :target (target copy) + :fields (fields copy) + :columns (columns copy) + :transforms (transforms copy) + :encoding (encoding copy) + :skip-lines (skip-lines copy) + :header (header copy))) + (defmethod copy-database ((copy md-copy) &key truncate @@ -61,8 +76,8 @@ drop-indexes ;; generic API, but ignored here - (worker-count 8) - (concurrency 2) + (worker-count 4) + (concurrency 1) data-only schema-only @@ -86,11 +101,42 @@ (target copy) :drop-indexes drop-indexes) - (copy-from copy - :worker-count worker-count - :concurrency concurrency - :truncate truncate - :disable-triggers disable-triggers) + ;; ensure we truncate only one + (when truncate + (truncate-tables (clone-connection (target-db copy)) (target copy))) + + ;; expand the specs of our source, we might have to care about several + ;; files actually. + (let* ((lp:*kernel* (make-kernel worker-count)) + (channel (lp:make-channel)) + (path-list (expand-spec (source copy)))) + (loop :for path-spec :in path-list + :do (let ((table-source (clone-copy-for copy path-spec))) + (copy-from table-source + :concurrency concurrency + :kernel lp:*kernel* + :channel channel + :truncate nil + :disable-triggers disable-triggers))) + + ;; end kernel + (with-stats-collection ("COPY Threads Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (let ((worker-count (* (length path-list) + (task-count concurrency)))) + (loop :for tasks :below worker-count + :do (destructuring-bind (task table seconds) + (lp:receive-result channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds) + (when (eq :writer task) + (update-stats :data table :secs seconds)))) + (prog1 + worker-count + (lp:end-kernel :wait nil)))) + (lp:end-kernel :wait t)) ;; re-create the indexes from the target table entry (create-indexes-again (target-db copy) diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index 3b696df..8415c49 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -180,5 +180,8 @@ ;; now wait until both the tasks are over, and kill the kernel (unless c-s-p + (log-message :debug "waiting for ~d tasks" (task-count concurrency)) + (loop :repeat (task-count concurrency) + :do (lp:receive-result channel)) (log-message :info "COPY ~s done." table-name) (unless k-s-p (lp:end-kernel :wait t))))))) diff --git a/src/sources/copy.lisp b/src/sources/copy.lisp index 65a8c5e..63457f2 100644 --- a/src/sources/copy.lisp +++ b/src/sources/copy.lisp @@ -10,12 +10,7 @@ (setf (slot-value copy 'type) "copy")) (defclass copy-copy (md-copy) - ((encoding :accessor encoding ; file encoding - :initarg :encoding) ; - (skip-lines :accessor skip-lines ; we might want to skip COPY lines - :initarg :skip-lines ; - :initform 0) ; - (delimiter :accessor delimiter ; see COPY options for TEXT + ((delimiter :accessor delimiter ; see COPY options for TEXT :initarg :delimiter ; in PostgreSQL docs :initform #\Tab) (null-as :accessor null-as @@ -35,6 +30,18 @@ (unless transforms (setf (slot-value copy 'transforms) (make-list (length columns)))))) +(defmethod clone-copy-for ((copy copy-copy) path-spec) + "Create a copy of FIXED for loading data from PATH-SPEC." + (let ((copy-for-path-spec + (change-class (call-next-method copy path-spec) 'copy-copy))) + (loop :for slot-name :in '(delimiter null-as) + :do (when (slot-boundp copy slot-name) + (setf (slot-value copy-for-path-spec slot-name) + (slot-value copy slot-name)))) + + ;; return the new instance! + copy-for-path-spec)) + (declaim (inline parse-row)) (defun parse-row (line &key (delimiter #\Tab) (null-as "\\N")) diff --git a/src/sources/csv/csv.lisp b/src/sources/csv/csv.lisp index 19446dd..53ff89a 100644 --- a/src/sources/csv/csv.lisp +++ b/src/sources/csv/csv.lisp @@ -48,6 +48,24 @@ (unless transforms (setf (slot-value csv 'transforms) (make-list (length columns)))))) +(defmethod clone-copy-for ((csv copy-csv) path-spec) + "Create a copy of CSV for loading data from PATH-SPEC." + (let ((csv-for-path-spec + (change-class (call-next-method csv path-spec) 'copy-csv))) + (loop :for slot-name :in '(source-type + separator + newline + quote + escape + escape-mode + trim-blanks) + :do (when (slot-boundp csv slot-name) + (setf (slot-value csv-for-path-spec slot-name) + (slot-value csv slot-name)))) + + ;; return the new instance! + csv-for-path-spec)) + ;;; ;;; Read a file format in CSV format, and call given function on each line. ;;; diff --git a/src/sources/fixed.lisp b/src/sources/fixed.lisp index fd7a4d5..97f950e 100644 --- a/src/sources/fixed.lisp +++ b/src/sources/fixed.lisp @@ -30,6 +30,10 @@ (unless transforms (setf (slot-value fixed 'transforms) (make-list (length columns)))))) +(defmethod clone-copy-for ((fixed copy-fixed) path-spec) + "Create a copy of FIXED for loading data from PATH-SPEC." + (change-class (call-next-method fixed path-spec) 'copy-fixed)) + (declaim (inline parse-row)) (defun parse-row (fixed-cols-specs line) diff --git a/src/utils/connection.lisp b/src/utils/connection.lisp index 8371c3a..51ea74a 100644 --- a/src/utils/connection.lisp +++ b/src/utils/connection.lisp @@ -133,7 +133,8 @@ ;;; ;;; Tools for every connection classes ;;; -(defmacro with-connection ((var connection) &body forms) +(defmacro with-connection ((var connection &rest args &key &allow-other-keys) + &body forms) "Connect to DB-CONNECTION and handle any condition when doing so, and when connected execute FORMS in a protected way so that we always disconnect at the end." @@ -147,7 +148,7 @@ #'(lambda (w) (log-message :warning "~a" w) (muffle-warning)))) - (open-connection ,conn)) + (apply #'open-connection ,conn (list ,@args))) (condition (e) (cond ((typep ,connection 'fd-connection) (error 'fd-connection-error diff --git a/test/csv-filename-pattern.load b/test/csv-filename-pattern.load index ce8e492..65da029 100644 --- a/test/csv-filename-pattern.load +++ b/test/csv-filename-pattern.load @@ -3,9 +3,12 @@ load csv in directory 'data' having fields (id, field) into postgres:///pgloader?matching + with fields optionally enclosed by '"', fields terminated by ',', - truncate + truncate, + workers = 8, + concurrency = 1 before load do $$ drop table if exists matching; $$,