Implement concurrency and workers for file sources.

Beyond the syntax and API tweaks, this patch also makes it so that a
multi-file specification (using e.g. ALL FILENAMES IN DIRECTORY) can be
loaded with several files in the group processed in parallel.

To that end, tweak the md-connection and md-copy implementations again.
Dimitri Fontaine 2016-01-16 22:53:55 +01:00
parent aa8b756315
commit 7dd69a11e1
17 changed files with 216 additions and 143 deletions

View File

@ -622,7 +622,7 @@ All data sources specific commands support the following options:
See the section BATCH BEHAVIOUR OPTIONS for more details\.
.
.IP
In addition, the data sources \fImysql\fR, \fIsqlite\fR, \fImssql\fR, \fIixf\fR and \fIdbf\fR all support the following settings:
In addition, the following settings are available:
.
.IP "\(bu" 4
\fIworkers = W\fR

View File

@ -550,8 +550,7 @@ Some clauses are common to all commands:
See the section BATCH BEHAVIOUR OPTIONS for more details.
In addition, the data sources *mysql*, *sqlite*, *mssql*, *ixf* and
*dbf* all support the following settings:
In addition, the following settings are available:
- *workers = W*
- *concurrency = C*

View File

@ -567,7 +567,7 @@
(lisp-code-for-loading-from-copy source fields target
:encoding (or encoding :default)
:gucs gucs
:copy-options options
:options options
:before before
:after after))
@ -575,7 +575,7 @@
(lisp-code-for-loading-from-fixed source fields target
:encoding encoding
:gucs gucs
:fixed-options options
:options options
:before before
:after after))
@ -583,21 +583,21 @@
(lisp-code-for-loading-from-csv source fields target
:encoding encoding
:gucs gucs
:csv-options options
:options options
:before before
:after after))
(dbf-connection
(lisp-code-for-loading-from-dbf source target
:gucs gucs
:dbf-options options
:options options
:before before
:after after))
(ixf-connection
(lisp-code-for-loading-from-ixf source target
:gucs gucs
:ixf-options options
:options options
:before before
:after after))
@ -605,7 +605,7 @@
(lisp-code-for-loading-from-sqlite source target
:gucs gucs
:casts casts
:sqlite-options options
:options options
:before before
:after after))
@ -613,7 +613,7 @@
(lisp-code-for-loading-from-mysql source target
:gucs gucs
:casts casts
:mysql-options options
:options options
:before before
:after after))
@ -621,7 +621,7 @@
(lisp-code-for-loading-from-mssql source target
:gucs gucs
:casts casts
:mssql-options options
:options options
:before before
:after after))))
:start-logger nil)))
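
With the source-specific keywords (:copy-options, :csv-options, :dbf-options and
friends) unified into a single :options keyword, every
lisp-code-for-loading-from-* generator can read the concurrency knobs from the
same plist. A minimal sketch of that access pattern (the literal values are made
up; the getf calls mirror the &aux bindings added in the generators below):

    (let* ((options '(:worker-count 8 :concurrency 2 :truncate t))
           (worker-count (getf options :worker-count)) ; nil when not given
           (concurrency  (getf options :concurrency)))
      (list worker-count concurrency))
    ;; => (8 2)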

View File

@ -438,7 +438,7 @@
#:md-spec
#:md-strm
#:expand-spec
#:open-next-stream
#:clone-copy-for
;; the db-methods
#:fetch-metadata
@ -451,7 +451,6 @@
;; file based utils for CSV, fixed etc
#:with-open-file-or-stream
#:get-pathname
#:get-absolute-pathname
#:project-fields
#:reformat-then-process

View File

@ -33,7 +33,9 @@
(defrule option-null (and kw-null quoted-string)
(:destructure (kw null) (declare (ignore kw)) (cons :null-as null)))
(defrule copy-option (or option-batch-rows
(defrule copy-option (or option-workers
option-concurrency
option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate
@ -43,19 +45,9 @@
option-delimiter
option-null))
(defrule another-copy-option (and comma copy-option)
(:lambda (source)
(bind (((_ option) source)) option)))
(defrule copy-option-list (and copy-option (* another-copy-option))
(:lambda (source)
(destructuring-bind (opt1 opts) source
(alexandria:alist-plist `(,opt1 ,@opts)))))
(defrule copy-options (and kw-with copy-option-list)
(:lambda (source)
(bind (((_ opts) source))
(cons :copy-options opts))))
(defrule copy-options (and kw-with
(and copy-option (* (and comma copy-option))))
(:function flatten-option-list))
(defrule copy-uri (and "copy://" filename)
(:lambda (source)
@ -112,8 +104,10 @@
&key
(encoding :utf-8)
columns
gucs before after
((:copy-options options)))
gucs before after options
&aux
(worker-count (getf options :worker-count))
(concurrency (getf options :concurrency)))
`(lambda ()
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
@ -136,10 +130,16 @@
:fields ',fields
:columns ',columns
,@(remove-batch-control-option
options :extras '(:truncate
options :extras '(:worker-count
:concurrency
:truncate
:drop-indexes
:disable-triggers)))))
(pgloader.sources:copy-database source
,@ (when worker-count
(list :worker-count worker-count))
,@ (when concurrency
(list :concurrency concurrency))
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
@ -149,7 +149,7 @@
(defrule load-copy-file load-copy-file-command
(:lambda (command)
(bind (((source encoding fields pg-db-uri columns
&key ((:copy-options options)) gucs before after) command))
&key options gucs before after) command))
(cond (*dry-run*
(lisp-code-for-csv-dry-run pg-db-uri))
(t
@ -159,4 +159,4 @@
:gucs gucs
:before before
:after after
:copy-options options))))))
:options options))))))
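
The generator above only splices :worker-count and :concurrency into the
generated copy-database call when the user actually set them, so copy-database's
own defaults still apply otherwise. A hedged, stand-alone illustration of that
backquote idiom (build-call is a hypothetical helper, not part of the patch;
source and truncate are just symbols in the generated form):

    (defun build-call (worker-count concurrency)
      `(pgloader.sources:copy-database source
        ,@(when worker-count (list :worker-count worker-count))
        ,@(when concurrency  (list :concurrency concurrency))
        :truncate truncate))

    ;; (build-call 8 nil)
    ;; => (PGLOADER.SOURCES:COPY-DATABASE SOURCE :WORKER-COUNT 8 :TRUNCATE TRUNCATE)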

View File

@ -103,7 +103,9 @@
(bind (((_ _ _ escape-mode) term))
(cons :escape-mode escape-mode))))
(defrule csv-option (or option-batch-rows
(defrule csv-option (or option-workers
option-concurrency
option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate
@ -120,19 +122,9 @@
option-keep-unquoted-blanks
option-csv-escape-mode))
(defrule another-csv-option (and comma csv-option)
(:lambda (source)
(bind (((_ option) source)) option)))
(defrule csv-option-list (and csv-option (* another-csv-option))
(:lambda (source)
(destructuring-bind (opt1 opts) source
(alexandria:alist-plist `(,opt1 ,@opts)))))
(defrule csv-options (and kw-with csv-option-list)
(:lambda (source)
(bind (((_ opts) source))
(cons :csv-options opts))))
(defrule csv-options (and kw-with
(and csv-option (* (and comma csv-option))))
(:function flatten-option-list))
;;
;; CSV per-field reading options
@ -414,8 +406,10 @@
&key
(encoding :utf-8)
columns
gucs before after
((:csv-options options)))
gucs before after options
&aux
(worker-count (getf options :worker-count))
(concurrency (getf options :concurrency)))
`(lambda ()
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
@ -425,7 +419,7 @@
(progn
,(sql-code-block pg-db-conn :pre before "before load")
(let ((truncate (getf ',options :truncate))
(let ((truncate (getf ',options :truncate))
(disable-triggers (getf ',options :disable-triggers))
(drop-indexes (getf ',options :drop-indexes))
(source
@ -438,10 +432,16 @@
:fields ',fields
:columns ',columns
,@(remove-batch-control-option
options :extras '(:truncate
options :extras '(:worker-count
:concurrency
:truncate
:drop-indexes
:disable-triggers)))))
(pgloader.sources:copy-database source
,@ (when worker-count
(list :worker-count worker-count))
,@ (when concurrency
(list :concurrency concurrency))
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
@ -451,7 +451,7 @@
(defrule load-csv-file load-csv-file-command
(:lambda (command)
(bind (((source encoding fields pg-db-uri columns
&key ((:csv-options options)) gucs before after) command))
&key options gucs before after) command))
(cond (*dry-run*
(lisp-code-for-csv-dry-run pg-db-uri))
(t
@ -461,4 +461,4 @@
:gucs gucs
:before before
:after after
:csv-options options))))))
:options options))))))
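
The hand-rolled another-csv-option / csv-option-list rules are replaced with a
single (:function flatten-option-list) production. The real flatten-option-list
is defined elsewhere in the parser utilities and is not shown in this diff; the
sketch below is only a guess at its shape, flattening the
(kw-with (option (* (comma option)))) parse tree into a plist (using
alexandria:alist-plist as the old rules did) and tagging it with :options so the
surrounding command rule can pick it up with &key options:

    (defun flatten-option-list-sketch (source)
      ;; SOURCE is (with-token (opt1 ((comma opt2) (comma opt3) ...)))
      (destructuring-bind (with (opt1 more)) source
        (declare (ignore with))
        (cons :options
              (alexandria:alist-plist
               (cons opt1 (mapcar #'second more))))))

    ;; (flatten-option-list-sketch
    ;;  '(:with ((:worker-count . 8) ((#\, (:concurrency . 2))))))
    ;; => (:OPTIONS :WORKER-COUNT 8 :CONCURRENCY 2)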

View File

@ -43,7 +43,9 @@
(:lambda (source)
(bind (((_ field-defs _) source)) field-defs)))
(defrule fixed-option (or option-batch-rows
(defrule fixed-option (or option-workers
option-concurrency
option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate
@ -51,19 +53,9 @@
option-disable-triggers
option-skip-header))
(defrule another-fixed-option (and comma fixed-option)
(:lambda (source)
(bind (((_ option) source)) option)))
(defrule fixed-option-list (and fixed-option (* another-fixed-option))
(:lambda (source)
(destructuring-bind (opt1 opts) source
(alexandria:alist-plist `(,opt1 ,@opts)))))
(defrule fixed-options (and kw-with csv-option-list)
(:lambda (source)
(bind (((_ opts) source))
(cons :fixed-options opts))))
(defrule fixed-options (and kw-with
(and fixed-option (* (and comma fixed-option))))
(:function flatten-option-list))
(defrule fixed-uri (and "fixed://" filename)
(:lambda (source)
@ -120,8 +112,10 @@
&key
(encoding :utf-8)
columns
gucs before after
((:fixed-options options)))
gucs before after options
&aux
(worker-count (getf options :worker-count))
(concurrency (getf options :concurrency)))
`(lambda ()
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
@ -146,6 +140,10 @@
:skip-lines ,(or (getf options :skip-line) 0))))
(pgloader.sources:copy-database source
,@ (when worker-count
(list :worker-count worker-count))
,@ (when concurrency
(list :concurrency concurrency))
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
@ -155,7 +153,7 @@
(defrule load-fixed-cols-file load-fixed-cols-file-command
(:lambda (command)
(bind (((source encoding fields pg-db-uri columns
&key ((:fixed-options options)) gucs before after) command))
&key options gucs before after) command))
(cond (*dry-run*
(lisp-code-for-csv-dry-run pg-db-uri))
(t
@ -165,4 +163,4 @@
:gucs gucs
:before before
:after after
:fixed-options options))))))
:options options))))))

View File

@ -406,7 +406,8 @@
(with-stats-collection ("Index Build Completion" :section section)
(loop :repeat (count-indexes table)
:do (lp:receive-result idx-channel)))
:do (lp:receive-result idx-channel))
(lp:end-kernel :wait t))
;; turn unique indexes into pkeys now
(with-pgsql-connection (target)

View File

@ -151,6 +151,8 @@
(defgeneric process-rows (md-copy stream process-fn)
(:documentation "Process rows from a given input stream."))
(defgeneric clone-copy-for (md-copy path-spec)
(:documentation "Create a new instance for copying PATH-SPEC data."))
;;;
@ -188,4 +190,4 @@
(:documentation "Alter load duties for database sources copy support."))
(defgeneric instanciate-table-copy-object (db-copy table)
(:documentation "Create an new instance for copying TABLE data."))
(:documentation "Create a new instance for copying TABLE data."))

View File

@ -12,7 +12,11 @@
(print-unreadable-object (c stream :type t :identity t)
(with-slots (type spec) c
(let ((path (when (slot-boundp c 'path) (slot-value c 'path))))
(format stream "~a://~a:~{~a~^,~}" type (first spec) path)))))
(etypecase path
(string
(format stream "~a://~a:~a" type (first spec) path))
(list
(format stream "~a://~a:~{~a~^,~}" type (first spec) path)))))))
(defmethod expand :after ((md md-connection))
"Expand the archive for the MD connection."
@ -30,15 +34,12 @@
(defgeneric expand-spec (md-connection)
(:documentation "Expand specification for an FD source."))
(defgeneric open-next-stream (md-connection &rest args &key)
(:documentation "Open the next input stream from FD."))
(defmethod expand-spec ((md md-connection))
"Given fd spec as a CONS of a source type and a tagged object, expand the
tagged object depending on the source type and return a list of pathnames."
(destructuring-bind (type &rest part) (md-spec md)
(ecase type
(:inline (list (caar part)))
(:inline (list (md-spec md)))
(:stdin (list *standard-input*))
(:regex (destructuring-bind (keep regex root) part
(filter-directory regex
@ -51,39 +52,31 @@
(if (probe-file realname) (list realname)
(error "File does not exist: '~a'." realname)))))))
(defmethod open-next-stream ((md md-connection)
&rest args
&key &allow-other-keys)
"Switch to the following file in the PATH list."
;; first close current stream
(when (slot-boundp md 'strm)
(close (md-strm md)))
;; now open the new one
(defmethod open-connection ((md md-connection)
&rest args
&key &allow-other-keys)
"We know how to open several kinds of specs here, all of which target a
single file as input. The multi-file specs must have been expanded before
trying to open the connection."
(when (fd-path md)
(let ((current-path (pop (fd-path md))))
(when current-path
(log-message :info "Open ~s" current-path)
(prog1
(setf (md-strm md)
(typecase current-path
(stream current-path)
(t (apply #'open current-path args))))
;;
;; The :inline md spec is a little special, it's a filename and a
;; position where to skip to at opening the file. It allows for
;; easier self-contained tests.
;;
(when (eq :inline (car (md-spec md)))
(file-position (md-strm md) (cdadr (md-spec md)))))))))
(log-message :notice "Opening ~s" (fd-path md)) ; info
(cond ;; inline
((and (listp (fd-path md))
(eq :inline (first (fd-path md))))
(destructuring-bind (filename . position)
(second (fd-path md))
;; open the filename with given extra args
(setf (md-strm md) (apply #'open filename args))
;; and position the stream as expected
(file-position (md-strm md) position)))
(defmethod open-connection ((md md-connection) &key)
"The fd connection supports several specs to open a connection:
- if we have a path, that's a single file to open
- otherwise spec is a CONS wherein
- car is the source type
- cdr is an object suitable for `get-absolute-pathname`"
(setf (fd-path md) (expand-spec md))
;; stdin
((streamp (fd-path md))
(setf (md-strm md) (fd-path md)))
;; other cases should be filenames
(t
(setf (md-strm md) (apply #'open (fd-path md) args)))))
md)
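
The :inline spec is the one special case handled above: as the removed comment
explains, it carries a filename and the file position where the inline data
starts, so self-contained test files can hold both the command and the data. A
hedged illustration of the shape the new open-connection destructures, with
made-up values:

    (let ((fd-path '(:inline ("/tmp/self-contained.load" . 1234))))
      (destructuring-bind (filename . position) (second fd-path)
        (list filename position)))
    ;; => ("/tmp/self-contained.load" 1234)
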
(defmethod close-connection ((md md-connection))
@ -91,8 +84,7 @@
(when (and (slot-boundp md 'strm) (md-strm md))
(close (md-strm md)))
(setf (md-strm md) nil
(fd-path md) nil))
(setf (md-strm md) nil))
(defun get-pathname (dbname table-name &key (fd-path-root *fd-path-root*))
"Return a pathname where to read or write the file data"

View File

@ -18,26 +18,24 @@
Finally returns how many rows were read and processed."
(with-connection (cnx (source copy))
(loop :for input := (open-next-stream cnx
:direction :input
:external-format (encoding copy)
:if-does-not-exist nil)
:while input
:do (progn
;; we handle skipping more than one line here, as cl-copy only knows
;; about skipping the first line
(loop repeat (skip-lines copy) do (read-line input nil nil))
(with-connection (cnx (source copy)
:direction :input
:external-format (encoding copy)
:if-does-not-exist nil)
(let ((input (md-strm cnx)))
;; we handle skipping more than one line here, as cl-copy only knows
;; about skipping the first line
(loop :repeat (skip-lines copy) :do (read-line input nil nil))
;; we might now have to read the fields from the header line
(when (header copy)
(setf (fields copy)
(parse-header copy (read-line input nil nil)))
;; we might now have to read the fields from the header line
(when (header copy)
(setf (fields copy)
(parse-header copy (read-line input nil nil)))
(log-message :debug "Parsed header columns ~s" (fields copy)))
(log-message :debug "Parsed header columns ~s" (fields copy)))
;; read in the text file, split it into columns
(process-rows copy input process-row-fn)))))
;; read in the text file, split it into columns
(process-rows copy input process-row-fn))))
(defmethod preprocess-row ((copy md-copy))
"The file based readers possibly have extra work to do with user defined
@ -54,6 +52,23 @@
(format nil "~s" (car col)))
(columns copy)))
(defmethod clone-copy-for ((copy md-copy) path-spec)
"Create a copy of COPY for loading data from PATH-SPEC."
(make-instance (class-of copy)
;; source-db is expected unbound here, so bypassed
:target-db (clone-connection (target-db copy))
:source (make-instance (class-of (source copy))
:spec (md-spec (source copy))
:type (conn-type (source copy))
:path path-spec)
:target (target copy)
:fields (fields copy)
:columns (columns copy)
:transforms (transforms copy)
:encoding (encoding copy)
:skip-lines (skip-lines copy)
:header (header copy)))
(defmethod copy-database ((copy md-copy)
&key
truncate
@ -61,8 +76,8 @@
drop-indexes
;; generic API, but ignored here
(worker-count 8)
(concurrency 2)
(worker-count 4)
(concurrency 1)
data-only
schema-only
@ -86,11 +101,42 @@
(target copy)
:drop-indexes drop-indexes)
(copy-from copy
:worker-count worker-count
:concurrency concurrency
:truncate truncate
:disable-triggers disable-triggers)
;; ensure we truncate only once
(when truncate
(truncate-tables (clone-connection (target-db copy)) (target copy)))
;; expand the specs of our source; we might have to care about several
;; files.
(let* ((lp:*kernel* (make-kernel worker-count))
(channel (lp:make-channel))
(path-list (expand-spec (source copy))))
(loop :for path-spec :in path-list
:do (let ((table-source (clone-copy-for copy path-spec)))
(copy-from table-source
:concurrency concurrency
:kernel lp:*kernel*
:channel channel
:truncate nil
:disable-triggers disable-triggers)))
;; end kernel
(with-stats-collection ("COPY Threads Completion" :section :post
:use-result-as-read t
:use-result-as-rows t)
(let ((worker-count (* (length path-list)
(task-count concurrency))))
(loop :for tasks :below worker-count
:do (destructuring-bind (task table seconds)
(lp:receive-result channel)
(log-message :debug
"Finished processing ~a for ~s ~50T~6$s"
task (format-table-name table) seconds)
(when (eq :writer task)
(update-stats :data table :secs seconds))))
(prog1
worker-count
(lp:end-kernel :wait nil))))
(lp:end-kernel :wait t))
;; re-create the indexes from the target table entry
(create-indexes-again (target-db copy)
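
The multi-file copy-database above relies on lparallel's kernel/channel pattern:
one shared kernel sized by worker-count, a batch of tasks per file submitted by
copy-from, and then exactly as many receive-result calls as tasks were started.
Below is a self-contained illustration of that coordination idiom, not pgloader
code (lp: in the diff is pgloader's nickname for the lparallel package; this
sketch assumes lparallel is loaded and submits a single trivial task per file):

    (let* ((lparallel:*kernel* (lparallel:make-kernel 4))
           (channel (lparallel:make-channel))
           (files '("a.csv" "b.csv" "c.csv")))
      (unwind-protect
           (progn
             ;; one task per file, all sharing the same channel
             (dolist (file files)
               (lparallel:submit-task channel (lambda (f) (list :done f)) file))
             ;; wait for exactly as many results as tasks submitted
             (loop :repeat (length files)
                   :collect (lparallel:receive-result channel)))
        (lparallel:end-kernel :wait t)))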

View File

@ -180,5 +180,8 @@
;; now wait until both the tasks are over, and kill the kernel
(unless c-s-p
(log-message :debug "waiting for ~d tasks" (task-count concurrency))
(loop :repeat (task-count concurrency)
:do (lp:receive-result channel))
(log-message :info "COPY ~s done." table-name)
(unless k-s-p (lp:end-kernel :wait t)))))))
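
The k-s-p / c-s-p flags read here as the supplied-p indicators for copy-from's
:kernel and :channel keyword arguments: stand-alone, copy-from owns its kernel,
waits for its own tasks and ends the kernel; under the multi-file copy-database
it receives both and leaves the waiting to the caller. A hedged sketch of the
two call shapes (the second mirrors the md-copy copy-database loop in this
patch; the variables are assumed to be in scope):

    ;; stand-alone: copy-from creates the kernel, waits, then ends it
    (copy-from copy :truncate t)

    ;; driven by the multi-file copy-database: shared kernel and channel,
    ;; the caller receives the results and ends the kernel itself
    (copy-from table-source
               :concurrency concurrency
               :kernel lp:*kernel*
               :channel channel
               :truncate nil
               :disable-triggers disable-triggers)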

View File

@ -10,12 +10,7 @@
(setf (slot-value copy 'type) "copy"))
(defclass copy-copy (md-copy)
((encoding :accessor encoding ; file encoding
:initarg :encoding) ;
(skip-lines :accessor skip-lines ; we might want to skip COPY lines
:initarg :skip-lines ;
:initform 0) ;
(delimiter :accessor delimiter ; see COPY options for TEXT
((delimiter :accessor delimiter ; see COPY options for TEXT
:initarg :delimiter ; in PostgreSQL docs
:initform #\Tab)
(null-as :accessor null-as
@ -35,6 +30,18 @@
(unless transforms
(setf (slot-value copy 'transforms) (make-list (length columns))))))
(defmethod clone-copy-for ((copy copy-copy) path-spec)
"Create a copy of COPY for loading data from PATH-SPEC."
(let ((copy-for-path-spec
(change-class (call-next-method copy path-spec) 'copy-copy)))
(loop :for slot-name :in '(delimiter null-as)
:do (when (slot-boundp copy slot-name)
(setf (slot-value copy-for-path-spec slot-name)
(slot-value copy slot-name))))
;; return the new instance!
copy-for-path-spec))
(declaim (inline parse-row))
(defun parse-row (line &key (delimiter #\Tab) (null-as "\\N"))

View File

@ -48,6 +48,24 @@
(unless transforms
(setf (slot-value csv 'transforms) (make-list (length columns))))))
(defmethod clone-copy-for ((csv copy-csv) path-spec)
"Create a copy of CSV for loading data from PATH-SPEC."
(let ((csv-for-path-spec
(change-class (call-next-method csv path-spec) 'copy-csv)))
(loop :for slot-name :in '(source-type
separator
newline
quote
escape
escape-mode
trim-blanks)
:do (when (slot-boundp csv slot-name)
(setf (slot-value csv-for-path-spec slot-name)
(slot-value csv slot-name))))
;; return the new instance!
csv-for-path-spec))
;;;
;;; Read a file format in CSV format, and call given function on each line.
;;;
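
Both clone-copy-for methods follow the same two-step idiom: call-next-method
builds the generic md-copy clone bound to the new path-spec, change-class
re-specializes it, and only the format-specific slots remain to be copied. A
hedged usage sketch with a made-up path (fd-path is assumed to be the accessor
for the connection's path slot, as used in the md-connection changes above):

    (let ((one-file-csv (clone-copy-for csv "/tmp/data/part-0001.csv")))
      ;; the clone targets the same table with the same fields, columns,
      ;; separator, quoting and encoding, but reads just that one file
      (fd-path (source one-file-csv)))
    ;; => "/tmp/data/part-0001.csv"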

View File

@ -30,6 +30,10 @@
(unless transforms
(setf (slot-value fixed 'transforms) (make-list (length columns))))))
(defmethod clone-copy-for ((fixed copy-fixed) path-spec)
"Create a copy of FIXED for loading data from PATH-SPEC."
(change-class (call-next-method fixed path-spec) 'copy-fixed))
(declaim (inline parse-row))
(defun parse-row (fixed-cols-specs line)

View File

@ -133,7 +133,8 @@
;;;
;;; Tools for every connection classes
;;;
(defmacro with-connection ((var connection) &body forms)
(defmacro with-connection ((var connection &rest args &key &allow-other-keys)
&body forms)
"Connect to DB-CONNECTION and handle any condition when doing so, and when
connected execute FORMS in a protected way so that we always disconnect
at the end."
@ -147,7 +148,7 @@
#'(lambda (w)
(log-message :warning "~a" w)
(muffle-warning))))
(open-connection ,conn))
(apply #'open-connection ,conn (list ,@args)))
(condition (e)
(cond ((typep ,connection 'fd-connection)
(error 'fd-connection-error
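
Because with-connection now forwards any extra keyword arguments to
open-connection, file-open parameters can be handed straight through the macro.
A usage sketch mirroring the md-copy copy-from method earlier in this patch
(copy is assumed to be an md-copy instance in scope):

    (with-connection (cnx (source copy)
                          :direction :input
                          :external-format (encoding copy)
                          :if-does-not-exist nil)
      ;; CNX is the opened md-connection; its stream lives in (md-strm cnx)
      (read-line (md-strm cnx) nil nil))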

View File

@ -3,9 +3,12 @@ load csv
in directory 'data'
having fields (id, field)
into postgres:///pgloader?matching
with fields optionally enclosed by '"',
fields terminated by ',',
truncate
truncate,
workers = 8,
concurrency = 1
before load do
$$ drop table if exists matching; $$,