Rationalize common generic API implementation.

When devising the common API, the first step has been to implement
specific methods for each generic function of the protocol. It now
appears that in some cases we don't need the extra level of flexibility:
each change of the API has been systematically propagated to every
specific method, so just use a single generic definition where possible.

In particular, introduce a new intermediate class for COPY subclasses,
allowing more common code to be shared in the method implementations,
rather than having to copy/paste and maintain several versions of the
same code.

It would be good to be able to centralize more code for the database
sources and how they are organized around metadata/import-data/complete
schema, but it doesn't look obvious how to do it just now.
This commit is contained in:
Dimitri Fontaine 2015-10-05 18:10:58 +02:00
parent 0d9c2119b1
commit 41e9eebd54
16 changed files with 321 additions and 524 deletions

View File

@ -126,6 +126,8 @@
((:module "common"
:components
((:file "api")
(:file "methods" :depends-on ("api"))
(:file "md-methods" :depends-on ("api"))
(:file "casting-rules")
(:file "files-and-pathnames")
(:file "project-fields")))

View File

@ -173,55 +173,6 @@
(:export #:push-to-end
#:with-schema))
(defpackage #:pgloader.sources
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.schema)
(:import-from #:pgloader.transforms
#:precision
#:scale
#:intern-symbol
#:typemod-expr-to-function)
(:import-from #:pgloader.parse-date
#:parse-date-string
#:parse-date-format)
(:export #:copy
#:source-db
#:target-db
#:source
#:target
#:fields
#:columns
#:transforms
#:map-rows
#:copy-from
#:copy-to-queue
#:copy-to
#:copy-database
;; the md-connection facilities
#:md-connection
#:md-spec
#:md-strm
#:expand-spec
#:open-next-stream
;; common schema facilities
#:push-to-end
#:with-schema
;; file based utils for CSV, fixed etc
#:with-open-file-or-stream
#:get-pathname
#:get-absolute-pathname
#:project-fields
#:reformat-then-process
;; database cast machinery
#:*default-cast-rules*
#:*cast-rules*
#:cast))
(defpackage #:pgloader.pgsql
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
@ -268,6 +219,68 @@
#:create-indexes-again
#:reset-sequences))
(defpackage #:pgloader.sources
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.schema #:pgloader.pgsql)
(:import-from #:pgloader.transforms
#:precision
#:scale
#:intern-symbol
#:typemod-expr-to-function)
(:import-from #:pgloader.parse-date
#:parse-date-string
#:parse-date-format)
(:export #:copy
#:md-copy
;; Accessors
#:source-db
#:target-db
#:source
#:target
#:fields
#:columns
#:transforms
#:encoding
#:skip-lines
#:header
;; Main protocol/API
#:map-rows
#:copy-from
#:copy-to-queue
#:copy-to
#:copy-database
;; md-copy protocol/API
#:parse-header
#:process-rows
;; the md-connection facilities
#:md-connection
#:md-spec
#:md-strm
#:expand-spec
#:open-next-stream
;; common schema facilities
#:push-to-end
#:with-schema
;; file based utils for CSV, fixed etc
#:with-open-file-or-stream
#:get-pathname
#:get-absolute-pathname
#:project-fields
#:reformat-then-process
;; database cast machinery
#:*default-cast-rules*
#:*cast-rules*
#:cast))
(defpackage #:pgloader.queue
(:use #:cl #:pgloader.params #:pgloader.monitor)
(:import-from #:pgloader.pgsql

View File

@ -117,8 +117,8 @@
`(lambda ()
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,copy-conn)))))
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,copy-conn)))))
(progn
,(sql-code-block pg-db-conn :pre before "before load")
@ -138,10 +138,10 @@
options :extras '(:truncate
:drop-indexes
:disable-triggers)))))
(pgloader.sources:copy-from source
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
(pgloader.sources:copy-database source
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
,(sql-code-block pg-db-conn :post after "after load")))))

View File

@ -50,7 +50,7 @@
(cons :skip-lines (parse-integer (text digits))))))
(defrule option-csv-header (and kw-csv kw-header)
(:constant (cons :csv-header t)))
(:constant (cons :header t)))
(defrule option-fields-enclosed-by
(and kw-fields (? kw-optionally) kw-enclosed kw-by separator)
@ -419,8 +419,8 @@
`(lambda ()
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,csv-conn)))))
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,csv-conn)))))
(progn
,(sql-code-block pg-db-conn :pre before "before load")
@ -440,10 +440,10 @@
options :extras '(:truncate
:drop-indexes
:disable-triggers)))))
(pgloader.sources:copy-from source
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
(pgloader.sources:copy-database source
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
,(sql-code-block pg-db-conn :post after "after load")))))

View File

@ -125,8 +125,8 @@
`(lambda ()
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,fixed-conn)))))
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,fixed-conn)))))
(progn
,(sql-code-block pg-db-conn :pre before "before load")
@ -144,10 +144,10 @@
:columns ',columns
:skip-lines ,(or (getf options :skip-line) 0))))
(pgloader.sources:copy-from source
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
(pgloader.sources:copy-database source
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
,(sql-code-block pg-db-conn :post after "after load")))))

View File

@ -41,9 +41,13 @@
(defgeneric copy-to-queue (source queue)
(:documentation
"Load data from SOURCE and queue each row into QUEUE. Typicall
"Load data from SOURCE and queue each row into QUEUE. Typical
implementation will directly use pgloader.queue:map-push-queue."))
(defgeneric copy-column-list (source)
(:documentation
"Return the list of column names for the data sent in the queue."))
(defgeneric copy-from (source &key truncate)
(:documentation
"Load data from SOURCE into its target as defined by the SOURCE object."))
@ -97,3 +101,36 @@
;; (defgeneric fetch-metadata (connection &key)
;; (:documentation "Full discovery of the CONNECTION data source."))
;;;
;;; Class hierarchy allowing sharing of features among subcategories of
;;; pgloader sources. Those subcategories are divided into about the same
;;; sets as the connection types.
;;;
;;; fd-connection: single file reader, copy
;;; md-connection: multiple file reader, md-copy
;;; db-connection: database connection reader, with introspection, db-copy
;;;
;;; Of those only md-copy objects share a lot in common, so we have another
;;; layer of protocols just for them here, and the shared implementation
;;; lives in md-methods.lisp in this directory.
;;;
(defclass md-copy (copy)
  ((encoding   :accessor encoding     ; input file encoding
               :initarg :encoding)
   (skip-lines :accessor skip-lines   ; skip first N lines before reading data
               :initarg :skip-lines
               :initform 0)
   (header     :accessor header       ; when true, first line holds col names
               :initarg :header
               :initform nil))
  (:documentation "pgloader Multiple Files Data Source (csv, fixed, copy)."))
;;; md-copy protocol: each subclass provides its own file-format specific
;;; header parsing and row processing; the rest of the loading machinery is
;;; shared (see md-methods.lisp).
(defgeneric parse-header (md-copy header)
  (:documentation "Parse the file header and return a list of fields."))

(defgeneric process-rows (md-copy stream process-fn)
  (:documentation "Process rows from a given input stream."))

View File

@ -0,0 +1,82 @@
;;;
;;; Generic API for pgloader sources
;;; Methods for source types with multiple files input
;;;
(in-package :pgloader.sources)
(defmethod parse-header ((copy md-copy) header)
  "Unsupported by default, to be implemented in each md-copy subclass."
  ;; signal a clear error rather than silently ignoring the header line
  (error "Parsing the header of a ~s is not implemented yet." (type-of copy)))
(defmethod map-rows ((copy md-copy) &key process-row-fn)
  "Load data from the (possibly multiple) input streams of an md-copy
   source, with support for advanced projecting capabilities. See
   `project-fields' for details.

   Each row is pre-processed then PROCESS-ROW-FN is called with the row as a
   list as its only parameter.

   NOTE(review): callers should not rely on the return value here — the
   loop below has no explicit return clause."
  (with-connection (cnx (source copy))
    (loop :for input := (open-next-stream cnx
                                          :direction :input
                                          :external-format (encoding copy)
                                          :if-does-not-exist nil)
       :while input
       :do (progn
             ;; we handle skipping more than one line here, at the start of
             ;; every input stream
             (loop repeat (skip-lines copy) do (read-line input nil nil))

             ;; we might now have to read the fields from the header line
             (when (header copy)
               (setf (fields copy)
                     (parse-header copy (read-line input nil nil)))

               (log-message :debug "Parsed header columns ~s" (fields copy)))

             ;; read in the text file, split it into columns, process NULL
             ;; columns the way postmodern expects them, and call
             ;; PROCESS-ROW-FN on them
             (let ((reformat-then-process
                    (reformat-then-process :fields  (fields copy)
                                           :columns (columns copy)
                                           :target  (target copy)
                                           :process-row-fn process-row-fn)))
               ;; the file-format specific part is delegated to the subclass
               (process-rows copy input reformat-then-process))))))
(defmethod copy-column-list ((copy md-copy))
  "Return the reformatted column names, as found in (columns copy), always
   double-quoted for use in the COPY buffer."
  (loop :for col :in (columns copy)
        ;; always double quote column names
        :collect (format nil "~s" (car col))))
(defmethod copy-database ((copy md-copy)
                          &key
                            truncate
                            disable-triggers
                            drop-indexes

                            ;; generic API, but ignored here
                            data-only
                            schema-only
                            create-tables
                            include-drop
                            create-indexes
                            reset-sequences)
  "Stream the contents of the md-copy formatted file(s) to PostgreSQL,
   optionally dropping the target indexes first and re-creating them after
   the data load."
  (declare (ignore data-only schema-only
                   create-tables include-drop
                   create-indexes reset-sequences))

  ;; when DROP-INDEXES is nil, maybe-drop-indexes is expected to be a no-op
  (let ((indexes (maybe-drop-indexes (target-db copy)
                                     (target copy)
                                     :drop-indexes drop-indexes)))

    (copy-from copy
               :truncate truncate
               :disable-triggers disable-triggers)

    ;; re-create the indexes
    (create-indexes-again (target-db copy) indexes :drop-indexes drop-indexes)))

View File

@ -0,0 +1,61 @@
;;;
;;; Generic API for pgloader sources
;;;
(in-package :pgloader.sources)
;;;
;;; Common API implementation
;;;
(defmethod copy-to-queue ((source copy) queue)
  "Default implementation of the copy-to-queue protocol: have the
   pgloader.queue machinery read rows from SOURCE and push them to QUEUE."
  (pgloader.queue:map-push-queue source queue))
(defmethod copy-column-list ((source copy))
  "By default, send no explicit column list: return an empty list."
  '())
(defmethod copy-to ((copy copy) pgsql-copy-filename)
  "Extract data from the COPY source into a PostgreSQL COPY TEXT formatted
   file named PGSQL-COPY-FILENAME, superseding any existing file."
  (with-open-file (text-file pgsql-copy-filename
                             :direction :output
                             :if-exists :supersede
                             :external-format :utf-8)
    (flet ((write-row (row)
             ;; apply the source transforms while serializing each row
             (format-vector-row text-file row (transforms copy))))
      (map-rows copy :process-row-fn #'write-row))))
(defmethod copy-from ((copy copy)
                      &key
                        (kernel nil k-s-p)
                        truncate
                        disable-triggers)
  "Copy data from COPY source into PostgreSQL.

   Uses two lparallel tasks: one reader filling a bounded queue from the
   source, one writer draining the queue into PostgreSQL. When KERNEL is
   given it is reused and left running; otherwise a 2-worker kernel is
   created and ended once both tasks are done."
  (let* ((lp:*kernel* (or kernel (make-kernel 2)))
         (channel (lp:make-channel))
         ;; bounded queue: the reader blocks when the writer lags behind
         (queue (lq:make-queue :fixed-capacity *concurrent-batches*))
         (table-name (format-table-name (target copy))))
    (with-stats-collection ((target copy) :dbname (db-name (target-db copy)))
      (lp:task-handler-bind ((error #'lp:invoke-transfer-error))
        (log-message :notice "COPY ~s" table-name)

        ;; start a task to read data from the source into the queue
        (lp:submit-task channel #'copy-to-queue copy queue)

        ;; and start another task to push that data from the queue into
        ;; PostgreSQL
        (lp:submit-task channel
                        #'pgloader.pgsql:copy-from-queue
                        (target-db copy)
                        (target copy)
                        queue
                        :columns (copy-column-list copy)
                        :truncate truncate
                        :disable-triggers disable-triggers)

        ;; now wait until both the tasks are over, and kill the kernel
        ;; (only when we created it ourselves, see K-S-P)
        (loop :for tasks :below 2 :do (lp:receive-result channel)
           :finally
           (log-message :info "COPY ~s done." table-name)
           (unless k-s-p (lp:end-kernel)))))))

View File

@ -9,7 +9,7 @@
"Assign the type slot to sqlite."
(setf (slot-value copy 'type) "copy"))
(defclass copy-copy (copy)
(defclass copy-copy (md-copy)
((encoding :accessor encoding ; file encoding
:initarg :encoding) ;
(skip-lines :accessor skip-lines ; we might want to skip COPY lines
@ -56,85 +56,24 @@
;; see format-row-for-copy for details
(sq:split-sequence delimiter line)))
(defmethod map-rows ((copy copy-copy) &key process-row-fn)
"Load data from a text file in Copy Columns format.
Each row is pre-processed then PROCESS-ROW-FN is called with the row as a
list as its only parameter.
Returns how many rows were read and processed."
(with-connection (cnx (source copy))
(loop :for input := (open-next-stream cnx
:direction :input
:external-format (encoding copy)
:if-does-not-exist nil)
:while input
:do (progn
;; ignore as much as skip-lines lines in the file
(loop repeat (skip-lines copy) do (read-line input nil nil))
;; read in the text file, split it into columns, process NULL
;; columns the way postmodern expects them, and call
;; PROCESS-ROW-FN on them
(let ((reformat-then-process
(reformat-then-process :fields (fields copy)
:columns (columns copy)
:target (target copy)
:process-row-fn process-row-fn)))
(loop
:with fun := reformat-then-process
:for line := (read-line input nil nil)
:counting line :into read
:while line
:do (handler-case
(funcall fun (parse-row line
:delimiter (delimiter copy)
:null-as (null-as copy)))
(condition (e)
(progn
(log-message :error "~a" e)
(update-stats :data (target copy) :errs 1))))))))))
(defmethod process-rows ((copy copy-copy) stream process-fn)
  "Process rows from STREAM according to COPY specifications and PROCESS-FN.

   A failure to parse or process a single row is logged and counted as an
   error in the stats, then processing continues with the next line."
  (loop
     :with fun := process-fn
     :for line := (read-line stream nil nil)
     :counting line :into read
     :while line
     :do (handler-case
             (funcall fun (parse-row line
                                     :delimiter (delimiter copy)
                                     :null-as (null-as copy)))
           (condition (e)
             (progn
               (log-message :error "~a" e)
               (update-stats :data (target copy) :errs 1))))))
(defmethod copy-to-queue ((copy copy-copy) queue)
  "Copy data from given COPY definition into lparallel.queue QUEUE.

   Rows are pushed 'pre-formatted: the source file is presumably already in
   the PostgreSQL COPY text format (see format-row-for-copy)."
  (pgloader.queue:map-push-queue copy queue 'pre-formatted))
(defmethod copy-from ((copy copy-copy)
&key
truncate
disable-triggers
drop-indexes)
"Copy data from given COPY file definition into its PostgreSQL target table."
(let* ((lp:*kernel* (make-kernel 2))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(indexes (maybe-drop-indexes (target-db copy)
(target copy)
:drop-indexes drop-indexes)))
(with-stats-collection ((target copy) :dbname (db-name (target-db copy)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" (target copy))
(lp:submit-task channel #'copy-to-queue copy queue)
;; and start another task to push that data from the queue to PostgreSQL
(lp:submit-task channel
;; this function update :rows stats
#'pgloader.pgsql:copy-from-queue
(target-db copy) (target copy) queue
;; we only are interested into the column names here
:columns (mapcar (lambda (col)
;; always double quote column names
(format nil "~s" (car col)))
(columns copy))
:truncate truncate
:disable-triggers disable-triggers)
;; now wait until both the tasks are over
(loop for tasks below 2 do (lp:receive-result channel)
finally (lp:end-kernel))))
;; re-create the indexes
(create-indexes-again (target-db copy) indexes
:drop-indexes drop-indexes)))

View File

@ -13,17 +13,9 @@
;;;
;;; Implementing the pgloader source API
;;;
(defclass copy-csv (copy)
(defclass copy-csv (md-copy)
((source-type :accessor source-type ; one of :inline, :stdin, :regex
:initarg :source-type) ; or :filename
(encoding :accessor encoding ; file encoding
:initarg :encoding) ;
(csv-header :accessor csv-header ; CSV headers are col names
:initarg :csv-header
:initform nil) ;
(skip-lines :accessor skip-lines ; CSV skip firt N lines
:initarg :skip-lines ;
:initform 0) ;
(separator :accessor csv-separator ; CSV separator
:initarg :separator ;
:initform #\Tab) ;
@ -59,7 +51,7 @@
;;;
;;; Read a file format in CSV format, and call given function on each line.
;;;
(defun parse-csv-header (csv header)
(defmethod parse-header ((csv copy-csv) header)
"Parse the header line given csv setup."
;; a field entry is a list of field name and options
(mapcar #'list
@ -73,99 +65,25 @@
:trim-outer-whitespace (csv-trim-blanks csv)
:newline (csv-newline csv)))))
(defmethod map-rows ((csv copy-csv) &key process-row-fn)
"Load data from a text file in CSV format, with support for advanced
projecting capabilities. See `project-fields' for details.
(defmethod process-rows ((csv copy-csv) stream process-fn)
  "Process rows from STREAM according to COPY specifications and PROCESS-FN.

   Individual CSV parse errors are logged and parsing continues with the
   next row through the cl-csv continue restart; any other error aborts
   this stream and is counted in the stats."
  (handler-case
      (handler-bind ((cl-csv:csv-parse-error
                      #'(lambda (c)
                          (log-message :error "~a" c)
                          (cl-csv::continue))))
        (cl-csv:read-csv stream
                         :row-fn process-fn
                         :separator (csv-separator csv)
                         :quote (csv-quote csv)
                         :escape (csv-escape csv)
                         :escape-mode (csv-escape-mode csv)
                         :unquoted-empty-string-is-nil t
                         :quoted-empty-string-is-nil nil
                         :trim-outer-whitespace (csv-trim-blanks csv)
                         :newline (csv-newline csv)))
    (condition (e)
      (progn
        (log-message :error "~a" e)
        (update-stats :data (target csv) :errs 1)))))
Each row is pre-processed then PROCESS-ROW-FN is called with the row as a
list as its only parameter.
Finally returns how many rows where read and processed."
(with-connection (cnx (source csv))
(loop :for input := (open-next-stream cnx
:direction :input
:external-format (encoding csv)
:if-does-not-exist nil)
:while input
:do (progn
;; we handle skipping more than one line here, as cl-csv only knows
;; about skipping the first line
(loop repeat (skip-lines csv) do (read-line input nil nil))
;; we might now have to read the CSV fields from the header line
(when (csv-header csv)
(setf (fields csv)
(parse-csv-header csv (read-line input nil nil)))
(log-message :debug "Parsed header columns ~s" (fields csv)))
;; read in the text file, split it into columns, process NULL
;; columns the way postmodern expects them, and call
;; PROCESS-ROW-FN on them
(let ((reformat-then-process
(reformat-then-process :fields (fields csv)
:columns (columns csv)
:target (target csv)
:process-row-fn process-row-fn)))
(handler-case
(handler-bind ((cl-csv:csv-parse-error
#'(lambda (c)
(log-message :error "~a" c)
(cl-csv::continue))))
(cl-csv:read-csv input
:row-fn (compile nil reformat-then-process)
:separator (csv-separator csv)
:quote (csv-quote csv)
:escape (csv-escape csv)
:escape-mode (csv-escape-mode csv)
:unquoted-empty-string-is-nil t
:quoted-empty-string-is-nil nil
:trim-outer-whitespace (csv-trim-blanks csv)
:newline (csv-newline csv)))
(condition (e)
(progn
(log-message :error "~a" e)
(update-stats :data (target csv) :errs 1)))))))))
(defmethod copy-to-queue ((csv copy-csv) queue)
"Copy data from given CSV definition into lparallel.queue DATAQ"
(map-push-queue csv queue))
(defmethod copy-from ((csv copy-csv)
&key
truncate
disable-triggers
drop-indexes)
"Copy data from given CSV file definition into its PostgreSQL target table."
(let* ((lp:*kernel* (make-kernel 2))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(indexes (maybe-drop-indexes (target-db csv)
(target csv)
:drop-indexes drop-indexes)))
(with-stats-collection ((target csv) :dbname (db-name (target-db csv)))
(lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" (target csv))
(lp:submit-task channel #'copy-to-queue csv queue)
;; and start another task to push that data from the queue to PostgreSQL
(lp:submit-task channel
;; this function update :rows stats
#'pgloader.pgsql:copy-from-queue
(target-db csv) (target csv) queue
;; we only are interested into the column names here
:columns (mapcar (lambda (col)
;; always double quote column names
(format nil "~s" (car col)))
(columns csv))
:truncate truncate
:disable-triggers disable-triggers)
;; now wait until both the tasks are over
(loop for tasks below 2 do (lp:receive-result channel)
finally (lp:end-kernel))))
;; re-create the indexes
(create-indexes-again (target-db csv) indexes :drop-indexes drop-indexes)))

View File

@ -45,46 +45,6 @@
:do (funcall process-row-fn row-array)
:finally (return count)))))
(defmethod copy-to ((db3 copy-db3) pgsql-copy-filename)
"Extract data from DB3 file into a PotgreSQL COPY TEXT formated file"
(with-open-file (text-file pgsql-copy-filename
:direction :output
:if-exists :supersede
:external-format :utf-8)
(let ((transforms (list-transforms (source db3))))
(map-rows db3
:process-row-fn
(lambda (row)
(format-vector-row text-file row transforms))))))
(defmethod copy-to-queue ((db3 copy-db3) queue)
"Copy data from DB3 file FILENAME into queue DATAQ"
(pgloader.queue:map-push-queue db3 queue))
(defmethod copy-from ((db3 copy-db3)
&key (kernel nil k-s-p) truncate disable-triggers)
(let* ((lp:*kernel* (or kernel (make-kernel 2)))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*)))
(with-stats-collection ((target db3) :dbname (db-name (target-db db3)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY \"~a\" from '~a'" (target db3) (source db3))
(lp:submit-task channel #'copy-to-queue db3 queue)
;; and start another task to push that data from the queue to PostgreSQL
(lp:submit-task channel
#'pgloader.pgsql:copy-from-queue
(target-db db3) (target db3) queue
:truncate truncate
:disable-triggers disable-triggers)
;; now wait until both the tasks are over, and kill the kernel
(loop for tasks below 2 do (lp:receive-result channel)
finally
(log-message :info "COPY \"~a\" done." (target db3))
(unless k-s-p (lp:end-kernel)))))))
(defmethod copy-database ((db3 copy-db3)
&key
table-name

View File

@ -10,7 +10,7 @@
"Assign the type slot to sqlite."
(setf (slot-value fixed 'type) "fixed"))
(defclass copy-fixed (copy)
(defclass copy-fixed (md-copy)
((encoding :accessor encoding ; file encoding
:initarg :encoding) ;
(skip-lines :accessor skip-lines ; CSV headers
@ -44,82 +44,17 @@
(when (<= start len)
(subseq line start (min len end)))))))
(defmethod map-rows ((fixed copy-fixed) &key process-row-fn)
"Load data from a text file in Fixed Columns format.
Each row is pre-processed then PROCESS-ROW-FN is called with the row as a
list as its only parameter.
Returns how many rows where read and processed."
(with-connection (cnx (source fixed))
(loop :for input := (open-next-stream cnx
:direction :input
:external-format (encoding fixed)
:if-does-not-exist nil)
:while input
:do (progn ;; ignore as much as skip-lines lines in the file
(loop repeat (skip-lines fixed) do (read-line input nil nil))
;; read in the text file, split it into columns, process NULL
;; columns the way postmodern expects them, and call
;; PROCESS-ROW-FN on them
(let ((reformat-then-process
(reformat-then-process :fields (fields fixed)
:columns (columns fixed)
:target (target fixed)
:process-row-fn process-row-fn)))
(loop
:with fun := (compile nil reformat-then-process)
:with fixed-cols-specs := (mapcar #'cdr (fields fixed))
:for line := (read-line input nil nil)
:counting line :into read
:while line
:do (handler-case
(funcall fun (parse-row fixed-cols-specs line))
(condition (e)
(progn
(log-message :error "~a" e)
(update-stats :data (target fixed) :errs 1))))))))))
(defmethod copy-to-queue ((fixed copy-fixed) queue)
"Copy data from given FIXED definition into lparallel.queue DATAQ"
(pgloader.queue:map-push-queue fixed queue))
(defmethod copy-from ((fixed copy-fixed)
&key
truncate
disable-triggers
drop-indexes)
"Copy data from given FIXED file definition into its PostgreSQL target table."
(let* ((lp:*kernel* (make-kernel 2))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(indexes (maybe-drop-indexes (target-db fixed)
(target fixed)
:drop-indexes drop-indexes)))
(with-stats-collection ((target fixed) :dbname (db-name (target-db fixed)))
(lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" (target fixed))
(lp:submit-task channel #'copy-to-queue fixed queue)
;; and start another task to push that data from the queue to PostgreSQL
(lp:submit-task channel
;; this function update :rows stats
#'pgloader.pgsql:copy-from-queue
(target-db fixed) (target fixed) queue
;; we only are interested into the column names here
:columns (mapcar (lambda (col)
;; always double quote column names
(format nil "~s" (car col)))
(columns fixed))
:truncate truncate
:disable-triggers disable-triggers)
;; now wait until both the tasks are over
(loop for tasks below 2 do (lp:receive-result channel)
finally (lp:end-kernel))))
;; re-create the indexes
(create-indexes-again (target-db fixed) indexes :drop-indexes drop-indexes)))
(defmethod process-rows ((fixed copy-fixed) stream process-fn)
  "Process rows from STREAM according to COPY specifications and PROCESS-FN.

   A failure to parse or process a single line is logged and counted as an
   error in the stats, then processing continues with the next line."
  (loop
     :with fun := process-fn
     ;; per-field fixed column specs, taken from (fields fixed) — the car
     ;; is presumably the field name, dropped here; see parse-row
     :with fixed-cols-specs := (mapcar #'cdr (fields fixed))
     :for line := (read-line stream nil nil)
     :counting line :into read
     :while line
     :do (handler-case
             (funcall fun (parse-row fixed-cols-specs line))
           (condition (e)
             (progn
               (log-message :error "~a" e)
               (update-stats :data (target fixed) :errs 1))))))

View File

@ -73,34 +73,6 @@
(ixf:read-headers ixf)
(ixf:map-data ixf process-row-fn)))))
(defmethod copy-to-queue ((ixf copy-ixf) queue)
"Copy data from IXF file FILENAME into queue DATAQ"
(pgloader.queue:map-push-queue ixf queue))
(defmethod copy-from ((ixf copy-ixf)
&key (kernel nil k-s-p) truncate disable-triggers)
(let* ((lp:*kernel* (or kernel (make-kernel 2)))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*)))
(with-stats-collection ((target ixf) :dbname (db-name (target-db ixf)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY \"~a\" from '~a'" (target ixf) (source ixf))
(lp:submit-task channel #'copy-to-queue ixf queue)
;; and start another task to push that data from the queue to PostgreSQL
(lp:submit-task channel
#'pgloader.pgsql:copy-from-queue
(target-db ixf) (target ixf) queue
:truncate truncate
:disable-triggers disable-triggers)
;; now wait until both the tasks are over, and kill the kernel
(loop for tasks below 2 do (lp:receive-result channel)
finally
(log-message :info "COPY \"~a\" done." (target ixf))
(unless k-s-p (lp:end-kernel)))))))
(defmethod copy-database ((ixf copy-ixf)
&key
table-name

View File

@ -51,41 +51,6 @@
(log-message :error "~a" e)
(update-stats :data (target mssql) :errs 1)))))))
(defmethod copy-to-queue ((mssql copy-mssql) queue)
"Copy data from MSSQL table DBNAME.TABLE-NAME into queue DATAQ"
(map-push-queue mssql queue))
(defmethod copy-from ((mssql copy-mssql)
&key (kernel nil k-s-p) truncate disable-triggers)
"Connect in parallel to MSSQL and PostgreSQL and stream the data."
(let* ((lp:*kernel* (or kernel (make-kernel 2)))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(table-name (target mssql)))
;; we account stats against the target table-name, because that's all we
;; know on the PostgreSQL thread
(with-stats-collection (table-name :dbname (db-name (target-db mssql)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" table-name)
;; read data from Mssql
(lp:submit-task channel #'copy-to-queue mssql queue)
;; and start another task to push that data from the queue to PostgreSQL
(lp:submit-task channel #'pgloader.pgsql:copy-from-queue
(target-db mssql) (target mssql) queue
:truncate truncate
:disable-triggers disable-triggers)
;; now wait until both the tasks are over
(loop for tasks below 2 do (lp:receive-result channel)
finally
(log-message :info "COPY ~a done." table-name)
(unless k-s-p (lp:end-kernel)))))
;; return the copy-mssql object we just did the COPY for
mssql))
(defun complete-pgsql-database (pgconn all-columns all-fkeys pkeys
&key
data-only

View File

@ -71,67 +71,11 @@
(invoke-restart 'qmynd-impl::use-nil))))
(mysql-query sql :row-fn process-row-fn :result-type 'vector))))))
;;;
;;; Use map-rows and pgsql-text-copy-format to fill in a CSV file on disk
;;; with MySQL data in there.
;;;
(defmethod copy-to ((mysql copy-mysql) filename)
"Extract data from MySQL in PostgreSQL COPY TEXT format"
(with-open-file (text-file filename
:direction :output
:if-exists :supersede
:external-format :utf-8)
(map-rows mysql
:process-row-fn
(lambda (row)
(format-vector-row text-file row (transforms mysql))))))
;;;
;;; Export MySQL data to our lparallel data queue. All the work is done in
;;; other basic layers, simple enough function.
;;;
(defmethod copy-to-queue ((mysql copy-mysql) queue)
"Copy data from MySQL table DBNAME.TABLE-NAME into queue DATAQ"
(map-push-queue mysql queue))
;;;
;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY
;;; protocol
;;;
(defmethod copy-from ((mysql copy-mysql)
&key (kernel nil k-s-p) truncate disable-triggers)
"Connect in parallel to MySQL and PostgreSQL and stream the data."
(let* ((lp:*kernel* (or kernel (make-kernel 2)))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(table-name (target mysql)))
;; we account stats against the target table-name, because that's all we
;; know on the PostgreSQL thread
(with-stats-collection (table-name :dbname (db-name (target-db mysql)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" table-name)
;; read data from MySQL
(lp:submit-task channel #'copy-to-queue mysql queue)
;; and start another task to push that data from the queue to PostgreSQL
(lp:submit-task channel #'pgloader.pgsql:copy-from-queue
(target-db mysql) (target mysql) queue
:columns (mapcar #'apply-identifier-case
(mapcar #'mysql-column-name
(fields mysql)))
:truncate truncate
:disable-triggers disable-triggers)
;; now wait until both the tasks are over
(loop for tasks below 2 do (lp:receive-result channel)
finally
(log-message :info "COPY ~a done." table-name)
(unless k-s-p (lp:end-kernel)))))
;; return the copy-mysql object we just did the COPY for
mysql))
(defmethod copy-column-list ((mysql copy-mysql))
  "Send the data in the MySQL columns ordering, applying the identifier
   case policy to each column name."
  (loop :for field :in (fields mysql)
        :collect (apply-identifier-case (mysql-column-name field))))
;;;

View File

@ -113,37 +113,6 @@
(log-message :error "~a" e)
(update-stats :data (target sqlite) :errs 1)))))))
(defmethod copy-to-queue ((sqlite copy-sqlite) queue)
"Copy data from SQLite table TABLE-NAME within connection DB into queue DATAQ"
(map-push-queue sqlite queue))
(defmethod copy-from ((sqlite copy-sqlite)
&key (kernel nil k-s-p) truncate disable-triggers)
"Stream the contents from a SQLite database table down to PostgreSQL."
(let* ((lp:*kernel* (or kernel (make-kernel 2)))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*)))
(with-stats-collection ((target sqlite) :dbname (db-name (target-db sqlite)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" (target sqlite))
;; read data from SQLite
(lp:submit-task channel #'copy-to-queue sqlite queue)
;; and start another task to push that data from the queue to PostgreSQL
(lp:submit-task channel
#'pgloader.pgsql:copy-from-queue
(target-db sqlite) (target sqlite) queue
:truncate truncate
:disable-triggers disable-triggers)
;; now wait until both the tasks are over
(loop for tasks below 2 do (lp:receive-result channel)
finally
(log-message :info "COPY ~a done." (target sqlite))
(unless k-s-p (lp:end-kernel)))))))
(defun fetch-sqlite-metadata (sqlite &key including excluding)
"SQLite introspection to prepare the migration."
(let (all-columns all-indexes)