diff --git a/pgloader.asd b/pgloader.asd index b0ae530..9c0778b 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -110,11 +110,11 @@ "utils") :components ((:module "common" + :serial t :components ((:file "api") - (:file "methods" :depends-on ("api")) - (:file "md-methods" :depends-on ("api")) - (:file "db-methods" :depends-on ("api")) + (:file "methods") + (:file "md-methods") (:file "casting-rules") (:file "files-and-pathnames") (:file "project-fields"))) @@ -199,6 +199,19 @@ (:file "copy-retry-batch") (:file "copy-from-queue"))) + (:module "load" + :depends-on ("params" + "package" + "utils" + "pgsql" + "sources") + :serial t + :components + ((:file "api") + (:file "copy-data") + (:file "load-file") + (:file "migrate-database"))) + (:module "parsers" :depends-on ("params" "package" diff --git a/src/load/api.lisp b/src/load/api.lisp new file mode 100644 index 0000000..69769c8 --- /dev/null +++ b/src/load/api.lisp @@ -0,0 +1,71 @@ +;;; +;;; Generic API for pgloader data loading and database migrations. +;;; +(in-package :pgloader.load) + +(defgeneric copy-from (source &key) + (:documentation + "Load data from SOURCE into its target as defined by the SOURCE object.")) + +;; That one is more an export than a load. 
It always exports to a single very +;; well defined format, the importing utility is defined in +;; src/pgsql-copy-format.lisp + +(defgeneric copy-to (source filename) + (:documentation + "Load data from SOURCE and serialize it into FILENAME, using PostgreSQL + COPY TEXT format.")) + +;; The next generic function is only to get instantiated for sources +;; actually containing more than a single source item (tables, collections, +;; etc) + +(defgeneric copy-database (source + &key + worker-count + concurrency + max-parallel-create-index + truncate + data-only + schema-only + create-tables + include-drop + foreign-keys + create-indexes + reset-sequences + disable-triggers + materialize-views + set-table-oids + including + excluding) + (:documentation + "Auto-discover source schema, convert it to PostgreSQL, migrate the data + from the source definition to PostgreSQL for all the discovered + items (tables, collections, etc), then reset the PostgreSQL sequences + created by SERIAL columns in the first step. 
+ + The target tables are automatically discovered, the only-tables + parameter allows to filter them out.")) + + + +(defgeneric prepare-pgsql-database (db-copy catalog + &key + truncate + create-tables + create-schemas + drop-indexes + set-table-oids + materialize-views + foreign-keys + include-drop) + (:documentation "Prepare the target PostgreSQL database.")) + +(defgeneric complete-pgsql-database (db-copy catalog pkeys + &key + foreign-keys + create-indexes + create-triggers + reset-sequences) + (:documentation "Alter load duties for database sources copy support.")) + diff --git a/src/load/copy-data.lisp b/src/load/copy-data.lisp new file mode 100644 index 0000000..67a68d6 --- /dev/null +++ b/src/load/copy-data.lisp @@ -0,0 +1,149 @@ +;;; +;;; Generic API for pgloader sources +;;; +(in-package :pgloader.load) + +;;; +;;; Common API implementation +;;; +(defmethod queue-raw-data ((copy copy) rawq concurrency) + "Stream data as read by the map-queue method on the COPY argument into QUEUE, + as given." 
+ (log-message :debug "Reader started for ~a" (format-table-name (target copy))) + (let* ((start-time (get-internal-real-time)) + (row-count 0) + (process-row + (if (or (eq :data *log-min-messages*) + (eq :data *client-min-messages*)) + ;; when debugging, use a lambda with debug traces + (lambda (row) + (log-message :data "< ~s" row) + (lq:push-queue row rawq) + (incf row-count)) + + ;; usual non-debug case + (lambda (row) + (lq:push-queue row rawq) + (incf row-count))))) + + ;; signal we are starting + (update-stats :data (target copy) :start start-time) + + ;; call the source-specific method for reading input data + (map-rows copy :process-row-fn process-row) + + ;; process last batches and send them to queues + ;; and mark end of stream + (loop :repeat concurrency :do (lq:push-queue :end-of-data rawq)) + + (let ((seconds (elapsed-time-since start-time))) + (log-message :debug "Reader for ~a is done in ~6$s" + (format-table-name (target copy)) seconds) + (update-stats :data (target copy) :read row-count :rs seconds) + (list :reader (target copy) seconds)))) + + +(defmethod copy-to ((copy copy) pgsql-copy-filename) + "Extract data from COPY file into a PotgreSQL COPY TEXT formated file" + (with-open-file (text-file pgsql-copy-filename + :direction :output + :if-exists :supersede + :external-format :utf-8) + (let ((row-fn (lambda (row) + (format-vector-row text-file row (transforms copy))))) + (map-rows copy :process-row-fn row-fn)))) + +(defmethod copy-from ((copy copy) + &key + (kernel nil k-s-p) + (channel nil c-s-p) + (worker-count 8) + (concurrency 2) + (multiple-readers nil) + (on-error-stop *on-error-stop*) + disable-triggers) + "Copy data from COPY source into PostgreSQL." 
+ (let* ((table-name (format-table-name (target copy))) + (lp:*kernel* (or kernel (make-kernel worker-count))) + (channel (or channel (lp:make-channel))) + (readers nil) + (task-count 0)) + + (flet ((submit-task (channel function &rest args) + (apply #'lp:submit-task channel function args) + (incf task-count))) + + (lp:task-handler-bind + ((copy-init-error + #'(lambda (condition) + ;; everything has been handled already + (lp:invoke-transfer-error condition))) + (on-error-stop + #'(lambda (condition) + ;; everything has been handled already + (lp:invoke-transfer-error condition))) + #+pgloader-image + (error + #'(lambda (condition) + (log-message :error "A thread failed with error: ~a" condition) + (log-message :error "~a" + (trivial-backtrace:print-backtrace condition + :output nil)) + (lp::invoke-transfer-error condition)))) + (log-message :notice "COPY ~a" table-name) + + ;; Check for Read Concurrency Support from our source + (when (and multiple-readers (< 1 concurrency)) + (let ((label "Check Concurrency Support")) + (with-stats-collection (label :section :pre) + (setf readers (concurrency-support copy concurrency)) + (update-stats :pre label :read 1 :rows (if readers 1 0)) + (when readers + (log-message :notice "Multiple Readers Enabled for ~a" + (format-table-name (target copy))))))) + + ;; when reader is non-nil, we have reader concurrency support! 
+ (if readers + ;; here we have detected Concurrency Support: we create as many + ;; readers as writers and create associated couples, each couple + ;; shares its own queue + (let ((rawqs + (loop :repeat concurrency :collect + (lq:make-queue :fixed-capacity *prefetch-rows*)))) + (log-message :info "Read Concurrency Enabled for ~s" + (format-table-name (target copy))) + + (loop :for rawq :in rawqs :for reader :in readers :do + ;; each reader pretends to be alone, pass 1 as concurrency + (submit-task channel #'queue-raw-data reader rawq 1) + + (submit-task channel #'copy-rows-from-queue + copy rawq + :on-error-stop on-error-stop + :disable-triggers disable-triggers))) + + ;; no Read Concurrency Support detected, start a single reader + ;; task, using a single data queue that is read by multiple + ;; writers. + (let ((rawq + (lq:make-queue :fixed-capacity *prefetch-rows*))) + (submit-task channel #'queue-raw-data copy rawq concurrency) + + ;; start a task to transform the raw data in the copy format + ;; and send that data down to PostgreSQL + (loop :repeat concurrency :do + (submit-task channel #'copy-rows-from-queue + copy rawq + :on-error-stop on-error-stop + :disable-triggers disable-triggers)))) + + ;; now wait until both the tasks are over, and kill the kernel + (unless c-s-p + (log-message :debug "waiting for ~d tasks" task-count) + (loop :repeat task-count :do (lp:receive-result channel)) + (log-message :notice "COPY ~s done." table-name) + (unless k-s-p (lp:end-kernel :wait t))) + + ;; return task-count, which is how many tasks we submitted to our + ;; lparallel kernel. 
+ task-count)))) diff --git a/src/load/load-file.lisp b/src/load/load-file.lisp new file mode 100644 index 0000000..19819e5 --- /dev/null +++ b/src/load/load-file.lisp @@ -0,0 +1,128 @@ +;;; +;;; Generic API for pgloader sources +;;; Methods for source types with multiple files input +;;; + +(in-package :pgloader.load) + +(defmethod copy-database ((copy md-copy) + &key + (on-error-stop *on-error-stop*) + truncate + disable-triggers + drop-indexes + + max-parallel-create-index + + ;; generic API, but ignored here + (worker-count 4) + (concurrency 1) + + data-only + schema-only + create-tables + include-drop + foreign-keys + create-indexes + reset-sequences + materialize-views + set-table-oids + including + excluding) + "Copy the contents of the COPY formated file to PostgreSQL." + (declare (ignore data-only schema-only + create-tables include-drop foreign-keys + create-indexes reset-sequences materialize-views + set-table-oids including excluding)) + + (let* ((*on-error-stop* on-error-stop) + (pgconn (target-db copy)) + pgsql-catalog) + + (handler-case + (with-pgsql-connection (pgconn) + (setf pgsql-catalog + (fetch-pgsql-catalog (db-name pgconn) :table (target copy))) + + ;; if the user didn't tell us the column list of the table, now is + ;; a proper time to set it in the copy object + (unless (and (slot-boundp copy 'columns) + (slot-value copy 'columns)) + (setf (columns copy) + (mapcar (lambda (col) + ;; we need to handle the md-copy format for the + ;; column list, which allow for user given + ;; options: each column is a list which car is + ;; the column name. 
+ (list (column-name col))) + (table-field-list (first (table-list pgsql-catalog)))))) + + (log-message :data "CATALOG: ~s" pgsql-catalog) + + ;; this sets (table-index-list (target copy)) + (maybe-drop-indexes pgsql-catalog :drop-indexes drop-indexes) + + ;; now is the proper time to truncate, before parallel operations + (when truncate + (truncate-tables pgsql-catalog))) + + (cl-postgres:database-error (e) + (log-message :fatal "Failed to prepare target PostgreSQL table.") + (log-message :fatal "~a" e) + (return-from copy-database))) + + ;; Keep the PostgreSQL table target around in the copy instance, + ;; with the following subtleties to deal with: + ;; 1. the catalog fetching did fill-in PostgreSQL columns as fields + ;; 2. we might target fewer pg columns than the table actually has + (let ((table (first (table-list pgsql-catalog)))) + (setf (table-column-list table) + (loop :for column-name :in (mapcar #'first (columns copy)) + :collect (find column-name (table-field-list table) + :key #'column-name + :test #'string=))) + (setf (target copy) table)) + + ;; expand the specs of our source, we might have to care about several + ;; files actually. 
+ (let* ((lp:*kernel* (make-kernel worker-count)) + (channel (lp:make-channel)) + (path-list (expand-spec (source copy))) + (task-count 0)) + (with-stats-collection ("Files Processed" :section :post + :use-result-as-read t + :use-result-as-rows t) + (loop :for path-spec :in path-list + :count t + :do (let ((table-source (clone-copy-for copy path-spec))) + (incf task-count + (copy-from table-source + :concurrency concurrency + :kernel lp:*kernel* + :channel channel + :on-error-stop on-error-stop + :disable-triggers disable-triggers))))) + + ;; end kernel + (with-stats-collection ("COPY Threads Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (loop :repeat task-count + :do (handler-case + (destructuring-bind (task table seconds) + (lp:receive-result channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds)) + (condition (e) + (log-message :fatal "~a" e))) + :finally (progn + (lp:end-kernel :wait nil) + (return task-count)))) + (lp:end-kernel :wait t)) + + ;; re-create the indexes from the target table entry + (create-indexes-again (target-db copy) + pgsql-catalog + :max-parallel-create-index max-parallel-create-index + :drop-indexes drop-indexes))) diff --git a/src/sources/common/db-methods.lisp b/src/load/migrate-database.lisp similarity index 95% rename from src/sources/common/db-methods.lisp rename to src/load/migrate-database.lisp index f706ed7..d4ea090 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/load/migrate-database.lisp @@ -3,7 +3,7 @@ ;;; Methods for database source types (with introspection) ;;; -(in-package :pgloader.sources) +(in-package :pgloader.load) ;;; ;;; Prepare the PostgreSQL database before streaming the data into it. 
@@ -113,11 +113,6 @@ ;; log the catalog we just fetched and (maybe) merged (log-message :data "CATALOG: ~s" catalog)) -(defmethod cleanup ((copy db-copy) (catalog catalog) &key materialize-views) - "In case anything wrong happens at `prepare-pgsql-database' step, this - function will be called to clean-up the mess left behind, if any." - (declare (ignorable materialize-views)) - t) (defmethod complete-pgsql-database ((copy db-copy) (catalog catalog) @@ -209,19 +204,6 @@ :create-triggers create-triggers :reset-sequences reset-sequences)))) -(defmethod instanciate-table-copy-object ((copy db-copy) (table table)) - "Create an new instance for copying TABLE data." - (let* ((fields (table-field-list table)) - (columns (table-column-list table)) - (transforms (mapcar #'column-transform columns))) - (make-instance (class-of copy) - :source-db (clone-connection (source-db copy)) - :target-db (clone-connection (target-db copy)) - :source table - :target table - :fields fields - :columns columns - :transforms transforms))) (defun process-catalog (copy catalog &key alter-table alter-schema) "Do all the PostgreSQL catalog tweaking here: casts, index WHERE clause diff --git a/src/package.lisp b/src/package.lisp index aee98f5..a57400d 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -429,10 +429,12 @@ #:get-date-columns #:format-vector-row)) + +;;; +;;; pgloader Sources API and common helpers +;;; (defpackage #:pgloader.sources - (:use #:cl - #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.pgsql) + (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection) (:import-from #:pgloader.transforms #:precision #:scale @@ -445,6 +447,12 @@ #:md-copy #:db-copy + ;; main data access api + #:map-rows + #:copy-column-list + #:data-is-preformatted-p + #:preprocess-row + ;; accessors #:source-db #:target-db @@ -460,16 +468,6 @@ #:skip-lines #:header - ;; main protocol/api - #:concurrency-support - #:map-rows - #:copy-column-list - #:queue-raw-data - 
#:data-is-preformatted-p - #:copy-from - #:copy-to - #:copy-database - ;; md-copy protocol/api #:parse-header #:process-rows @@ -481,20 +479,18 @@ #:expand-spec #:clone-copy-for - ;; the db-methods - #:fetch-metadata - #:prepare-pgsql-database - #:cleanup - #:instanciate-table-copy-object - #:complete-pgsql-database - #:end-kernels - ;; file based utils for csv, fixed etc #:with-open-file-or-stream #:get-pathname #:project-fields #:reformat-then-process + ;; the db-methods + #:fetch-metadata + #:cleanup + #:instanciate-table-copy-object + #:concurrency-support + ;; database cast machinery #:*default-cast-rules* #:*cast-rules* @@ -505,15 +501,10 @@ ;;; ;;; COPY protocol related facilities ;;; -(defpackage #:pgloader.copy - (:use #:cl #:pgloader.params #:pgloader.utils +(defpackage #:pgloader.pgcopy + (:use #:cl + #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.pgsql #:pgloader.sources) - (:import-from #:pgloader.pgsql - #:with-pgsql-connection - #:with-schema - #:with-disabled-triggers - #:postgresql-unavailable - #:postgresql-retryable) (:import-from #:cl-postgres #:database-error-context) (:import-from #:cl-postgres-trivial-utf-8 @@ -521,9 +512,32 @@ #:as-utf-8-bytes #:string-to-utf-8-bytes) (:export #:copy-rows-from-queue - #:format-vector-row)) + #:format-vector-row + #:copy-init-error)) + +;;; +;;; The pgloader.load package implements data transfert from a pgloader +;;; source to a PostgreSQL database, using the pgloader.pgcopy COPY +;;; implementation. 
+;;; +(defpackage #:pgloader.load + (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection + #:pgloader.pgsql #:pgloader.pgcopy #:pgloader.sources) + (:export + ;; main protocol/api + #:concurrency-support + #:queue-raw-data + #:copy-from + #:copy-to + #:copy-database + + ;; the db-methods + #:prepare-pgsql-database + #:instanciate-table-copy-object + #:complete-pgsql-database)) + ;;; ;;; other utilities @@ -548,7 +562,7 @@ ;; ;; specific source handling ;; -(defpackage #:pgloader.csv +(defpackage #:pgloader.source.csv (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -565,11 +579,11 @@ #:guess-csv-params #:guess-all-csv-params)) -(defpackage #:pgloader.fixed +(defpackage #:pgloader.source.fixed (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) - (:import-from #:pgloader.csv + (:import-from #:pgloader.source.csv #:csv-connection #:specs #:csv-specs) @@ -580,11 +594,11 @@ #:copy-fixed #:copy-from)) -(defpackage #:pgloader.copy +(defpackage #:pgloader.source.copy (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) - (:import-from #:pgloader.csv + (:import-from #:pgloader.source.csv #:csv-connection #:specs #:csv-specs) @@ -595,7 +609,7 @@ #:copy-copy #:copy-from)) -(defpackage #:pgloader.ixf +(defpackage #:pgloader.source.ixf (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -610,7 +624,7 @@ #:map-rows #:copy-from)) -(defpackage #:pgloader.db3 +(defpackage #:pgloader.source.db3 (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -626,7 +640,7 @@ #:copy-to #:copy-from)) -(defpackage #:pgloader.mysql +(defpackage #:pgloader.source.mysql (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -650,16 +664,9 @@ #:copy-mysql #:*decoding-as* #:*mysql-default-cast-rules* - #:with-mysql-connection - #:map-rows - #:copy-to - #:copy-from 
- #:copy-database - #:list-databases - #:export-database - #:export-import-database)) + #:with-mysql-connection)) -(defpackage #:pgloader.sqlite +(defpackage #:pgloader.source.sqlite (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -675,13 +682,9 @@ #:comment-on-tables-and-columns) (:export #:sqlite-connection #:copy-sqlite - #:*sqlite-default-cast-rules* - #:map-rows - #:copy-to - #:copy-from - #:copy-database)) + #:*sqlite-default-cast-rules*)) -(defpackage #:pgloader.mssql +(defpackage #:pgloader.source.mssql (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -702,14 +705,10 @@ #:reset-sequences) (:export #:mssql-connection #:copy-mssql - #:*mssql-default-cast-rules* - #:map-rows - #:copy-to - #:copy-from - #:copy-database)) + #:*mssql-default-cast-rules*)) -(defpackage #:pgloader.mssql.index-filter - (:use #:cl #:esrap #:pgloader.utils #:pgloader.mssql) +(defpackage #:pgloader.source.mssql.index-filter + (:use #:cl #:esrap #:pgloader.utils #:pgloader.source.mssql) (:import-from #:pgloader.pgsql #:translate-index-filter)) @@ -731,9 +730,13 @@ #:pgloader.params #:pgloader.utils #:pgloader.sql #:pgloader.connection) (:shadow #:namestring #:number #:inline) (:import-from #:alexandria #:read-file-into-string) + (:import-from #:pgloader.load + #:copy-database) (:import-from #:pgloader.sources #:md-connection - #:md-spec) + #:md-spec + #:*default-cast-rules* + #:*cast-rules*) (:import-from #:pgloader.pgsql #:pgsql-connection #:with-pgsql-transaction @@ -741,29 +744,36 @@ #:pgconn-use-ssl #:pgconn-table-name #:make-table) - (:import-from #:pgloader.csv + (:import-from #:pgloader.source.csv + #:copy-csv #:csv-connection #:specs #:csv-specs) - (:import-from #:pgloader.fixed + (:import-from #:pgloader.source.fixed + #:copy-fixed #:fixed-connection) - (:import-from #:pgloader.copy + (:import-from #:pgloader.source.copy + #:copy-copy #:copy-connection) - (:import-from #:pgloader.sources - 
#:*default-cast-rules* - #:*cast-rules*) - (:import-from #:pgloader.mysql + (:import-from #:pgloader.source.mysql + #:copy-mysql #:mysql-connection #:*decoding-as* #:*mysql-default-cast-rules*) - (:import-from #:pgloader.mssql + (:import-from #:pgloader.source.mssql + #:copy-mssql #:mssql-connection #:*mssql-default-cast-rules*) - (:import-from #:pgloader.sqlite + (:import-from #:pgloader.source.sqlite + #:copy-sqlite #:sqlite-connection #:*sqlite-default-cast-rules*) - (:import-from #:pgloader.db3 #:dbf-connection) - (:import-from #:pgloader.ixf #:ixf-connection) + (:import-from #:pgloader.source.db3 + #:copy-db3 + #:dbf-connection) + (:import-from #:pgloader.source.ixf + #:copy-ixf + #:ixf-connection) (:export #:parse-commands #:parse-commands-from-file #:initialize-context @@ -812,7 +822,7 @@ (defpackage #:pgloader (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.parser - #:pgloader.connection #:pgloader.copy #:metabang.bind) + #:pgloader.connection #:pgloader.pgcopy #:metabang.bind) (:import-from #:pgloader.pgsql #:pgconn-table-name #:pgsql-connection) diff --git a/src/parsers/command-copy.lisp b/src/parsers/command-copy.lisp index 555d447..a58f00e 100644 --- a/src/parsers/command-copy.lisp +++ b/src/parsers/command-copy.lisp @@ -115,7 +115,7 @@ `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - ,@(identifier-case-binding options) + ,@(identifier-case-binding options) (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,copy-conn))))) @@ -128,7 +128,7 @@ (drop-indexes (getf ',options :drop-indexes)) (max-parallel-create-index (getf ',options :max-parallel-create-index)) (source - (make-instance 'pgloader.copy:copy-copy + (make-instance 'copy-copy :target-db ,pg-db-conn :source source-db :target (create-table ',target-table-name) @@ -142,16 +142,16 @@ :drop-indexes :disable-triggers :max-parallel-create-index))))) - (pgloader.sources:copy-database source - ,@ (when worker-count 
- (list :worker-count worker-count)) - ,@ (when concurrency - (list :concurrency concurrency)) - :on-error-stop on-error-stop - :truncate truncate - :drop-indexes drop-indexes - :disable-triggers disable-triggers - :max-parallel-create-index max-parallel-create-index)) + (copy-database source + ,@ (when worker-count + (list :worker-count worker-count)) + ,@ (when concurrency + (list :concurrency concurrency)) + :on-error-stop on-error-stop + :truncate truncate + :drop-indexes drop-indexes + :disable-triggers disable-triggers + :max-parallel-create-index max-parallel-create-index)) ,(sql-code-block pg-db-conn :post after "after load"))))) diff --git a/src/parsers/command-csv.lisp b/src/parsers/command-csv.lisp index 88819df..af91471 100644 --- a/src/parsers/command-csv.lisp +++ b/src/parsers/command-csv.lisp @@ -427,7 +427,7 @@ `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - ,@(identifier-case-binding options) + ,@(identifier-case-binding options) (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,csv-conn))))) @@ -440,7 +440,7 @@ (drop-indexes (getf ',options :drop-indexes)) (max-parallel-create-index (getf ',options :max-parallel-create-index)) (source - (make-instance 'pgloader.csv:copy-csv + (make-instance 'copy-csv :target-db ,pg-db-conn :source source-db :target (create-table ',target-table-name) @@ -454,16 +454,16 @@ :drop-indexes :disable-triggers :max-parallel-create-index))))) - (pgloader.sources:copy-database source - ,@ (when worker-count - (list :worker-count worker-count)) - ,@ (when concurrency - (list :concurrency concurrency)) - :on-error-stop on-error-stop - :truncate truncate - :drop-indexes drop-indexes - :disable-triggers disable-triggers - :max-parallel-create-index max-parallel-create-index)) + (copy-database source + ,@ (when worker-count + (list :worker-count worker-count)) + ,@ (when concurrency + (list :concurrency concurrency)) + :on-error-stop on-error-stop 
+ :truncate truncate + :drop-indexes drop-indexes + :disable-triggers disable-triggers + :max-parallel-create-index max-parallel-create-index)) ,(sql-code-block pg-db-conn :post after "after load"))))) diff --git a/src/parsers/command-dbf.lisp b/src/parsers/command-dbf.lisp index 0a6fc3c..84cf6b3 100644 --- a/src/parsers/command-dbf.lisp +++ b/src/parsers/command-dbf.lisp @@ -94,29 +94,29 @@ target-table-name (encoding :ascii) gucs before after options - &allow-other-keys) + &allow-other-keys) `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - ,@(identifier-case-binding options) - (on-error-stop (getf ',options :on-error-stop)) - (source-db (with-stats-collection ("fetch" :section :pre) - (expand (fetch-file ,dbf-db-conn)))) - (source - (make-instance 'pgloader.db3:copy-db3 - :target-db ,pg-db-conn - :encoding ,encoding - :source-db source-db - :target (create-table ',target-table-name)))) + ,@(identifier-case-binding options) + (on-error-stop (getf ',options :on-error-stop)) + (source-db (with-stats-collection ("fetch" :section :pre) + (expand (fetch-file ,dbf-db-conn)))) + (source + (make-instance 'copy-db3 + :target-db ,pg-db-conn + :encoding ,encoding + :source-db source-db + :target (create-table ',target-table-name)))) ,(sql-code-block pg-db-conn :pre before "before load") - (pgloader.sources:copy-database source - ,@(remove-batch-control-option options) - :on-error-stop on-error-stop - :create-indexes nil - :foreign-keys nil - :reset-sequences nil) + (copy-database source + ,@(remove-batch-control-option options) + :on-error-stop on-error-stop + :create-indexes nil + :foreign-keys nil + :reset-sequences nil) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-fixed.lisp b/src/parsers/command-fixed.lisp index 9d85b17..9fe2d78 100644 --- a/src/parsers/command-fixed.lisp +++ b/src/parsers/command-fixed.lisp @@ -124,7 +124,7 @@ `(lambda () (let* (,@(pgsql-connection-bindings 
pg-db-conn gucs) ,@(batch-control-bindings options) - ,@(identifier-case-binding options) + ,@(identifier-case-binding options) (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,fixed-conn))))) @@ -137,7 +137,7 @@ (drop-indexes ,(getf options :drop-indexes)) (max-parallel-create-index ,(getf options :max-parallel-create-index)) (source - (make-instance 'pgloader.fixed:copy-fixed + (make-instance 'copy-fixed :target-db ,pg-db-conn :source source-db :target (create-table ',target-table-name) @@ -146,16 +146,16 @@ :columns ',columns :skip-lines ,(or (getf options :skip-line) 0)))) - (pgloader.sources:copy-database source - ,@ (when worker-count - (list :worker-count worker-count)) - ,@ (when concurrency - (list :concurrency concurrency)) - :on-error-stop on-error-stop - :truncate truncate - :drop-indexes drop-indexes - :disable-triggers disable-triggers - :max-parallel-create-index max-parallel-create-index)) + (copy-database source + ,@ (when worker-count + (list :worker-count worker-count)) + ,@ (when concurrency + (list :concurrency concurrency)) + :on-error-stop on-error-stop + :truncate truncate + :drop-indexes drop-indexes + :disable-triggers disable-triggers + :max-parallel-create-index max-parallel-create-index)) ,(sql-code-block pg-db-conn :post after "after load"))))) diff --git a/src/parsers/command-ixf.lisp b/src/parsers/command-ixf.lisp index 24511ee..62ba4a5 100644 --- a/src/parsers/command-ixf.lisp +++ b/src/parsers/command-ixf.lisp @@ -78,31 +78,31 @@ &key target-table-name gucs before after options - &allow-other-keys) + &allow-other-keys) `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - ,@(identifier-case-binding options) - (timezone (getf ',options :timezone)) - (on-error-stop(getf ',options :on-error-stop)) - (source-db (with-stats-collection ("fetch" :section :pre) + ,@(identifier-case-binding options) + (timezone (getf ',options :timezone)) + (on-error-stop(getf 
',options :on-error-stop)) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,ixf-db-conn)))) - (source - (make-instance 'pgloader.ixf:copy-ixf - :target-db ,pg-db-conn - :source-db source-db - :target (create-table ',target-table-name) - :timezone timezone))) + (source + (make-instance 'copy-ixf + :target-db ,pg-db-conn + :source-db source-db + :target (create-table ',target-table-name) + :timezone timezone))) ,(sql-code-block pg-db-conn :pre before "before load") - (pgloader.sources:copy-database source - ,@(remove-batch-control-option - options - :extras '(:timezone)) - :on-error-stop on-error-stop - :foreign-keys nil - :reset-sequences nil) + (copy-database source + ,@(remove-batch-control-option + options + :extras '(:timezone)) + :on-error-stop on-error-stop + :foreign-keys nil + :reset-sequences nil) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-mssql.lisp b/src/parsers/command-mssql.lisp index f53b92d..41fcb8a 100644 --- a/src/parsers/command-mssql.lisp +++ b/src/parsers/command-mssql.lisp @@ -142,7 +142,7 @@ casts before after options alter-schema alter-table including excluding - &allow-other-keys) + &allow-other-keys) `(lambda () ;; now is the time to load the CFFI lib we need (freetds) (let (#+sbcl(sb-ext:*muffled-warnings* 'style-warning)) @@ -156,20 +156,20 @@ ,@(batch-control-bindings options) ,@(identifier-case-binding options) (source - (make-instance 'pgloader.mssql::copy-mssql + (make-instance 'copy-mssql :target-db ,pg-db-conn :source-db ,ms-db-conn))) ,(sql-code-block pg-db-conn :pre before "before load") - (pgloader.mssql:copy-database source - :including ',including - :excluding ',excluding - :alter-schema ',alter-schema - :alter-table ',alter-table - :set-table-oids t - :on-error-stop on-error-stop - ,@(remove-batch-control-option options)) + (copy-database source + :including ',including + :excluding ',excluding + :alter-schema ',alter-schema + :alter-table 
',alter-table + :set-table-oids t + :on-error-stop on-error-stop + ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-mysql.lisp b/src/parsers/command-mysql.lisp index 10274cb..9ccf9c4 100644 --- a/src/parsers/command-mysql.lisp +++ b/src/parsers/command-mysql.lisp @@ -149,7 +149,7 @@ ((:including incl)) ((:excluding excl)) ((:decoding decoding-as)) - &allow-other-keys) + &allow-other-keys) `(lambda () (let* ((*default-cast-rules* ',*mysql-default-cast-rules*) (*cast-rules* ',casts) @@ -160,21 +160,21 @@ ,@(batch-control-bindings options) ,@(identifier-case-binding options) (source - (make-instance 'pgloader.mysql::copy-mysql + (make-instance 'copy-mysql :target-db ,pg-db-conn :source-db ,my-db-conn))) ,(sql-code-block pg-db-conn :pre before "before load") - (pgloader.mysql:copy-database source - :including ',incl - :excluding ',excl - :materialize-views ',views - :alter-table ',alter-table - :alter-schema ',alter-schema - :set-table-oids t - :on-error-stop on-error-stop - ,@(remove-batch-control-option options)) + (copy-database source + :including ',incl + :excluding ',excl + :materialize-views ',views + :alter-table ',alter-table + :alter-schema ',alter-schema + :set-table-oids t + :on-error-stop on-error-stop + ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-sqlite.lisp b/src/parsers/command-sqlite.lisp index d238bff..a6ff248 100644 --- a/src/parsers/command-sqlite.lisp +++ b/src/parsers/command-sqlite.lisp @@ -105,22 +105,22 @@ load database ,@(batch-control-bindings options) ,@(identifier-case-binding options) (source-db (with-stats-collection ("fetch" :section :pre) - (expand (fetch-file ,sqlite-db-conn)))) + (expand (fetch-file ,sqlite-db-conn)))) (source - (make-instance 'pgloader.sqlite::copy-sqlite + (make-instance 'copy-sqlite :target-db ,pg-db-conn :source-db source-db))) ,(sql-code-block 
pg-db-conn :pre before "before load") - (pgloader.sqlite:copy-database source - :alter-table ',alter-table - :alter-schema ',alter-schema - :set-table-oids t - :including ',incl - :excluding ',excl - :on-error-stop on-error-stop - ,@(remove-batch-control-option options)) + (copy-database source + :alter-table ',alter-table + :alter-schema ',alter-schema + :set-table-oids t + :including ',incl + :excluding ',excl + :on-error-stop on-error-stop + ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/pg-copy/copy-batch.lisp b/src/pg-copy/copy-batch.lisp index 68a30d7..c20f8ad 100644 --- a/src/pg-copy/copy-batch.lisp +++ b/src/pg-copy/copy-batch.lisp @@ -1,7 +1,7 @@ ;;; ;;; Tools to handle internal queueing, using lparallel.queue ;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) ;;; ;;; The pgloader architectures uses a reader thread and a writer thread. The diff --git a/src/pg-copy/copy-db-write.lisp b/src/pg-copy/copy-db-write.lisp index d526de1..82e9b44 100644 --- a/src/pg-copy/copy-db-write.lisp +++ b/src/pg-copy/copy-db-write.lisp @@ -3,7 +3,7 @@ ;;; ;;; Here, sending the data in the COPY stream opened in copy-batch. ;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) (define-condition copy-init-error (error) ((table :initarg :table :reader copy-init-error-table) diff --git a/src/pg-copy/copy-format.lisp b/src/pg-copy/copy-format.lisp index 8441ce9..5903c11 100644 --- a/src/pg-copy/copy-format.lisp +++ b/src/pg-copy/copy-format.lisp @@ -1,7 +1,7 @@ ;;; ;;; Tools to handle PostgreSQL data format ;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) ;;; ;;; Format row to PostgreSQL COPY format, the TEXT variant. 
diff --git a/src/pg-copy/copy-from-queue.lisp b/src/pg-copy/copy-from-queue.lisp index f586c13..4a537f8 100644 --- a/src/pg-copy/copy-from-queue.lisp +++ b/src/pg-copy/copy-from-queue.lisp @@ -1,7 +1,7 @@ ;;; ;;; The PostgreSQL COPY TO implementation, with batches and retries. ;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) ;;; ;;; We receive raw input rows from an lparallel queue, push their content diff --git a/src/pg-copy/copy-retry-batch.lisp b/src/pg-copy/copy-retry-batch.lisp index 9420c1f..693d897 100644 --- a/src/pg-copy/copy-retry-batch.lisp +++ b/src/pg-copy/copy-retry-batch.lisp @@ -1,7 +1,7 @@ ;;; ;;; The PostgreSQL COPY TO implementation, with batches and retries. ;;; -(in-package #:pgloader.copy) +(in-package #:pgloader.pgcopy) ;;; ;;; Compute how many rows we're going to try loading next, depending on diff --git a/src/pg-copy/copy-rows-in-batch.lisp b/src/pg-copy/copy-rows-in-batch.lisp index 5542581..73a175e 100644 --- a/src/pg-copy/copy-rows-in-batch.lisp +++ b/src/pg-copy/copy-rows-in-batch.lisp @@ -1,7 +1,7 @@ ;;; ;;; The PostgreSQL COPY TO implementation, with batches and retries. ;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) (defun batch-rows-to-copy (table columns copy nbcols queue) "Add rows that we pop from QUEUE into a batch, that we then COPY over to diff --git a/src/pg-copy/copy-rows-in-stream.lisp b/src/pg-copy/copy-rows-in-stream.lisp index bbe9eb2..7b1b813 100644 --- a/src/pg-copy/copy-rows-in-stream.lisp +++ b/src/pg-copy/copy-rows-in-stream.lisp @@ -1,7 +1,7 @@ ;;; ;;; The PostgreSQL COPY TO implementation, with batches and retries. 
;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) (defun stream-rows-to-copy (table columns copy nbcols queue &optional (db pomo:*database*)) diff --git a/src/sources/common/api.lisp b/src/sources/common/api.lisp index d23d0fe..091e3b5 100644 --- a/src/sources/common/api.lisp +++ b/src/sources/common/api.lisp @@ -67,67 +67,7 @@ (:documentation "Return the list of column names for the data sent in the queue.")) -(defgeneric copy-from (source &key) - (:documentation - "Load data from SOURCE into its target as defined by the SOURCE object.")) -;; That one is more an export than a load. It always export to a single very -;; well defined format, the importing utility is defined in -;; src/pgsql-copy-format.lisp - -(defgeneric copy-to (source filename) - (:documentation - "Load data from SOURCE and serialize it into FILENAME, using PostgreSQL - COPY TEXT format.")) - -;; The next generic function is only to get instanciated for sources -;; actually containing more than a single source item (tables, collections, -;; etc) - -(defgeneric copy-database (source - &key - worker-count - concurrency - max-parallel-create-index - truncate - data-only - schema-only - create-tables - include-drop - foreign-keys - create-indexes - reset-sequences - disable-triggers - materialize-views - set-table-oids - including - excluding) - (:documentation - "Auto-discover source schema, convert it to PostgreSQL, migrate the data - from the source definition to PostgreSQL for all the discovered - items (tables, collections, etc), then reset the PostgreSQL sequences - created by SERIAL columns in the first step. - - The target tables are automatically discovered, the only-tables - parameter allows to filter them out.")) - - -;;; -;;; Common API to introspec a data source, when that's possible. Typically -;;; when the source is a database system. 
-;;; - -;; (defgeneric list-all-columns (connection &key) -;; (:documentation "Discover all columns in CONNECTION source.")) - -;; (defgeneric list-all-indexes (connection &key) -;; (:documentation "Discover all indexes in CONNECTION source.")) - -;; (defgeneric list-all-fkeys (connection &key) -;; (:documentation "Discover all foreign keys in CONNECTION source.")) - -;; (defgeneric fetch-metadata (connection &key) -;; (:documentation "Full discovery of the CONNECTION data source.")) ;;; @@ -182,28 +122,8 @@ including excluding)) -(defgeneric prepare-pgsql-database (db-copy catalog - &key - truncate - create-tables - create-schemas - drop-indexes - set-table-oids - materialize-views - foreign-keys - include-drop) - (:documentation "Prepare the target PostgreSQL database.")) - (defgeneric cleanup (db-copy catalog &key materialize-views) (:documentation "Clean-up after prepare-pgsql-database failure.")) -(defgeneric complete-pgsql-database (db-copy catalog pkeys - &key - foreign-keys - create-indexes - create-triggers - reset-sequences) - (:documentation "Alter load duties for database sources copy support.")) - (defgeneric instanciate-table-copy-object (db-copy table) (:documentation "Create a new instance for copying TABLE data.")) diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index 37aaaeb..8694dd1 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -1,42 +1,13 @@ ;;; -;;; Generic API for pgloader sources -;;; Methods for source types with multiple files input +;;; Common methods implementations, default behaviour, for md objects ;;; -(in-package :pgloader.sources) +(in-package #:pgloader.sources) (defmethod parse-header ((copy md-copy) header) "Unsupported by default, to be implemented in each md-copy subclass." (error "Parsing the header of a ~s is not implemented yet." 
(type-of copy))) -(defmethod map-rows ((copy md-copy) &key process-row-fn) - "Load data from a text file in CSV format, with support for advanced - projecting capabilities. See `project-fields' for details. - - Each row is pre-processed then PROCESS-ROW-FN is called with the row as a - list as its only parameter. - - Finally returns how many rows where read and processed." - - (with-connection (cnx (source copy) - :direction :input - :external-format (encoding copy) - :if-does-not-exist nil) - (let ((input (md-strm cnx))) - ;; we handle skipping more than one line here, as cl-copy only knows - ;; about skipping the first line - (loop :repeat (skip-lines copy) :do (read-line input nil nil)) - - ;; we might now have to read the fields from the header line - (when (header copy) - (setf (fields copy) - (parse-header copy (read-line input nil nil))) - - (log-message :debug "Parsed header columns ~s" (fields copy))) - - ;; read in the text file, split it into columns - (process-rows copy input process-row-fn)))) - (defmethod preprocess-row ((copy md-copy)) "The file based readers possibly have extra work to do with user defined fields to columns projections (mapping)." @@ -70,124 +41,30 @@ :skip-lines (skip-lines copy) :header (header copy))) -(defmethod copy-database ((copy md-copy) - &key - (on-error-stop *on-error-stop*) - truncate - disable-triggers - drop-indexes +(defmethod map-rows ((copy md-copy) &key process-row-fn) + "Load data from a text file in CSV format, with support for advanced + projecting capabilities. See `project-fields' for details. - max-parallel-create-index + Each row is pre-processed then PROCESS-ROW-FN is called with the row as a + list as its only parameter. - ;; generic API, but ignored here - (worker-count 4) - (concurrency 1) + Finally returns how many rows where read and processed." 
- data-only - schema-only - create-tables - include-drop - foreign-keys - create-indexes - reset-sequences - materialize-views - set-table-oids - including - excluding) - "Copy the contents of the COPY formated file to PostgreSQL." - (declare (ignore data-only schema-only - create-tables include-drop foreign-keys - create-indexes reset-sequences materialize-views - set-table-oids including excluding)) + (with-connection (cnx (source copy) + :direction :input + :external-format (encoding copy) + :if-does-not-exist nil) + (let ((input (md-strm cnx))) + ;; we handle skipping more than one line here, as cl-copy only knows + ;; about skipping the first line + (loop :repeat (skip-lines copy) :do (read-line input nil nil)) - (let* ((*on-error-stop* on-error-stop) - (pgconn (target-db copy)) - pgsql-catalog) + ;; we might now have to read the fields from the header line + (when (header copy) + (setf (fields copy) + (parse-header copy (read-line input nil nil))) - (handler-case - (with-pgsql-connection (pgconn) - (setf pgsql-catalog - (fetch-pgsql-catalog (db-name pgconn) :table (target copy))) + (log-message :debug "Parsed header columns ~s" (fields copy))) - ;; if the user didn't tell us the column list of the table, now is - ;; a proper time to set it in the copy object - (unless (and (slot-boundp copy 'columns) - (slot-value copy 'columns)) - (setf (columns copy) - (mapcar (lambda (col) - ;; we need to handle the md-copy format for the - ;; column list, which allow for user given - ;; options: each column is a list which car is - ;; the column name. 
- (list (column-name col))) - (table-field-list (first (table-list pgsql-catalog)))))) - - (log-message :data "CATALOG: ~s" pgsql-catalog) - - ;; this sets (table-index-list (target copy)) - (maybe-drop-indexes pgsql-catalog :drop-indexes drop-indexes) - - ;; now is the proper time to truncate, before parallel operations - (when truncate - (truncate-tables pgsql-catalog))) - - (cl-postgres:database-error (e) - (log-message :fatal "Failed to prepare target PostgreSQL table.") - (log-message :fatal "~a" e) - (return-from copy-database))) - - ;; Keep the PostgreSQL table target around in the copy instance, - ;; with the following subtleties to deal with: - ;; 1. the catalog fetching did fill-in PostgreSQL columns as fields - ;; 2. we might target fewer pg columns than the table actually has - (let ((table (first (table-list pgsql-catalog)))) - (setf (table-column-list table) - (loop :for column-name :in (mapcar #'first (columns copy)) - :collect (find column-name (table-field-list table) - :key #'column-name - :test #'string=))) - (setf (target copy) table)) - - ;; expand the specs of our source, we might have to care about several - ;; files actually. 
- (let* ((lp:*kernel* (make-kernel worker-count)) - (channel (lp:make-channel)) - (path-list (expand-spec (source copy))) - (task-count 0)) - (with-stats-collection ("Files Processed" :section :post - :use-result-as-read t - :use-result-as-rows t) - (loop :for path-spec :in path-list - :count t - :do (let ((table-source (clone-copy-for copy path-spec))) - (incf task-count - (copy-from table-source - :concurrency concurrency - :kernel lp:*kernel* - :channel channel - :on-error-stop on-error-stop - :disable-triggers disable-triggers))))) - - ;; end kernel - (with-stats-collection ("COPY Threads Completion" :section :post - :use-result-as-read t - :use-result-as-rows t) - (loop :repeat task-count - :do (handler-case - (destructuring-bind (task table seconds) - (lp:receive-result channel) - (log-message :debug - "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds)) - (condition (e) - (log-message :fatal "~a" e))) - :finally (progn - (lp:end-kernel :wait nil) - (return task-count)))) - (lp:end-kernel :wait t)) - - ;; re-create the indexes from the target table entry - (create-indexes-again (target-db copy) - pgsql-catalog - :max-parallel-create-index max-parallel-create-index - :drop-indexes drop-indexes))) + ;; read in the text file, split it into columns + (process-rows copy input process-row-fn)))) diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index c92ea3e..15361c7 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -1,46 +1,8 @@ ;;; -;;; Generic API for pgloader sources +;;; Common methods implementations, default behaviour. ;;; -(in-package :pgloader.sources) -;;; -;;; Common API implementation -;;; -(defmethod queue-raw-data ((copy copy) rawq concurrency) - "Stream data as read by the map-queue method on the COPY argument into QUEUE, - as given." 
- (log-message :debug "Reader started for ~a" (format-table-name (target copy))) - (let* ((start-time (get-internal-real-time)) - (row-count 0) - (process-row - (if (or (eq :data *log-min-messages*) - (eq :data *client-min-messages*)) - ;; when debugging, use a lambda with debug traces - (lambda (row) - (log-message :data "< ~s" row) - (lq:push-queue row rawq) - (incf row-count)) - - ;; usual non-debug case - (lambda (row) - (lq:push-queue row rawq) - (incf row-count))))) - - ;; signal we are starting - (update-stats :data (target copy) :start start-time) - - ;; call the source-specific method for reading input data - (map-rows copy :process-row-fn process-row) - - ;; process last batches and send them to queues - ;; and mark end of stream - (loop :repeat concurrency :do (lq:push-queue :end-of-data rawq)) - - (let ((seconds (elapsed-time-since start-time))) - (log-message :debug "Reader for ~a is done in ~6$s" - (format-table-name (target copy)) seconds) - (update-stats :data (target copy) :read row-count :rs seconds) - (list :reader (target copy) seconds)))) +(in-package #:pgloader.sources) (defmethod data-is-preformatted-p ((copy copy)) "By default, data is not preformatted." @@ -54,107 +16,22 @@ "Default column list is an empty list." nil) -(defmethod copy-to ((copy copy) pgsql-copy-filename) - "Extract data from COPY file into a PotgreSQL COPY TEXT formated file" - (with-open-file (text-file pgsql-copy-filename - :direction :output - :if-exists :supersede - :external-format :utf-8) - (let ((row-fn (lambda (row) - (format-vector-row text-file row (transforms copy))))) - (map-rows copy :process-row-fn row-fn)))) +(defmethod cleanup ((copy db-copy) (catalog catalog) &key materialize-views) + "In case anything wrong happens at `prepare-pgsql-database' step, this + function will be called to clean-up the mess left behind, if any." 
+ (declare (ignorable materialize-views)) + t) -(defmethod copy-from ((copy copy) - &key - (kernel nil k-s-p) - (channel nil c-s-p) - (worker-count 8) - (concurrency 2) - (multiple-readers nil) - (on-error-stop *on-error-stop*) - disable-triggers) - "Copy data from COPY source into PostgreSQL." - (let* ((table-name (format-table-name (target copy))) - (lp:*kernel* (or kernel (make-kernel worker-count))) - (channel (or channel (lp:make-channel))) - (readers nil) - (task-count 0)) - - (flet ((submit-task (channel function &rest args) - (apply #'lp:submit-task channel function args) - (incf task-count))) - - (lp:task-handler-bind - ((pgloader.copy::copy-init-error - #'(lambda (condition) - ;; everything has been handled already - (lp:invoke-transfer-error condition))) - (on-error-stop - #'(lambda (condition) - ;; everything has been handled already - (lp:invoke-transfer-error condition))) - #+pgloader-image - (error - #'(lambda (condition) - (log-message :error "A thread failed with error: ~a" condition) - (log-message :error "~a" - (trivial-backtrace:print-backtrace condition - :output nil)) - (lp::invoke-transfer-error condition)))) - (log-message :notice "COPY ~a" table-name) - - ;; Check for Read Concurrency Support from our source - (when (and multiple-readers (< 1 concurrency)) - (let ((label "Check Concurrency Support")) - (with-stats-collection (label :section :pre) - (setf readers (concurrency-support copy concurrency)) - (update-stats :pre label :read 1 :rows (if readers 1 0)) - (when readers - (log-message :notice "Multiple Readers Enabled for ~a" - (format-table-name (target copy))))))) - - ;; when reader is non-nil, we have reader concurrency support! 
- (if readers - ;; here we have detected Concurrency Support: we create as many - ;; readers as writers and create associated couples, each couple - ;; shares its own queue - (let ((rawqs - (loop :repeat concurrency :collect - (lq:make-queue :fixed-capacity *prefetch-rows*)))) - (log-message :info "Read Concurrency Enabled for ~s" - (format-table-name (target copy))) - - (loop :for rawq :in rawqs :for reader :in readers :do - ;; each reader pretends to be alone, pass 1 as concurrency - (submit-task channel #'queue-raw-data reader rawq 1) - - (submit-task channel #'pgloader.copy::copy-rows-from-queue - copy rawq - :on-error-stop on-error-stop - :disable-triggers disable-triggers))) - - ;; no Read Concurrency Support detected, start a single reader - ;; task, using a single data queue that is read by multiple - ;; writers. - (let ((rawq - (lq:make-queue :fixed-capacity *prefetch-rows*))) - (submit-task channel #'queue-raw-data copy rawq concurrency) - - ;; start a task to transform the raw data in the copy format - ;; and send that data down to PostgreSQL - (loop :repeat concurrency :do - (submit-task channel #'pgloader.copy::copy-rows-from-queue - copy rawq - :on-error-stop on-error-stop - :disable-triggers disable-triggers)))) - - ;; now wait until both the tasks are over, and kill the kernel - (unless c-s-p - (log-message :debug "waiting for ~d tasks" task-count) - (loop :repeat task-count :do (lp:receive-result channel)) - (log-message :notice "COPY ~s done." table-name) - (unless k-s-p (lp:end-kernel :wait t))) - - ;; return task-count, which is how many tasks we submitted to our - ;; lparallel kernel. - task-count)))) +(defmethod instanciate-table-copy-object ((copy db-copy) (table table)) + "Create an new instance for copying TABLE data." 
+ (let* ((fields (table-field-list table)) + (columns (table-column-list table)) + (transforms (mapcar #'column-transform columns))) + (make-instance (class-of copy) + :source-db (clone-connection (source-db copy)) + :target-db (clone-connection (target-db copy)) + :source table + :target table + :fields fields + :columns columns + :transforms transforms))) diff --git a/src/sources/copy.lisp b/src/sources/copy.lisp index 20ba472..b24e4be 100644 --- a/src/sources/copy.lisp +++ b/src/sources/copy.lisp @@ -1,7 +1,7 @@ ;;; ;;; Read a file format in PostgreSQL COPY TEXT format. ;;; -(in-package :pgloader.copy) +(in-package :pgloader.source.copy) (defclass copy-connection (md-connection) ()) diff --git a/src/sources/csv/csv-database.lisp b/src/sources/csv/csv-database.lisp index 3faef8f..d8523d3 100644 --- a/src/sources/csv/csv-database.lisp +++ b/src/sources/csv/csv-database.lisp @@ -2,7 +2,7 @@ ;;; Experimental code, used to be used a long time ago, before this lisp ;;; code became pgloader. The idea is to use it again sometimes, someway. ;;; -(in-package #:pgloader.csv) +(in-package #:pgloader.source.csv) ;;; ;;; When you exported a whole database as a bunch of CSV files to be found diff --git a/src/sources/csv/csv-guess.lisp b/src/sources/csv/csv-guess.lisp index 2960cab..2b40bd1 100644 --- a/src/sources/csv/csv-guess.lisp +++ b/src/sources/csv/csv-guess.lisp @@ -1,7 +1,7 @@ ;;; ;;; Automatic guess the CSV format parameters ;;; -(in-package #:pgloader.csv) +(in-package #:pgloader.source.csv) (defparameter *separators* '(#\Tab #\, #\; #\| #\% #\^ #\! 
#\$) "Common CSV separators to try when guessing file parameters.") diff --git a/src/sources/csv/csv.lisp b/src/sources/csv/csv.lisp index 1cbabf0..f58007c 100644 --- a/src/sources/csv/csv.lisp +++ b/src/sources/csv/csv.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MySQL data fetching ;;; -(in-package :pgloader.csv) +(in-package :pgloader.source.csv) (defclass csv-connection (md-connection) ()) diff --git a/src/sources/db3/db3-schema.lisp b/src/sources/db3/db3-schema.lisp index 06cbbc9..1be657b 100644 --- a/src/sources/db3/db3-schema.lisp +++ b/src/sources/db3/db3-schema.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle the DBF file format ;;; -(in-package :pgloader.db3) +(in-package :pgloader.source.db3) (defclass dbf-connection (fd-connection) ((db3 :initarg db3 :accessor fd-db3)) diff --git a/src/sources/db3/db3.lisp b/src/sources/db3/db3.lisp index 69b9741..03eb244 100644 --- a/src/sources/db3/db3.lisp +++ b/src/sources/db3/db3.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle the DBF file format ;;; -(in-package :pgloader.db3) +(in-package :pgloader.source.db3) ;;; ;;; Integration with pgloader diff --git a/src/sources/fixed.lisp b/src/sources/fixed.lisp index 92027fe..8d36a8f 100644 --- a/src/sources/fixed.lisp +++ b/src/sources/fixed.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle fixed width files ;;; -(in-package :pgloader.fixed) +(in-package :pgloader.source.fixed) (defclass fixed-connection (md-connection) ()) diff --git a/src/sources/ixf/ixf-schema.lisp b/src/sources/ixf/ixf-schema.lisp index 9c537a6..775c9db 100644 --- a/src/sources/ixf/ixf-schema.lisp +++ b/src/sources/ixf/ixf-schema.lisp @@ -3,7 +3,7 @@ ;;; ;;; http://www-01.ibm.com/support/knowledgecenter/SSEPGG_10.5.0/com.ibm.db2.luw.admin.dm.doc/doc/r0004667.html -(in-package :pgloader.ixf) +(in-package :pgloader.source.ixf) (defclass ixf-connection (fd-connection) () (:documentation "pgloader connection parameters for IXF files.")) diff --git a/src/sources/ixf/ixf.lisp b/src/sources/ixf/ixf.lisp index 275ac6d..676eed7 100644 
--- a/src/sources/ixf/ixf.lisp +++ b/src/sources/ixf/ixf.lisp @@ -3,7 +3,7 @@ ;;; ;;; http://www-01.ibm.com/support/knowledgecenter/SSEPGG_10.5.0/com.ibm.db2.luw.admin.dm.doc/doc/r0004667.html -(in-package :pgloader.ixf) +(in-package :pgloader.source.ixf) ;;; ;;; Integration with pgloader diff --git a/src/sources/mssql/mssql-cast-rules.lisp b/src/sources/mssql/mssql-cast-rules.lisp index d25d029..471740b 100644 --- a/src/sources/mssql/mssql-cast-rules.lisp +++ b/src/sources/mssql/mssql-cast-rules.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MS SQL data type casting rules ;;; -(in-package :pgloader.mssql) +(in-package :pgloader.source.mssql) (defparameter *mssql-default-cast-rules* `((:source (:type "char") :target (:type "text" :drop-typemod t)) diff --git a/src/sources/mssql/mssql-index-filters.lisp b/src/sources/mssql/mssql-index-filters.lisp index 75a9fcd..d05886b 100644 --- a/src/sources/mssql/mssql-index-filters.lisp +++ b/src/sources/mssql/mssql-index-filters.lisp @@ -4,7 +4,7 @@ ;;; clauses. 
;;; -(in-package #:pgloader.mssql.index-filter) +(in-package #:pgloader.source.mssql.index-filter) (defmethod translate-index-filter ((table table) (index index) diff --git a/src/sources/mssql/mssql-schema.lisp b/src/sources/mssql/mssql-schema.lisp index fc39734..97d3d36 100644 --- a/src/sources/mssql/mssql-schema.lisp +++ b/src/sources/mssql/mssql-schema.lisp @@ -2,7 +2,7 @@ ;;; Tools to query the MS SQL Schema to reproduce in PostgreSQL ;;; -(in-package :pgloader.mssql) +(in-package :pgloader.source.mssql) (defvar *mssql-db* nil "The MS SQL database connection handler.") diff --git a/src/sources/mssql/mssql.lisp b/src/sources/mssql/mssql.lisp index 02e67ce..cd71141 100644 --- a/src/sources/mssql/mssql.lisp +++ b/src/sources/mssql/mssql.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle the MS SQL Database ;;; -(in-package :pgloader.mssql) +(in-package :pgloader.source.mssql) (defclass copy-mssql (db-copy) ((encoding :accessor encoding ; allows forcing encoding diff --git a/src/sources/mysql/mysql-cast-rules.lisp b/src/sources/mysql/mysql-cast-rules.lisp index 242ba4a..f6ddcf8 100644 --- a/src/sources/mysql/mysql-cast-rules.lisp +++ b/src/sources/mysql/mysql-cast-rules.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MySQL data type casting rules ;;; -(in-package :pgloader.mysql) +(in-package :pgloader.source.mysql) (defun enum-or-set-name (table-name column-name type ctype typemod) (declare (ignore type ctype typemod)) diff --git a/src/sources/mysql/mysql-connection.lisp b/src/sources/mysql/mysql-connection.lisp index 6db6704..46a0d3f 100644 --- a/src/sources/mysql/mysql-connection.lisp +++ b/src/sources/mysql/mysql-connection.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MySQL connection and querying ;;; -(in-package :pgloader.mysql) +(in-package :pgloader.source.mysql) (defvar *connection* nil "Current MySQL connection") diff --git a/src/sources/mysql/mysql-csv.lisp b/src/sources/mysql/mysql-csv.lisp index 7bec085..f20daf9 100644 --- a/src/sources/mysql/mysql-csv.lisp +++ 
b/src/sources/mysql/mysql-csv.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MySQL data fetching: old code, untested, kept for reference. ;;; -(in-package :pgloader.mysql) +(in-package :pgloader.source.mysql) ;;; ;;; MySQL bulk export to file, in PostgreSQL COPY TEXT format diff --git a/src/sources/mysql/mysql-schema.lisp b/src/sources/mysql/mysql-schema.lisp index 4d9de20..00f4162 100644 --- a/src/sources/mysql/mysql-schema.lisp +++ b/src/sources/mysql/mysql-schema.lisp @@ -2,7 +2,7 @@ ;;; Tools to query the MySQL Schema to reproduce in PostgreSQL ;;; -(in-package :pgloader.mysql) +(in-package :pgloader.source.mysql) ;;; ;;; Those functions are to be called from withing an already established diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 78c5e06..1710c8c 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MySQL data fetching ;;; -(in-package :pgloader.mysql) +(in-package :pgloader.source.mysql) (defclass copy-mysql (db-copy) ((encoding :accessor encoding ; allows forcing encoding diff --git a/src/sources/sqlite/sqlite-cast-rules.lisp b/src/sources/sqlite/sqlite-cast-rules.lisp index 37a7145..3b2cf50 100644 --- a/src/sources/sqlite/sqlite-cast-rules.lisp +++ b/src/sources/sqlite/sqlite-cast-rules.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle the SQLite Database ;;; -(in-package :pgloader.sqlite) +(in-package :pgloader.source.sqlite) (defvar *sqlite-db* nil "The SQLite database connection handler.") diff --git a/src/sources/sqlite/sqlite-schema.lisp b/src/sources/sqlite/sqlite-schema.lisp index 16ee9db..7174fea 100644 --- a/src/sources/sqlite/sqlite-schema.lisp +++ b/src/sources/sqlite/sqlite-schema.lisp @@ -1,7 +1,7 @@ ;;; ;;; SQLite tools connecting to a database ;;; -(in-package :pgloader.sqlite) +(in-package :pgloader.source.sqlite) (defvar *sqlite-db* nil "The SQLite database connection handler.") diff --git a/src/sources/sqlite/sqlite.lisp b/src/sources/sqlite/sqlite.lisp index 
f6803ab..f6f97de 100644 --- a/src/sources/sqlite/sqlite.lisp +++ b/src/sources/sqlite/sqlite.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle the SQLite Database ;;; -(in-package :pgloader.sqlite) +(in-package :pgloader.source.sqlite) (defclass copy-sqlite (db-copy) ((db :accessor db :initarg :db))