diff --git a/pgloader.asd b/pgloader.asd index 537ffea..6d1e9c8 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -126,6 +126,8 @@ ((:module "common" :components ((:file "api") + (:file "methods" :depends-on ("api")) + (:file "md-methods" :depends-on ("api")) (:file "casting-rules") (:file "files-and-pathnames") (:file "project-fields"))) diff --git a/src/package.lisp b/src/package.lisp index 0fcdffa..e1837a1 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -173,55 +173,6 @@ (:export #:push-to-end #:with-schema)) -(defpackage #:pgloader.sources - (:use #:cl - #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.schema) - (:import-from #:pgloader.transforms - #:precision - #:scale - #:intern-symbol - #:typemod-expr-to-function) - (:import-from #:pgloader.parse-date - #:parse-date-string - #:parse-date-format) - (:export #:copy - #:source-db - #:target-db - #:source - #:target - #:fields - #:columns - #:transforms - #:map-rows - #:copy-from - #:copy-to-queue - #:copy-to - #:copy-database - - ;; the md-connection facilities - #:md-connection - #:md-spec - #:md-strm - #:expand-spec - #:open-next-stream - - ;; common schema facilities - #:push-to-end - #:with-schema - - ;; file based utils for CSV, fixed etc - #:with-open-file-or-stream - #:get-pathname - #:get-absolute-pathname - #:project-fields - #:reformat-then-process - - ;; database cast machinery - #:*default-cast-rules* - #:*cast-rules* - #:cast)) - (defpackage #:pgloader.pgsql (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection @@ -268,6 +219,68 @@ #:create-indexes-again #:reset-sequences)) +(defpackage #:pgloader.sources + (:use #:cl + #:pgloader.params #:pgloader.utils #:pgloader.connection + #:pgloader.schema #:pgloader.pgsql) + (:import-from #:pgloader.transforms + #:precision + #:scale + #:intern-symbol + #:typemod-expr-to-function) + (:import-from #:pgloader.parse-date + #:parse-date-string + #:parse-date-format) + (:export #:copy + #:md-copy + + ;; Accessors + #:source-db + #:target-db + #:source + #:target + #:fields + #:columns + #:transforms + #:encoding + #:skip-lines + #:header + + ;; Main protocol/API + #:map-rows + #:copy-from + #:copy-to-queue + #:copy-to + #:copy-database + + ;; md-copy protocol/API + #:parse-header + #:process-rows + + ;; the md-connection facilities + #:md-connection + #:md-spec + #:md-strm + #:expand-spec + #:open-next-stream + + ;; common schema facilities + #:push-to-end + #:with-schema + + ;; file based utils for CSV, fixed etc + #:with-open-file-or-stream + #:get-pathname + #:get-absolute-pathname + #:project-fields + #:reformat-then-process + + ;; database cast machinery + #:*default-cast-rules* + #:*cast-rules* + #:cast)) + + (defpackage #:pgloader.queue (:use #:cl #:pgloader.params #:pgloader.monitor) (:import-from #:pgloader.pgsql diff --git a/src/parsers/command-copy.lisp b/src/parsers/command-copy.lisp index ca423d2..af3f373 100644 --- a/src/parsers/command-copy.lisp +++ b/src/parsers/command-copy.lisp @@ -117,8 +117,8 @@ `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - (source-db (with-stats-collection ("fetch" :section :pre) - (expand (fetch-file ,copy-conn))))) + (source-db (with-stats-collection ("fetch" :section :pre) + (expand (fetch-file ,copy-conn))))) (progn ,(sql-code-block pg-db-conn :pre before "before load") @@ -138,10 +138,10 @@ options :extras '(:truncate :drop-indexes :disable-triggers))))) - (pgloader.sources:copy-from source - :truncate truncate - :drop-indexes drop-indexes - :disable-triggers disable-triggers)) + (pgloader.sources:copy-database source + :truncate truncate + :drop-indexes drop-indexes + :disable-triggers disable-triggers)) ,(sql-code-block pg-db-conn :post after "after load"))))) diff --git a/src/parsers/command-csv.lisp b/src/parsers/command-csv.lisp index 6ca6fed..9886d00 100644 --- a/src/parsers/command-csv.lisp +++ b/src/parsers/command-csv.lisp @@ -50,7 +50,7 @@ (cons :skip-lines (parse-integer (text digits)))))) (defrule option-csv-header (and kw-csv kw-header) - (:constant (cons :csv-header t))) + (:constant (cons :header t))) (defrule option-fields-enclosed-by (and kw-fields (? kw-optionally) kw-enclosed kw-by separator) @@ -419,8 +419,8 @@ `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - (source-db (with-stats-collection ("fetch" :section :pre) - (expand (fetch-file ,csv-conn))))) + (source-db (with-stats-collection ("fetch" :section :pre) + (expand (fetch-file ,csv-conn))))) (progn ,(sql-code-block pg-db-conn :pre before "before load") @@ -440,10 +440,10 @@ options :extras '(:truncate :drop-indexes :disable-triggers))))) - (pgloader.sources:copy-from source - :truncate truncate - :drop-indexes drop-indexes - :disable-triggers disable-triggers)) + (pgloader.sources:copy-database source + :truncate truncate + :drop-indexes drop-indexes + :disable-triggers disable-triggers)) ,(sql-code-block pg-db-conn :post after "after load"))))) diff --git a/src/parsers/command-fixed.lisp b/src/parsers/command-fixed.lisp index af52694..be8cc65 100644 --- a/src/parsers/command-fixed.lisp +++ b/src/parsers/command-fixed.lisp @@ -125,8 +125,8 @@ `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - (source-db (with-stats-collection ("fetch" :section :pre) - (expand (fetch-file ,fixed-conn))))) + (source-db (with-stats-collection ("fetch" :section :pre) + (expand (fetch-file ,fixed-conn))))) (progn ,(sql-code-block pg-db-conn :pre before "before load") @@ -144,10 +144,10 @@ :columns ',columns :skip-lines ,(or (getf options :skip-line) 0)))) - (pgloader.sources:copy-from source - :truncate truncate - :drop-indexes drop-indexes - :disable-triggers disable-triggers)) + (pgloader.sources:copy-database source + :truncate truncate + :drop-indexes drop-indexes + :disable-triggers disable-triggers)) ,(sql-code-block pg-db-conn :post after "after load"))))) diff --git a/src/sources/common/api.lisp b/src/sources/common/api.lisp index e314658..9640952 100644 --- a/src/sources/common/api.lisp +++ b/src/sources/common/api.lisp @@ -41,9 +41,13 @@ (defgeneric copy-to-queue (source queue) (:documentation - "Load data from SOURCE and queue each row into QUEUE. Typicall + "Load data from SOURCE and queue each row into QUEUE. Typical implementation will directly use pgloader.queue:map-push-queue.")) +(defgeneric copy-column-list (source) + (:documentation + "Return the list of column names for the data sent in the queue.")) + (defgeneric copy-from (source &key truncate) (:documentation "Load data from SOURCE into its target as defined by the SOURCE object.")) @@ -97,3 +101,36 @@ ;; (defgeneric fetch-metadata (connection &key) ;; (:documentation "Full discovery of the CONNECTION data source.")) + +;;; +;;; Class hierarchy allowing to share features among a subcategory of +;;; pgloader sources. Those subcategory are divided in about the same set as +;;; the connection types. +;;; +;;; fd-connection: single file reader, copy +;;; md-connection: multiple file reader, md-copy +;;; db-connection: database connection reader, with introspection, db-copy +;;; +;;; Of those only md-copy objects share a lot in common, so we have another +;;; layer of protocols just for them here, and the shared implementation +;;; lives in md-methods.lisp in this directory. +;;; + +(defclass md-copy (copy) + ((encoding :accessor encoding ; file encoding + :initarg :encoding) ; + (skip-lines :accessor skip-lines ; skip firt N lines + :initarg :skip-lines ; + :initform 0) ; + (header :accessor header ; CSV headers are col names + :initarg :header ; + :initform nil)) ; + (:documentation "pgloader Multiple Files Data Source (csv, fixed, copy).")) + +(defgeneric parse-header (md-copy header) + (:documentation "Parse the file header and return a list of fields.")) + +(defgeneric process-rows (md-copy stream process-fn) + (:documentation "Process rows from a given input stream.")) + + diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp new file mode 100644 index 0000000..aec8974 --- /dev/null +++ b/src/sources/common/md-methods.lisp @@ -0,0 +1,82 @@ +;;; +;;; Generic API for pgloader sources +;;; Methods for source types with multiple files input +;;; + +(in-package :pgloader.sources) + +(defmethod parse-header ((copy md-copy) header) + "Unsupported by default, to be implemented in each md-copy subclass." + (error "Parsing the header of a ~s is not implemented yet." (type-of copy))) + +(defmethod map-rows ((copy md-copy) &key process-row-fn) + "Load data from a text file in CSV format, with support for advanced + projecting capabilities. See `project-fields' for details. + + Each row is pre-processed then PROCESS-ROW-FN is called with the row as a + list as its only parameter. + + Finally returns how many rows where read and processed." + + (with-connection (cnx (source copy)) + (loop :for input := (open-next-stream cnx + :direction :input + :external-format (encoding copy) + :if-does-not-exist nil) + :while input + :do (progn + ;; we handle skipping more than one line here, as cl-copy only knows + ;; about skipping the first line + (loop repeat (skip-lines copy) do (read-line input nil nil)) + + ;; we might now have to read the fields from the header line + (when (header copy) + (setf (fields copy) + (parse-header copy (read-line input nil nil))) + + (log-message :debug "Parsed header columns ~s" (fields copy))) + + ;; read in the text file, split it into columns, process NULL + ;; columns the way postmodern expects them, and call + ;; PROCESS-ROW-FN on them + (let ((reformat-then-process + (reformat-then-process :fields (fields copy) + :columns (columns copy) + :target (target copy) + :process-row-fn process-row-fn))) + (process-rows copy input reformat-then-process)))))) + +(defmethod copy-column-list ((copy md-copy)) + "We did reformat-then-process the column list, so we now send them in the + COPY buffer as found in (columns fixed)." + (mapcar (lambda (col) + ;; always double quote column names + (format nil "~s" (car col))) + (columns copy))) + +(defmethod copy-database ((copy md-copy) + &key + truncate + disable-triggers + drop-indexes + + ;; generic API, but ignored here + data-only + schema-only + create-tables + include-drop + create-indexes + reset-sequences) + "Copy the contents of the COPY formated file to PostgreSQL." + (declare (ignore data-only schema-only + create-tables include-drop + create-indexes reset-sequences)) + (let ((indexes (maybe-drop-indexes (target-db copy) + (target copy) + :drop-indexes drop-indexes))) + (copy-from copy + :truncate truncate + :disable-triggers disable-triggers) + + ;; re-create the indexes + (create-indexes-again (target-db copy) indexes :drop-indexes drop-indexes))) diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp new file mode 100644 index 0000000..ed0929e --- /dev/null +++ b/src/sources/common/methods.lisp @@ -0,0 +1,61 @@ +;;; +;;; Generic API for pgloader sources +;;; +(in-package :pgloader.sources) + +;;; +;;; Common API implementation +;;; +(defmethod copy-to-queue ((copy copy) queue) + "Copy data from given COPY definition into lparallel.queue QUEUE" + (pgloader.queue:map-push-queue copy queue)) + + +(defmethod copy-column-list ((copy copy)) + "Default column list is an empty list." + nil) + +(defmethod copy-to ((copy copy) pgsql-copy-filename) + "Extract data from COPY file into a PotgreSQL COPY TEXT formated file" + (with-open-file (text-file pgsql-copy-filename + :direction :output + :if-exists :supersede + :external-format :utf-8) + (let ((row-fn (lambda (row) + (format-vector-row text-file row (transforms copy))))) + (map-rows copy :process-row-fn row-fn)))) + +(defmethod copy-from ((copy copy) + &key + (kernel nil k-s-p) + truncate + disable-triggers) + "Copy data from COPY source into PostgreSQL." + (let* ((lp:*kernel* (or kernel (make-kernel 2))) + (channel (lp:make-channel)) + (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) + (table-name (format-table-name (target copy)))) + + (with-stats-collection ((target copy) :dbname (db-name (target-db copy))) + (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) + (log-message :notice "COPY ~s" table-name) + + ;; start a tast to read data from the source into the queue + (lp:submit-task channel #'copy-to-queue copy queue) + + ;; and start another task to push that data from the queue into + ;; PostgreSQL + (lp:submit-task channel + #'pgloader.pgsql:copy-from-queue + (target-db copy) + (target copy) + queue + :columns (copy-column-list copy) + :truncate truncate + :disable-triggers disable-triggers) + + ;; now wait until both the tasks are over, and kill the kernel + (loop :for tasks :below 2 :do (lp:receive-result channel) + :finally + (log-message :info "COPY ~s done." table-name) + (unless k-s-p (lp:end-kernel))))))) diff --git a/src/sources/copy.lisp b/src/sources/copy.lisp index 8a9d229..836e52c 100644 --- a/src/sources/copy.lisp +++ b/src/sources/copy.lisp @@ -9,7 +9,7 @@ "Assign the type slot to sqlite." (setf (slot-value copy 'type) "copy")) -(defclass copy-copy (copy) +(defclass copy-copy (md-copy) ((encoding :accessor encoding ; file encoding :initarg :encoding) ; (skip-lines :accessor skip-lines ; we might want to skip COPY lines @@ -56,85 +56,24 @@ ;; see format-row-for-copy for details (sq:split-sequence delimiter line))) -(defmethod map-rows ((copy copy-copy) &key process-row-fn) - "Load data from a text file in Copy Columns format. - - Each row is pre-processed then PROCESS-ROW-FN is called with the row as a - list as its only parameter. - - Returns how many rows were read and processed." - (with-connection (cnx (source copy)) - (loop :for input := (open-next-stream cnx - :direction :input - :external-format (encoding copy) - :if-does-not-exist nil) - :while input - :do (progn - ;; ignore as much as skip-lines lines in the file - (loop repeat (skip-lines copy) do (read-line input nil nil)) - - ;; read in the text file, split it into columns, process NULL - ;; columns the way postmodern expects them, and call - ;; PROCESS-ROW-FN on them - (let ((reformat-then-process - (reformat-then-process :fields (fields copy) - :columns (columns copy) - :target (target copy) - :process-row-fn process-row-fn))) - (loop - :with fun := reformat-then-process - :for line := (read-line input nil nil) - :counting line :into read - :while line - :do (handler-case - (funcall fun (parse-row line - :delimiter (delimiter copy) - :null-as (null-as copy))) - (condition (e) - (progn - (log-message :error "~a" e) - (update-stats :data (target copy) :errs 1)))))))))) +(defmethod process-rows ((copy copy-copy) stream process-fn) + "Process rows from STREAM according to COPY specifications and PROCESS-FN." + (loop + :with fun := process-fn + :for line := (read-line stream nil nil) + :counting line :into read + :while line + :do (handler-case + (funcall fun (parse-row line + :delimiter (delimiter copy) + :null-as (null-as copy))) + (condition (e) + (progn + (log-message :error "~a" e) + (update-stats :data (target copy) :errs 1)))))) (defmethod copy-to-queue ((copy copy-copy) queue) "Copy data from given COPY definition into lparallel.queue DATAQ" (pgloader.queue:map-push-queue copy queue 'pre-formatted)) -(defmethod copy-from ((copy copy-copy) - &key - truncate - disable-triggers - drop-indexes) - "Copy data from given COPY file definition into its PostgreSQL target table." - (let* ((lp:*kernel* (make-kernel 2)) - (channel (lp:make-channel)) - (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) - (indexes (maybe-drop-indexes (target-db copy) - (target copy) - :drop-indexes drop-indexes))) - - (with-stats-collection ((target copy) :dbname (db-name (target-db copy))) - (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) - (log-message :notice "COPY ~a" (target copy)) - (lp:submit-task channel #'copy-to-queue copy queue) - - ;; and start another task to push that data from the queue to PostgreSQL - (lp:submit-task channel - ;; this function update :rows stats - #'pgloader.pgsql:copy-from-queue - (target-db copy) (target copy) queue - ;; we only are interested into the column names here - :columns (mapcar (lambda (col) - ;; always double quote column names - (format nil "~s" (car col))) - (columns copy)) - :truncate truncate - :disable-triggers disable-triggers) - - ;; now wait until both the tasks are over - (loop for tasks below 2 do (lp:receive-result channel) - finally (lp:end-kernel)))) - - ;; re-create the indexes - (create-indexes-again (target-db copy) indexes - :drop-indexes drop-indexes))) diff --git a/src/sources/csv/csv.lisp b/src/sources/csv/csv.lisp index 6d20660..a2345c8 100644 --- a/src/sources/csv/csv.lisp +++ b/src/sources/csv/csv.lisp @@ -13,17 +13,9 @@ ;;; ;;; Implementing the pgloader source API ;;; -(defclass copy-csv (copy) +(defclass copy-csv (md-copy) ((source-type :accessor source-type ; one of :inline, :stdin, :regex :initarg :source-type) ; or :filename - (encoding :accessor encoding ; file encoding - :initarg :encoding) ; - (csv-header :accessor csv-header ; CSV headers are col names - :initarg :csv-header - :initform nil) ; - (skip-lines :accessor skip-lines ; CSV skip firt N lines - :initarg :skip-lines ; - :initform 0) ; (separator :accessor csv-separator ; CSV separator :initarg :separator ; :initform #\Tab) ; @@ -59,7 +51,7 @@ ;;; ;;; Read a file format in CSV format, and call given function on each line. ;;; -(defun parse-csv-header (csv header) +(defmethod parse-header ((csv copy-csv) header) "Parse the header line given csv setup." ;; a field entry is a list of field name and options (mapcar #'list @@ -73,99 +65,25 @@ :trim-outer-whitespace (csv-trim-blanks csv) :newline (csv-newline csv))))) -(defmethod map-rows ((csv copy-csv) &key process-row-fn) - "Load data from a text file in CSV format, with support for advanced - projecting capabilities. See `project-fields' for details. +(defmethod process-rows ((csv copy-csv) stream process-fn) + "Process rows from STREAM according to COPY specifications and PROCESS-FN." + (handler-case + (handler-bind ((cl-csv:csv-parse-error + #'(lambda (c) + (log-message :error "~a" c) + (cl-csv::continue)))) + (cl-csv:read-csv stream + :row-fn process-fn + :separator (csv-separator csv) + :quote (csv-quote csv) + :escape (csv-escape csv) + :escape-mode (csv-escape-mode csv) + :unquoted-empty-string-is-nil t + :quoted-empty-string-is-nil nil + :trim-outer-whitespace (csv-trim-blanks csv) + :newline (csv-newline csv))) + (condition (e) + (progn + (log-message :error "~a" e) + (update-stats :data (target csv) :errs 1))))) - Each row is pre-processed then PROCESS-ROW-FN is called with the row as a - list as its only parameter. - - Finally returns how many rows where read and processed." - - (with-connection (cnx (source csv)) - (loop :for input := (open-next-stream cnx - :direction :input - :external-format (encoding csv) - :if-does-not-exist nil) - :while input - :do (progn - ;; we handle skipping more than one line here, as cl-csv only knows - ;; about skipping the first line - (loop repeat (skip-lines csv) do (read-line input nil nil)) - - ;; we might now have to read the CSV fields from the header line - (when (csv-header csv) - (setf (fields csv) - (parse-csv-header csv (read-line input nil nil))) - - (log-message :debug "Parsed header columns ~s" (fields csv))) - - ;; read in the text file, split it into columns, process NULL - ;; columns the way postmodern expects them, and call - ;; PROCESS-ROW-FN on them - (let ((reformat-then-process - (reformat-then-process :fields (fields csv) - :columns (columns csv) - :target (target csv) - :process-row-fn process-row-fn))) - (handler-case - (handler-bind ((cl-csv:csv-parse-error - #'(lambda (c) - (log-message :error "~a" c) - (cl-csv::continue)))) - (cl-csv:read-csv input - :row-fn (compile nil reformat-then-process) - :separator (csv-separator csv) - :quote (csv-quote csv) - :escape (csv-escape csv) - :escape-mode (csv-escape-mode csv) - :unquoted-empty-string-is-nil t - :quoted-empty-string-is-nil nil - :trim-outer-whitespace (csv-trim-blanks csv) - :newline (csv-newline csv))) - (condition (e) - (progn - (log-message :error "~a" e) - (update-stats :data (target csv) :errs 1))))))))) - -(defmethod copy-to-queue ((csv copy-csv) queue) - "Copy data from given CSV definition into lparallel.queue DATAQ" - (map-push-queue csv queue)) - -(defmethod copy-from ((csv copy-csv) - &key - truncate - disable-triggers - drop-indexes) - "Copy data from given CSV file definition into its PostgreSQL target table." - (let* ((lp:*kernel* (make-kernel 2)) - (channel (lp:make-channel)) - (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) - (indexes (maybe-drop-indexes (target-db csv) - (target csv) - :drop-indexes drop-indexes))) - - (with-stats-collection ((target csv) :dbname (db-name (target-db csv))) - (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error)) - (log-message :notice "COPY ~a" (target csv)) - (lp:submit-task channel #'copy-to-queue csv queue) - - ;; and start another task to push that data from the queue to PostgreSQL - (lp:submit-task channel - ;; this function update :rows stats - #'pgloader.pgsql:copy-from-queue - (target-db csv) (target csv) queue - ;; we only are interested into the column names here - :columns (mapcar (lambda (col) - ;; always double quote column names - (format nil "~s" (car col))) - (columns csv)) - :truncate truncate - :disable-triggers disable-triggers) - - ;; now wait until both the tasks are over - (loop for tasks below 2 do (lp:receive-result channel) - finally (lp:end-kernel)))) - - ;; re-create the indexes - (create-indexes-again (target-db csv) indexes :drop-indexes drop-indexes))) diff --git a/src/sources/db3/db3.lisp b/src/sources/db3/db3.lisp index 36d67b6..621c981 100644 --- a/src/sources/db3/db3.lisp +++ b/src/sources/db3/db3.lisp @@ -45,46 +45,6 @@ :do (funcall process-row-fn row-array) :finally (return count))))) -(defmethod copy-to ((db3 copy-db3) pgsql-copy-filename) - "Extract data from DB3 file into a PotgreSQL COPY TEXT formated file" - (with-open-file (text-file pgsql-copy-filename - :direction :output - :if-exists :supersede - :external-format :utf-8) - (let ((transforms (list-transforms (source db3)))) - (map-rows db3 - :process-row-fn - (lambda (row) - (format-vector-row text-file row transforms)))))) - -(defmethod copy-to-queue ((db3 copy-db3) queue) - "Copy data from DB3 file FILENAME into queue DATAQ" - (pgloader.queue:map-push-queue db3 queue)) - -(defmethod copy-from ((db3 copy-db3) - &key (kernel nil k-s-p) truncate disable-triggers) - (let* ((lp:*kernel* (or kernel (make-kernel 2))) - (channel (lp:make-channel)) - (queue (lq:make-queue :fixed-capacity *concurrent-batches*))) - - (with-stats-collection ((target db3) :dbname (db-name (target-db db3))) - (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) - (log-message :notice "COPY \"~a\" from '~a'" (target db3) (source db3)) - (lp:submit-task channel #'copy-to-queue db3 queue) - - ;; and start another task to push that data from the queue to PostgreSQL - (lp:submit-task channel - #'pgloader.pgsql:copy-from-queue - (target-db db3) (target db3) queue - :truncate truncate - :disable-triggers disable-triggers) - - ;; now wait until both the tasks are over, and kill the kernel - (loop for tasks below 2 do (lp:receive-result channel) - finally - (log-message :info "COPY \"~a\" done." (target db3)) - (unless k-s-p (lp:end-kernel))))))) - (defmethod copy-database ((db3 copy-db3) &key table-name diff --git a/src/sources/fixed.lisp b/src/sources/fixed.lisp index 5b116dc..fd7a4d5 100644 --- a/src/sources/fixed.lisp +++ b/src/sources/fixed.lisp @@ -10,7 +10,7 @@ "Assign the type slot to sqlite." (setf (slot-value fixed 'type) "fixed")) -(defclass copy-fixed (copy) +(defclass copy-fixed (md-copy) ((encoding :accessor encoding ; file encoding :initarg :encoding) ; (skip-lines :accessor skip-lines ; CSV headers @@ -44,82 +44,17 @@ (when (<= start len) (subseq line start (min len end))))))) -(defmethod map-rows ((fixed copy-fixed) &key process-row-fn) - "Load data from a text file in Fixed Columns format. - - Each row is pre-processed then PROCESS-ROW-FN is called with the row as a - list as its only parameter. - - Returns how many rows where read and processed." - (with-connection (cnx (source fixed)) - (loop :for input := (open-next-stream cnx - :direction :input - :external-format (encoding fixed) - :if-does-not-exist nil) - :while input - :do (progn ;; ignore as much as skip-lines lines in the file - (loop repeat (skip-lines fixed) do (read-line input nil nil)) - - ;; read in the text file, split it into columns, process NULL - ;; columns the way postmodern expects them, and call - ;; PROCESS-ROW-FN on them - (let ((reformat-then-process - (reformat-then-process :fields (fields fixed) - :columns (columns fixed) - :target (target fixed) - :process-row-fn process-row-fn))) - (loop - :with fun := (compile nil reformat-then-process) - :with fixed-cols-specs := (mapcar #'cdr (fields fixed)) - :for line := (read-line input nil nil) - :counting line :into read - :while line - :do (handler-case - (funcall fun (parse-row fixed-cols-specs line)) - (condition (e) - (progn - (log-message :error "~a" e) - (update-stats :data (target fixed) :errs 1)))))))))) - -(defmethod copy-to-queue ((fixed copy-fixed) queue) - "Copy data from given FIXED definition into lparallel.queue DATAQ" - (pgloader.queue:map-push-queue fixed queue)) - -(defmethod copy-from ((fixed copy-fixed) - &key - truncate - disable-triggers - drop-indexes) - "Copy data from given FIXED file definition into its PostgreSQL target table." - (let* ((lp:*kernel* (make-kernel 2)) - (channel (lp:make-channel)) - (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) - (indexes (maybe-drop-indexes (target-db fixed) - (target fixed) - :drop-indexes drop-indexes))) - - (with-stats-collection ((target fixed) :dbname (db-name (target-db fixed))) - (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error)) - (log-message :notice "COPY ~a" (target fixed)) - (lp:submit-task channel #'copy-to-queue fixed queue) - - ;; and start another task to push that data from the queue to PostgreSQL - (lp:submit-task channel - ;; this function update :rows stats - #'pgloader.pgsql:copy-from-queue - (target-db fixed) (target fixed) queue - ;; we only are interested into the column names here - :columns (mapcar (lambda (col) - ;; always double quote column names - (format nil "~s" (car col))) - (columns fixed)) - :truncate truncate - :disable-triggers disable-triggers) - - ;; now wait until both the tasks are over - (loop for tasks below 2 do (lp:receive-result channel) - finally (lp:end-kernel)))) - - ;; re-create the indexes - (create-indexes-again (target-db fixed) indexes :drop-indexes drop-indexes))) - +(defmethod process-rows ((fixed copy-fixed) stream process-fn) + "Process rows from STREAM according to COPY specifications and PROCESS-FN." + (loop + :with fun := process-fn + :with fixed-cols-specs := (mapcar #'cdr (fields fixed)) + :for line := (read-line stream nil nil) + :counting line :into read + :while line + :do (handler-case + (funcall fun (parse-row fixed-cols-specs line)) + (condition (e) + (progn + (log-message :error "~a" e) + (update-stats :data (target fixed) :errs 1)))))) diff --git a/src/sources/ixf/ixf.lisp b/src/sources/ixf/ixf.lisp index da833c5..ccdb044 100644 --- a/src/sources/ixf/ixf.lisp +++ b/src/sources/ixf/ixf.lisp @@ -73,34 +73,6 @@ (ixf:read-headers ixf) (ixf:map-data ixf process-row-fn))))) -(defmethod copy-to-queue ((ixf copy-ixf) queue) - "Copy data from IXF file FILENAME into queue DATAQ" - (pgloader.queue:map-push-queue ixf queue)) - -(defmethod copy-from ((ixf copy-ixf) - &key (kernel nil k-s-p) truncate disable-triggers) - (let* ((lp:*kernel* (or kernel (make-kernel 2))) - (channel (lp:make-channel)) - (queue (lq:make-queue :fixed-capacity *concurrent-batches*))) - - (with-stats-collection ((target ixf) :dbname (db-name (target-db ixf))) - (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) - (log-message :notice "COPY \"~a\" from '~a'" (target ixf) (source ixf)) - (lp:submit-task channel #'copy-to-queue ixf queue) - - ;; and start another task to push that data from the queue to PostgreSQL - (lp:submit-task channel - #'pgloader.pgsql:copy-from-queue - (target-db ixf) (target ixf) queue - :truncate truncate - :disable-triggers disable-triggers) - - ;; now wait until both the tasks are over, and kill the kernel - (loop for tasks below 2 do (lp:receive-result channel) - finally - (log-message :info "COPY \"~a\" done." (target ixf)) - (unless k-s-p (lp:end-kernel))))))) - (defmethod copy-database ((ixf copy-ixf) &key table-name diff --git a/src/sources/mssql/mssql.lisp b/src/sources/mssql/mssql.lisp index eb9784c..b22afda 100644 --- a/src/sources/mssql/mssql.lisp +++ b/src/sources/mssql/mssql.lisp @@ -51,41 +51,6 @@ (log-message :error "~a" e) (update-stats :data (target mssql) :errs 1))))))) -(defmethod copy-to-queue ((mssql copy-mssql) queue) - "Copy data from MSSQL table DBNAME.TABLE-NAME into queue DATAQ" - (map-push-queue mssql queue)) - -(defmethod copy-from ((mssql copy-mssql) - &key (kernel nil k-s-p) truncate disable-triggers) - "Connect in parallel to MSSQL and PostgreSQL and stream the data." - (let* ((lp:*kernel* (or kernel (make-kernel 2))) - (channel (lp:make-channel)) - (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) - (table-name (target mssql))) - - ;; we account stats against the target table-name, because that's all we - ;; know on the PostgreSQL thread - (with-stats-collection (table-name :dbname (db-name (target-db mssql))) - (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) - (log-message :notice "COPY ~a" table-name) - ;; read data from Mssql - (lp:submit-task channel #'copy-to-queue mssql queue) - - ;; and start another task to push that data from the queue to PostgreSQL - (lp:submit-task channel #'pgloader.pgsql:copy-from-queue - (target-db mssql) (target mssql) queue - :truncate truncate - :disable-triggers disable-triggers) - - ;; now wait until both the tasks are over - (loop for tasks below 2 do (lp:receive-result channel) - finally - (log-message :info "COPY ~a done." table-name) - (unless k-s-p (lp:end-kernel))))) - - ;; return the copy-mssql object we just did the COPY for - mssql)) - (defun complete-pgsql-database (pgconn all-columns all-fkeys pkeys &key data-only diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 0966d1f..7a33d73 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -71,67 +71,11 @@ (invoke-restart 'qmynd-impl::use-nil)))) (mysql-query sql :row-fn process-row-fn :result-type 'vector)))))) -;;; -;;; Use map-rows and pgsql-text-copy-format to fill in a CSV file on disk -;;; with MySQL data in there. -;;; -(defmethod copy-to ((mysql copy-mysql) filename) - "Extract data from MySQL in PostgreSQL COPY TEXT format" - (with-open-file (text-file filename - :direction :output - :if-exists :supersede - :external-format :utf-8) - (map-rows mysql - :process-row-fn - (lambda (row) - (format-vector-row text-file row (transforms mysql)))))) - -;;; -;;; Export MySQL data to our lparallel data queue. All the work is done in -;;; other basic layers, simple enough function. -;;; -(defmethod copy-to-queue ((mysql copy-mysql) queue) - "Copy data from MySQL table DBNAME.TABLE-NAME into queue DATAQ" - (map-push-queue mysql queue)) -;;; -;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY -;;; protocol -;;; -(defmethod copy-from ((mysql copy-mysql) - &key (kernel nil k-s-p) truncate disable-triggers) - "Connect in parallel to MySQL and PostgreSQL and stream the data." - (let* ((lp:*kernel* (or kernel (make-kernel 2))) - (channel (lp:make-channel)) - (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) - (table-name (target mysql))) - - ;; we account stats against the target table-name, because that's all we - ;; know on the PostgreSQL thread - (with-stats-collection (table-name :dbname (db-name (target-db mysql))) - (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) - (log-message :notice "COPY ~a" table-name) - ;; read data from MySQL - (lp:submit-task channel #'copy-to-queue mysql queue) - - ;; and start another task to push that data from the queue to PostgreSQL - (lp:submit-task channel #'pgloader.pgsql:copy-from-queue - (target-db mysql) (target mysql) queue - :columns (mapcar #'apply-identifier-case - (mapcar #'mysql-column-name - (fields mysql))) - :truncate truncate - :disable-triggers disable-triggers) - - ;; now wait until both the tasks are over - (loop for tasks below 2 do (lp:receive-result channel) - finally - (log-message :info "COPY ~a done." table-name) - (unless k-s-p (lp:end-kernel))))) - - ;; return the copy-mysql object we just did the COPY for - mysql)) +(defmethod copy-column-list ((mysql copy-mysql)) + "We are sending the data in the MySQL columns ordering here." + (mapcar #'apply-identifier-case (mapcar #'mysql-column-name (fields mysql)))) ;;; diff --git a/src/sources/sqlite/sqlite.lisp b/src/sources/sqlite/sqlite.lisp index 9107c76..bded369 100644 --- a/src/sources/sqlite/sqlite.lisp +++ b/src/sources/sqlite/sqlite.lisp @@ -113,37 +113,6 @@ (log-message :error "~a" e) (update-stats :data (target sqlite) :errs 1))))))) - -(defmethod copy-to-queue ((sqlite copy-sqlite) queue) - "Copy data from SQLite table TABLE-NAME within connection DB into queue DATAQ" - (map-push-queue sqlite queue)) - -(defmethod copy-from ((sqlite copy-sqlite) - &key (kernel nil k-s-p) truncate disable-triggers) - "Stream the contents from a SQLite database table down to PostgreSQL." - (let* ((lp:*kernel* (or kernel (make-kernel 2))) - (channel (lp:make-channel)) - (queue (lq:make-queue :fixed-capacity *concurrent-batches*))) - - (with-stats-collection ((target sqlite) :dbname (db-name (target-db sqlite))) - (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) - (log-message :notice "COPY ~a" (target sqlite)) - ;; read data from SQLite - (lp:submit-task channel #'copy-to-queue sqlite queue) - - ;; and start another task to push that data from the queue to PostgreSQL - (lp:submit-task channel - #'pgloader.pgsql:copy-from-queue - (target-db sqlite) (target sqlite) queue - :truncate truncate - :disable-triggers disable-triggers) - - ;; now wait until both the tasks are over - (loop for tasks below 2 do (lp:receive-result channel) - finally - (log-message :info "COPY ~a done." (target sqlite)) - (unless k-s-p (lp:end-kernel))))))) - (defun fetch-sqlite-metadata (sqlite &key including excluding) "SQLite introspection to prepare the migration." (let (all-columns all-indexes)