From 5c10f12a073da3992ea4755341fbe8d330b6d2c7 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Sat, 24 Feb 2018 19:24:22 +0100 Subject: [PATCH] Fix duplicate package names. In a previous commit we re-used the package name pgloader.copy for the now separated implementation of the COPY protocol, but this package was already in use for the implementation of the COPY file format as a pgloader source. Oops. And CCL was happily doing its magic anyway, so that I've been blind to the problem. To fix, rename the new package pgloader.pgcopy, and to avoid having to deal with other problems of the same kind in the future, rename every source package pgloader.source., so that we now have pgloader.source.copy and pgloader.pgcopy, two visibly different packages to deal with. This light refactoring came with a challenge though. The split between the pgloader.sources API and the rest of the code involved some circular dependencies in the namespaces. CL is pretty flexible here because it can reload code definitions at runtime, but it was still a mess. To untangle it, implement a new namespace, the pgloader.load package, where we can use the pgloader.sources API and the pgloader.connection and pgloader.pgsql APIs too. A little problem gave birth to quite a massive patch. As it happens when refactoring and cleaning-up the dirt in any large enough project, right? See #748. 
--- pgloader.asd | 19 +- src/load/api.lisp | 71 ++++++++ src/load/copy-data.lisp | 149 +++++++++++++++ src/load/load-file.lisp | 128 +++++++++++++ .../migrate-database.lisp} | 20 +-- src/package.lisp | 156 ++++++++-------- src/parsers/command-copy.lisp | 24 +-- src/parsers/command-csv.lisp | 24 +-- src/parsers/command-dbf.lisp | 34 ++-- src/parsers/command-fixed.lisp | 24 +-- src/parsers/command-ixf.lisp | 36 ++-- src/parsers/command-mssql.lisp | 20 +-- src/parsers/command-mysql.lisp | 22 +-- src/parsers/command-sqlite.lisp | 20 +-- src/pg-copy/copy-batch.lisp | 2 +- src/pg-copy/copy-db-write.lisp | 2 +- src/pg-copy/copy-format.lisp | 2 +- src/pg-copy/copy-from-queue.lisp | 2 +- src/pg-copy/copy-retry-batch.lisp | 2 +- src/pg-copy/copy-rows-in-batch.lisp | 2 +- src/pg-copy/copy-rows-in-stream.lisp | 2 +- src/sources/common/api.lisp | 80 --------- src/sources/common/md-methods.lisp | 169 +++--------------- src/sources/common/methods.lisp | 163 +++-------------- src/sources/copy.lisp | 2 +- src/sources/csv/csv-database.lisp | 2 +- src/sources/csv/csv-guess.lisp | 2 +- src/sources/csv/csv.lisp | 2 +- src/sources/db3/db3-schema.lisp | 2 +- src/sources/db3/db3.lisp | 2 +- src/sources/fixed.lisp | 2 +- src/sources/ixf/ixf-schema.lisp | 2 +- src/sources/ixf/ixf.lisp | 2 +- src/sources/mssql/mssql-cast-rules.lisp | 2 +- src/sources/mssql/mssql-index-filters.lisp | 2 +- src/sources/mssql/mssql-schema.lisp | 2 +- src/sources/mssql/mssql.lisp | 2 +- src/sources/mysql/mysql-cast-rules.lisp | 2 +- src/sources/mysql/mysql-connection.lisp | 2 +- src/sources/mysql/mysql-csv.lisp | 2 +- src/sources/mysql/mysql-schema.lisp | 2 +- src/sources/mysql/mysql.lisp | 2 +- src/sources/sqlite/sqlite-cast-rules.lisp | 2 +- src/sources/sqlite/sqlite-schema.lisp | 2 +- src/sources/sqlite/sqlite.lisp | 2 +- 45 files changed, 621 insertions(+), 594 deletions(-) create mode 100644 src/load/api.lisp create mode 100644 src/load/copy-data.lisp create mode 100644 src/load/load-file.lisp rename 
src/{sources/common/db-methods.lisp => load/migrate-database.lisp} (95%) diff --git a/pgloader.asd b/pgloader.asd index b0ae530..9c0778b 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -110,11 +110,11 @@ "utils") :components ((:module "common" + :serial t :components ((:file "api") - (:file "methods" :depends-on ("api")) - (:file "md-methods" :depends-on ("api")) - (:file "db-methods" :depends-on ("api")) + (:file "methods") + (:file "md-methods") (:file "casting-rules") (:file "files-and-pathnames") (:file "project-fields"))) @@ -199,6 +199,19 @@ (:file "copy-retry-batch") (:file "copy-from-queue"))) + (:module "load" + :depends-on ("params" + "package" + "utils" + "pgsql" + "sources") + :serial t + :components + ((:file "api") + (:file "copy-data") + (:file "load-file") + (:file "migrate-database"))) + (:module "parsers" :depends-on ("params" "package" diff --git a/src/load/api.lisp b/src/load/api.lisp new file mode 100644 index 0000000..69769c8 --- /dev/null +++ b/src/load/api.lisp @@ -0,0 +1,71 @@ +;;; +;;; Generic API for pgloader data loading and database migrations. +;;; +(in-package :pgloader.load) + +(defgeneric copy-from (source &key) + (:documentation + "Load data from SOURCE into its target as defined by the SOURCE object.")) + +;; That one is more an export than a load. 
It always export to a single very +;; well defined format, the importing utility is defined in +;; src/pgsql-copy-format.lisp + +(defgeneric copy-to (source filename) + (:documentation + "Load data from SOURCE and serialize it into FILENAME, using PostgreSQL + COPY TEXT format.")) + +;; The next generic function is only to get instanciated for sources +;; actually containing more than a single source item (tables, collections, +;; etc) + +(defgeneric copy-database (source + &key + worker-count + concurrency + max-parallel-create-index + truncate + data-only + schema-only + create-tables + include-drop + foreign-keys + create-indexes + reset-sequences + disable-triggers + materialize-views + set-table-oids + including + excluding) + (:documentation + "Auto-discover source schema, convert it to PostgreSQL, migrate the data + from the source definition to PostgreSQL for all the discovered + items (tables, collections, etc), then reset the PostgreSQL sequences + created by SERIAL columns in the first step. 
+ + The target tables are automatically discovered, the only-tables + parameter allows to filter them out.")) + + + +(defgeneric prepare-pgsql-database (db-copy catalog + &key + truncate + create-tables + create-schemas + drop-indexes + set-table-oids + materialize-views + foreign-keys + include-drop) + (:documentation "Prepare the target PostgreSQL database.")) + +(defgeneric complete-pgsql-database (db-copy catalog pkeys + &key + foreign-keys + create-indexes + create-triggers + reset-sequences) + (:documentation "Alter load duties for database sources copy support.")) + diff --git a/src/load/copy-data.lisp b/src/load/copy-data.lisp new file mode 100644 index 0000000..67a68d6 --- /dev/null +++ b/src/load/copy-data.lisp @@ -0,0 +1,149 @@ +;;; +;;; Generic API for pgloader sources +;;; +(in-package :pgloader.load) + +;;; +;;; Common API implementation +;;; +(defmethod queue-raw-data ((copy copy) rawq concurrency) + "Stream data as read by the map-queue method on the COPY argument into QUEUE, + as given." 
+ (log-message :debug "Reader started for ~a" (format-table-name (target copy))) + (let* ((start-time (get-internal-real-time)) + (row-count 0) + (process-row + (if (or (eq :data *log-min-messages*) + (eq :data *client-min-messages*)) + ;; when debugging, use a lambda with debug traces + (lambda (row) + (log-message :data "< ~s" row) + (lq:push-queue row rawq) + (incf row-count)) + + ;; usual non-debug case + (lambda (row) + (lq:push-queue row rawq) + (incf row-count))))) + + ;; signal we are starting + (update-stats :data (target copy) :start start-time) + + ;; call the source-specific method for reading input data + (map-rows copy :process-row-fn process-row) + + ;; process last batches and send them to queues + ;; and mark end of stream + (loop :repeat concurrency :do (lq:push-queue :end-of-data rawq)) + + (let ((seconds (elapsed-time-since start-time))) + (log-message :debug "Reader for ~a is done in ~6$s" + (format-table-name (target copy)) seconds) + (update-stats :data (target copy) :read row-count :rs seconds) + (list :reader (target copy) seconds)))) + + +(defmethod copy-to ((copy copy) pgsql-copy-filename) + "Extract data from COPY file into a PotgreSQL COPY TEXT formated file" + (with-open-file (text-file pgsql-copy-filename + :direction :output + :if-exists :supersede + :external-format :utf-8) + (let ((row-fn (lambda (row) + (format-vector-row text-file row (transforms copy))))) + (map-rows copy :process-row-fn row-fn)))) + +(defmethod copy-from ((copy copy) + &key + (kernel nil k-s-p) + (channel nil c-s-p) + (worker-count 8) + (concurrency 2) + (multiple-readers nil) + (on-error-stop *on-error-stop*) + disable-triggers) + "Copy data from COPY source into PostgreSQL." 
+ (let* ((table-name (format-table-name (target copy))) + (lp:*kernel* (or kernel (make-kernel worker-count))) + (channel (or channel (lp:make-channel))) + (readers nil) + (task-count 0)) + + (flet ((submit-task (channel function &rest args) + (apply #'lp:submit-task channel function args) + (incf task-count))) + + (lp:task-handler-bind + ((copy-init-error + #'(lambda (condition) + ;; everything has been handled already + (lp:invoke-transfer-error condition))) + (on-error-stop + #'(lambda (condition) + ;; everything has been handled already + (lp:invoke-transfer-error condition))) + #+pgloader-image + (error + #'(lambda (condition) + (log-message :error "A thread failed with error: ~a" condition) + (log-message :error "~a" + (trivial-backtrace:print-backtrace condition + :output nil)) + (lp::invoke-transfer-error condition)))) + (log-message :notice "COPY ~a" table-name) + + ;; Check for Read Concurrency Support from our source + (when (and multiple-readers (< 1 concurrency)) + (let ((label "Check Concurrency Support")) + (with-stats-collection (label :section :pre) + (setf readers (concurrency-support copy concurrency)) + (update-stats :pre label :read 1 :rows (if readers 1 0)) + (when readers + (log-message :notice "Multiple Readers Enabled for ~a" + (format-table-name (target copy))))))) + + ;; when reader is non-nil, we have reader concurrency support! 
+ (if readers + ;; here we have detected Concurrency Support: we create as many + ;; readers as writers and create associated couples, each couple + ;; shares its own queue + (let ((rawqs + (loop :repeat concurrency :collect + (lq:make-queue :fixed-capacity *prefetch-rows*)))) + (log-message :info "Read Concurrency Enabled for ~s" + (format-table-name (target copy))) + + (loop :for rawq :in rawqs :for reader :in readers :do + ;; each reader pretends to be alone, pass 1 as concurrency + (submit-task channel #'queue-raw-data reader rawq 1) + + (submit-task channel #'copy-rows-from-queue + copy rawq + :on-error-stop on-error-stop + :disable-triggers disable-triggers))) + + ;; no Read Concurrency Support detected, start a single reader + ;; task, using a single data queue that is read by multiple + ;; writers. + (let ((rawq + (lq:make-queue :fixed-capacity *prefetch-rows*))) + (submit-task channel #'queue-raw-data copy rawq concurrency) + + ;; start a task to transform the raw data in the copy format + ;; and send that data down to PostgreSQL + (loop :repeat concurrency :do + (submit-task channel #'copy-rows-from-queue + copy rawq + :on-error-stop on-error-stop + :disable-triggers disable-triggers)))) + + ;; now wait until both the tasks are over, and kill the kernel + (unless c-s-p + (log-message :debug "waiting for ~d tasks" task-count) + (loop :repeat task-count :do (lp:receive-result channel)) + (log-message :notice "COPY ~s done." table-name) + (unless k-s-p (lp:end-kernel :wait t))) + + ;; return task-count, which is how many tasks we submitted to our + ;; lparallel kernel. 
+ task-count)))) diff --git a/src/load/load-file.lisp b/src/load/load-file.lisp new file mode 100644 index 0000000..19819e5 --- /dev/null +++ b/src/load/load-file.lisp @@ -0,0 +1,128 @@ +;;; +;;; Generic API for pgloader sources +;;; Methods for source types with multiple files input +;;; + +(in-package :pgloader.load) + +(defmethod copy-database ((copy md-copy) + &key + (on-error-stop *on-error-stop*) + truncate + disable-triggers + drop-indexes + + max-parallel-create-index + + ;; generic API, but ignored here + (worker-count 4) + (concurrency 1) + + data-only + schema-only + create-tables + include-drop + foreign-keys + create-indexes + reset-sequences + materialize-views + set-table-oids + including + excluding) + "Copy the contents of the COPY formated file to PostgreSQL." + (declare (ignore data-only schema-only + create-tables include-drop foreign-keys + create-indexes reset-sequences materialize-views + set-table-oids including excluding)) + + (let* ((*on-error-stop* on-error-stop) + (pgconn (target-db copy)) + pgsql-catalog) + + (handler-case + (with-pgsql-connection (pgconn) + (setf pgsql-catalog + (fetch-pgsql-catalog (db-name pgconn) :table (target copy))) + + ;; if the user didn't tell us the column list of the table, now is + ;; a proper time to set it in the copy object + (unless (and (slot-boundp copy 'columns) + (slot-value copy 'columns)) + (setf (columns copy) + (mapcar (lambda (col) + ;; we need to handle the md-copy format for the + ;; column list, which allow for user given + ;; options: each column is a list which car is + ;; the column name. 
+ (list (column-name col))) + (table-field-list (first (table-list pgsql-catalog)))))) + + (log-message :data "CATALOG: ~s" pgsql-catalog) + + ;; this sets (table-index-list (target copy)) + (maybe-drop-indexes pgsql-catalog :drop-indexes drop-indexes) + + ;; now is the proper time to truncate, before parallel operations + (when truncate + (truncate-tables pgsql-catalog))) + + (cl-postgres:database-error (e) + (log-message :fatal "Failed to prepare target PostgreSQL table.") + (log-message :fatal "~a" e) + (return-from copy-database))) + + ;; Keep the PostgreSQL table target around in the copy instance, + ;; with the following subtleties to deal with: + ;; 1. the catalog fetching did fill-in PostgreSQL columns as fields + ;; 2. we might target fewer pg columns than the table actually has + (let ((table (first (table-list pgsql-catalog)))) + (setf (table-column-list table) + (loop :for column-name :in (mapcar #'first (columns copy)) + :collect (find column-name (table-field-list table) + :key #'column-name + :test #'string=))) + (setf (target copy) table)) + + ;; expand the specs of our source, we might have to care about several + ;; files actually. 
+ (let* ((lp:*kernel* (make-kernel worker-count)) + (channel (lp:make-channel)) + (path-list (expand-spec (source copy))) + (task-count 0)) + (with-stats-collection ("Files Processed" :section :post + :use-result-as-read t + :use-result-as-rows t) + (loop :for path-spec :in path-list + :count t + :do (let ((table-source (clone-copy-for copy path-spec))) + (incf task-count + (copy-from table-source + :concurrency concurrency + :kernel lp:*kernel* + :channel channel + :on-error-stop on-error-stop + :disable-triggers disable-triggers))))) + + ;; end kernel + (with-stats-collection ("COPY Threads Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (loop :repeat task-count + :do (handler-case + (destructuring-bind (task table seconds) + (lp:receive-result channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds)) + (condition (e) + (log-message :fatal "~a" e))) + :finally (progn + (lp:end-kernel :wait nil) + (return task-count)))) + (lp:end-kernel :wait t)) + + ;; re-create the indexes from the target table entry + (create-indexes-again (target-db copy) + pgsql-catalog + :max-parallel-create-index max-parallel-create-index + :drop-indexes drop-indexes))) diff --git a/src/sources/common/db-methods.lisp b/src/load/migrate-database.lisp similarity index 95% rename from src/sources/common/db-methods.lisp rename to src/load/migrate-database.lisp index f706ed7..d4ea090 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/load/migrate-database.lisp @@ -3,7 +3,7 @@ ;;; Methods for database source types (with introspection) ;;; -(in-package :pgloader.sources) +(in-package :pgloader.load) ;;; ;;; Prepare the PostgreSQL database before streaming the data into it. 
@@ -113,11 +113,6 @@ ;; log the catalog we just fetched and (maybe) merged (log-message :data "CATALOG: ~s" catalog)) -(defmethod cleanup ((copy db-copy) (catalog catalog) &key materialize-views) - "In case anything wrong happens at `prepare-pgsql-database' step, this - function will be called to clean-up the mess left behind, if any." - (declare (ignorable materialize-views)) - t) (defmethod complete-pgsql-database ((copy db-copy) (catalog catalog) @@ -209,19 +204,6 @@ :create-triggers create-triggers :reset-sequences reset-sequences)))) -(defmethod instanciate-table-copy-object ((copy db-copy) (table table)) - "Create an new instance for copying TABLE data." - (let* ((fields (table-field-list table)) - (columns (table-column-list table)) - (transforms (mapcar #'column-transform columns))) - (make-instance (class-of copy) - :source-db (clone-connection (source-db copy)) - :target-db (clone-connection (target-db copy)) - :source table - :target table - :fields fields - :columns columns - :transforms transforms))) (defun process-catalog (copy catalog &key alter-table alter-schema) "Do all the PostgreSQL catalog tweaking here: casts, index WHERE clause diff --git a/src/package.lisp b/src/package.lisp index aee98f5..a57400d 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -429,10 +429,12 @@ #:get-date-columns #:format-vector-row)) + +;;; +;;; pgloader Sources API and common helpers +;;; (defpackage #:pgloader.sources - (:use #:cl - #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.pgsql) + (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection) (:import-from #:pgloader.transforms #:precision #:scale @@ -445,6 +447,12 @@ #:md-copy #:db-copy + ;; main data access api + #:map-rows + #:copy-column-list + #:data-is-preformatted-p + #:preprocess-row + ;; accessors #:source-db #:target-db @@ -460,16 +468,6 @@ #:skip-lines #:header - ;; main protocol/api - #:concurrency-support - #:map-rows - #:copy-column-list - #:queue-raw-data - 
#:data-is-preformatted-p - #:copy-from - #:copy-to - #:copy-database - ;; md-copy protocol/api #:parse-header #:process-rows @@ -481,20 +479,18 @@ #:expand-spec #:clone-copy-for - ;; the db-methods - #:fetch-metadata - #:prepare-pgsql-database - #:cleanup - #:instanciate-table-copy-object - #:complete-pgsql-database - #:end-kernels - ;; file based utils for csv, fixed etc #:with-open-file-or-stream #:get-pathname #:project-fields #:reformat-then-process + ;; the db-methods + #:fetch-metadata + #:cleanup + #:instanciate-table-copy-object + #:concurrency-support + ;; database cast machinery #:*default-cast-rules* #:*cast-rules* @@ -505,15 +501,10 @@ ;;; ;;; COPY protocol related facilities ;;; -(defpackage #:pgloader.copy - (:use #:cl #:pgloader.params #:pgloader.utils +(defpackage #:pgloader.pgcopy + (:use #:cl + #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.pgsql #:pgloader.sources) - (:import-from #:pgloader.pgsql - #:with-pgsql-connection - #:with-schema - #:with-disabled-triggers - #:postgresql-unavailable - #:postgresql-retryable) (:import-from #:cl-postgres #:database-error-context) (:import-from #:cl-postgres-trivial-utf-8 @@ -521,9 +512,32 @@ #:as-utf-8-bytes #:string-to-utf-8-bytes) (:export #:copy-rows-from-queue - #:format-vector-row)) + #:format-vector-row + #:copy-init-error)) + +;;; +;;; The pgloader.load package implements data transfert from a pgloader +;;; source to a PostgreSQL database, using the pgloader.pgcopy COPY +;;; implementation. 
+;;; +(defpackage #:pgloader.load + (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection + #:pgloader.pgsql #:pgloader.pgcopy #:pgloader.sources) + (:export + ;; main protocol/api + #:concurrency-support + #:queue-raw-data + #:copy-from + #:copy-to + #:copy-database + + ;; the db-methods + #:prepare-pgsql-database + #:instanciate-table-copy-object + #:complete-pgsql-database)) + ;;; ;;; other utilities @@ -548,7 +562,7 @@ ;; ;; specific source handling ;; -(defpackage #:pgloader.csv +(defpackage #:pgloader.source.csv (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -565,11 +579,11 @@ #:guess-csv-params #:guess-all-csv-params)) -(defpackage #:pgloader.fixed +(defpackage #:pgloader.source.fixed (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) - (:import-from #:pgloader.csv + (:import-from #:pgloader.source.csv #:csv-connection #:specs #:csv-specs) @@ -580,11 +594,11 @@ #:copy-fixed #:copy-from)) -(defpackage #:pgloader.copy +(defpackage #:pgloader.source.copy (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) - (:import-from #:pgloader.csv + (:import-from #:pgloader.source.csv #:csv-connection #:specs #:csv-specs) @@ -595,7 +609,7 @@ #:copy-copy #:copy-from)) -(defpackage #:pgloader.ixf +(defpackage #:pgloader.source.ixf (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -610,7 +624,7 @@ #:map-rows #:copy-from)) -(defpackage #:pgloader.db3 +(defpackage #:pgloader.source.db3 (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -626,7 +640,7 @@ #:copy-to #:copy-from)) -(defpackage #:pgloader.mysql +(defpackage #:pgloader.source.mysql (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -650,16 +664,9 @@ #:copy-mysql #:*decoding-as* #:*mysql-default-cast-rules* - #:with-mysql-connection - #:map-rows - #:copy-to - #:copy-from 
- #:copy-database - #:list-databases - #:export-database - #:export-import-database)) + #:with-mysql-connection)) -(defpackage #:pgloader.sqlite +(defpackage #:pgloader.source.sqlite (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -675,13 +682,9 @@ #:comment-on-tables-and-columns) (:export #:sqlite-connection #:copy-sqlite - #:*sqlite-default-cast-rules* - #:map-rows - #:copy-to - #:copy-from - #:copy-database)) + #:*sqlite-default-cast-rules*)) -(defpackage #:pgloader.mssql +(defpackage #:pgloader.source.mssql (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.sources) @@ -702,14 +705,10 @@ #:reset-sequences) (:export #:mssql-connection #:copy-mssql - #:*mssql-default-cast-rules* - #:map-rows - #:copy-to - #:copy-from - #:copy-database)) + #:*mssql-default-cast-rules*)) -(defpackage #:pgloader.mssql.index-filter - (:use #:cl #:esrap #:pgloader.utils #:pgloader.mssql) +(defpackage #:pgloader.source.mssql.index-filter + (:use #:cl #:esrap #:pgloader.utils #:pgloader.source.mssql) (:import-from #:pgloader.pgsql #:translate-index-filter)) @@ -731,9 +730,13 @@ #:pgloader.params #:pgloader.utils #:pgloader.sql #:pgloader.connection) (:shadow #:namestring #:number #:inline) (:import-from #:alexandria #:read-file-into-string) + (:import-from #:pgloader.load + #:copy-database) (:import-from #:pgloader.sources #:md-connection - #:md-spec) + #:md-spec + #:*default-cast-rules* + #:*cast-rules*) (:import-from #:pgloader.pgsql #:pgsql-connection #:with-pgsql-transaction @@ -741,29 +744,36 @@ #:pgconn-use-ssl #:pgconn-table-name #:make-table) - (:import-from #:pgloader.csv + (:import-from #:pgloader.source.csv + #:copy-csv #:csv-connection #:specs #:csv-specs) - (:import-from #:pgloader.fixed + (:import-from #:pgloader.source.fixed + #:copy-fixed #:fixed-connection) - (:import-from #:pgloader.copy + (:import-from #:pgloader.source.copy + #:copy-copy #:copy-connection) - (:import-from #:pgloader.sources - 
#:*default-cast-rules* - #:*cast-rules*) - (:import-from #:pgloader.mysql + (:import-from #:pgloader.source.mysql + #:copy-mysql #:mysql-connection #:*decoding-as* #:*mysql-default-cast-rules*) - (:import-from #:pgloader.mssql + (:import-from #:pgloader.source.mssql + #:copy-mssql #:mssql-connection #:*mssql-default-cast-rules*) - (:import-from #:pgloader.sqlite + (:import-from #:pgloader.source.sqlite + #:copy-sqlite #:sqlite-connection #:*sqlite-default-cast-rules*) - (:import-from #:pgloader.db3 #:dbf-connection) - (:import-from #:pgloader.ixf #:ixf-connection) + (:import-from #:pgloader.source.db3 + #:copy-db3 + #:dbf-connection) + (:import-from #:pgloader.source.ixf + #:copy-ixf + #:ixf-connection) (:export #:parse-commands #:parse-commands-from-file #:initialize-context @@ -812,7 +822,7 @@ (defpackage #:pgloader (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.parser - #:pgloader.connection #:pgloader.copy #:metabang.bind) + #:pgloader.connection #:pgloader.pgcopy #:metabang.bind) (:import-from #:pgloader.pgsql #:pgconn-table-name #:pgsql-connection) diff --git a/src/parsers/command-copy.lisp b/src/parsers/command-copy.lisp index 555d447..a58f00e 100644 --- a/src/parsers/command-copy.lisp +++ b/src/parsers/command-copy.lisp @@ -115,7 +115,7 @@ `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - ,@(identifier-case-binding options) + ,@(identifier-case-binding options) (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,copy-conn))))) @@ -128,7 +128,7 @@ (drop-indexes (getf ',options :drop-indexes)) (max-parallel-create-index (getf ',options :max-parallel-create-index)) (source - (make-instance 'pgloader.copy:copy-copy + (make-instance 'copy-copy :target-db ,pg-db-conn :source source-db :target (create-table ',target-table-name) @@ -142,16 +142,16 @@ :drop-indexes :disable-triggers :max-parallel-create-index))))) - (pgloader.sources:copy-database source - ,@ (when worker-count 
- (list :worker-count worker-count)) - ,@ (when concurrency - (list :concurrency concurrency)) - :on-error-stop on-error-stop - :truncate truncate - :drop-indexes drop-indexes - :disable-triggers disable-triggers - :max-parallel-create-index max-parallel-create-index)) + (copy-database source + ,@ (when worker-count + (list :worker-count worker-count)) + ,@ (when concurrency + (list :concurrency concurrency)) + :on-error-stop on-error-stop + :truncate truncate + :drop-indexes drop-indexes + :disable-triggers disable-triggers + :max-parallel-create-index max-parallel-create-index)) ,(sql-code-block pg-db-conn :post after "after load"))))) diff --git a/src/parsers/command-csv.lisp b/src/parsers/command-csv.lisp index 88819df..af91471 100644 --- a/src/parsers/command-csv.lisp +++ b/src/parsers/command-csv.lisp @@ -427,7 +427,7 @@ `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - ,@(identifier-case-binding options) + ,@(identifier-case-binding options) (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,csv-conn))))) @@ -440,7 +440,7 @@ (drop-indexes (getf ',options :drop-indexes)) (max-parallel-create-index (getf ',options :max-parallel-create-index)) (source - (make-instance 'pgloader.csv:copy-csv + (make-instance 'copy-csv :target-db ,pg-db-conn :source source-db :target (create-table ',target-table-name) @@ -454,16 +454,16 @@ :drop-indexes :disable-triggers :max-parallel-create-index))))) - (pgloader.sources:copy-database source - ,@ (when worker-count - (list :worker-count worker-count)) - ,@ (when concurrency - (list :concurrency concurrency)) - :on-error-stop on-error-stop - :truncate truncate - :drop-indexes drop-indexes - :disable-triggers disable-triggers - :max-parallel-create-index max-parallel-create-index)) + (copy-database source + ,@ (when worker-count + (list :worker-count worker-count)) + ,@ (when concurrency + (list :concurrency concurrency)) + :on-error-stop on-error-stop 
+ :truncate truncate + :drop-indexes drop-indexes + :disable-triggers disable-triggers + :max-parallel-create-index max-parallel-create-index)) ,(sql-code-block pg-db-conn :post after "after load"))))) diff --git a/src/parsers/command-dbf.lisp b/src/parsers/command-dbf.lisp index 0a6fc3c..84cf6b3 100644 --- a/src/parsers/command-dbf.lisp +++ b/src/parsers/command-dbf.lisp @@ -94,29 +94,29 @@ target-table-name (encoding :ascii) gucs before after options - &allow-other-keys) + &allow-other-keys) `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - ,@(identifier-case-binding options) - (on-error-stop (getf ',options :on-error-stop)) - (source-db (with-stats-collection ("fetch" :section :pre) - (expand (fetch-file ,dbf-db-conn)))) - (source - (make-instance 'pgloader.db3:copy-db3 - :target-db ,pg-db-conn - :encoding ,encoding - :source-db source-db - :target (create-table ',target-table-name)))) + ,@(identifier-case-binding options) + (on-error-stop (getf ',options :on-error-stop)) + (source-db (with-stats-collection ("fetch" :section :pre) + (expand (fetch-file ,dbf-db-conn)))) + (source + (make-instance 'copy-db3 + :target-db ,pg-db-conn + :encoding ,encoding + :source-db source-db + :target (create-table ',target-table-name)))) ,(sql-code-block pg-db-conn :pre before "before load") - (pgloader.sources:copy-database source - ,@(remove-batch-control-option options) - :on-error-stop on-error-stop - :create-indexes nil - :foreign-keys nil - :reset-sequences nil) + (copy-database source + ,@(remove-batch-control-option options) + :on-error-stop on-error-stop + :create-indexes nil + :foreign-keys nil + :reset-sequences nil) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-fixed.lisp b/src/parsers/command-fixed.lisp index 9d85b17..9fe2d78 100644 --- a/src/parsers/command-fixed.lisp +++ b/src/parsers/command-fixed.lisp @@ -124,7 +124,7 @@ `(lambda () (let* (,@(pgsql-connection-bindings 
pg-db-conn gucs) ,@(batch-control-bindings options) - ,@(identifier-case-binding options) + ,@(identifier-case-binding options) (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,fixed-conn))))) @@ -137,7 +137,7 @@ (drop-indexes ,(getf options :drop-indexes)) (max-parallel-create-index ,(getf options :max-parallel-create-index)) (source - (make-instance 'pgloader.fixed:copy-fixed + (make-instance 'copy-fixed :target-db ,pg-db-conn :source source-db :target (create-table ',target-table-name) @@ -146,16 +146,16 @@ :columns ',columns :skip-lines ,(or (getf options :skip-line) 0)))) - (pgloader.sources:copy-database source - ,@ (when worker-count - (list :worker-count worker-count)) - ,@ (when concurrency - (list :concurrency concurrency)) - :on-error-stop on-error-stop - :truncate truncate - :drop-indexes drop-indexes - :disable-triggers disable-triggers - :max-parallel-create-index max-parallel-create-index)) + (copy-database source + ,@ (when worker-count + (list :worker-count worker-count)) + ,@ (when concurrency + (list :concurrency concurrency)) + :on-error-stop on-error-stop + :truncate truncate + :drop-indexes drop-indexes + :disable-triggers disable-triggers + :max-parallel-create-index max-parallel-create-index)) ,(sql-code-block pg-db-conn :post after "after load"))))) diff --git a/src/parsers/command-ixf.lisp b/src/parsers/command-ixf.lisp index 24511ee..62ba4a5 100644 --- a/src/parsers/command-ixf.lisp +++ b/src/parsers/command-ixf.lisp @@ -78,31 +78,31 @@ &key target-table-name gucs before after options - &allow-other-keys) + &allow-other-keys) `(lambda () (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - ,@(identifier-case-binding options) - (timezone (getf ',options :timezone)) - (on-error-stop(getf ',options :on-error-stop)) - (source-db (with-stats-collection ("fetch" :section :pre) + ,@(identifier-case-binding options) + (timezone (getf ',options :timezone)) + (on-error-stop(getf 
',options :on-error-stop)) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,ixf-db-conn)))) - (source - (make-instance 'pgloader.ixf:copy-ixf - :target-db ,pg-db-conn - :source-db source-db - :target (create-table ',target-table-name) - :timezone timezone))) + (source + (make-instance 'copy-ixf + :target-db ,pg-db-conn + :source-db source-db + :target (create-table ',target-table-name) + :timezone timezone))) ,(sql-code-block pg-db-conn :pre before "before load") - (pgloader.sources:copy-database source - ,@(remove-batch-control-option - options - :extras '(:timezone)) - :on-error-stop on-error-stop - :foreign-keys nil - :reset-sequences nil) + (copy-database source + ,@(remove-batch-control-option + options + :extras '(:timezone)) + :on-error-stop on-error-stop + :foreign-keys nil + :reset-sequences nil) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-mssql.lisp b/src/parsers/command-mssql.lisp index f53b92d..41fcb8a 100644 --- a/src/parsers/command-mssql.lisp +++ b/src/parsers/command-mssql.lisp @@ -142,7 +142,7 @@ casts before after options alter-schema alter-table including excluding - &allow-other-keys) + &allow-other-keys) `(lambda () ;; now is the time to load the CFFI lib we need (freetds) (let (#+sbcl(sb-ext:*muffled-warnings* 'style-warning)) @@ -156,20 +156,20 @@ ,@(batch-control-bindings options) ,@(identifier-case-binding options) (source - (make-instance 'pgloader.mssql::copy-mssql + (make-instance 'copy-mssql :target-db ,pg-db-conn :source-db ,ms-db-conn))) ,(sql-code-block pg-db-conn :pre before "before load") - (pgloader.mssql:copy-database source - :including ',including - :excluding ',excluding - :alter-schema ',alter-schema - :alter-table ',alter-table - :set-table-oids t - :on-error-stop on-error-stop - ,@(remove-batch-control-option options)) + (copy-database source + :including ',including + :excluding ',excluding + :alter-schema ',alter-schema + :alter-table 
',alter-table + :set-table-oids t + :on-error-stop on-error-stop + ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-mysql.lisp b/src/parsers/command-mysql.lisp index 10274cb..9ccf9c4 100644 --- a/src/parsers/command-mysql.lisp +++ b/src/parsers/command-mysql.lisp @@ -149,7 +149,7 @@ ((:including incl)) ((:excluding excl)) ((:decoding decoding-as)) - &allow-other-keys) + &allow-other-keys) `(lambda () (let* ((*default-cast-rules* ',*mysql-default-cast-rules*) (*cast-rules* ',casts) @@ -160,21 +160,21 @@ ,@(batch-control-bindings options) ,@(identifier-case-binding options) (source - (make-instance 'pgloader.mysql::copy-mysql + (make-instance 'copy-mysql :target-db ,pg-db-conn :source-db ,my-db-conn))) ,(sql-code-block pg-db-conn :pre before "before load") - (pgloader.mysql:copy-database source - :including ',incl - :excluding ',excl - :materialize-views ',views - :alter-table ',alter-table - :alter-schema ',alter-schema - :set-table-oids t - :on-error-stop on-error-stop - ,@(remove-batch-control-option options)) + (copy-database source + :including ',incl + :excluding ',excl + :materialize-views ',views + :alter-table ',alter-table + :alter-schema ',alter-schema + :set-table-oids t + :on-error-stop on-error-stop + ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/parsers/command-sqlite.lisp b/src/parsers/command-sqlite.lisp index d238bff..a6ff248 100644 --- a/src/parsers/command-sqlite.lisp +++ b/src/parsers/command-sqlite.lisp @@ -105,22 +105,22 @@ load database ,@(batch-control-bindings options) ,@(identifier-case-binding options) (source-db (with-stats-collection ("fetch" :section :pre) - (expand (fetch-file ,sqlite-db-conn)))) + (expand (fetch-file ,sqlite-db-conn)))) (source - (make-instance 'pgloader.sqlite::copy-sqlite + (make-instance 'copy-sqlite :target-db ,pg-db-conn :source-db source-db))) ,(sql-code-block 
pg-db-conn :pre before "before load") - (pgloader.sqlite:copy-database source - :alter-table ',alter-table - :alter-schema ',alter-schema - :set-table-oids t - :including ',incl - :excluding ',excl - :on-error-stop on-error-stop - ,@(remove-batch-control-option options)) + (copy-database source + :alter-table ',alter-table + :alter-schema ',alter-schema + :set-table-oids t + :including ',incl + :excluding ',excl + :on-error-stop on-error-stop + ,@(remove-batch-control-option options)) ,(sql-code-block pg-db-conn :post after "after load")))) diff --git a/src/pg-copy/copy-batch.lisp b/src/pg-copy/copy-batch.lisp index 68a30d7..c20f8ad 100644 --- a/src/pg-copy/copy-batch.lisp +++ b/src/pg-copy/copy-batch.lisp @@ -1,7 +1,7 @@ ;;; ;;; Tools to handle internal queueing, using lparallel.queue ;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) ;;; ;;; The pgloader architectures uses a reader thread and a writer thread. The diff --git a/src/pg-copy/copy-db-write.lisp b/src/pg-copy/copy-db-write.lisp index d526de1..82e9b44 100644 --- a/src/pg-copy/copy-db-write.lisp +++ b/src/pg-copy/copy-db-write.lisp @@ -3,7 +3,7 @@ ;;; ;;; Here, sending the data in the COPY stream opened in copy-batch. ;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) (define-condition copy-init-error (error) ((table :initarg :table :reader copy-init-error-table) diff --git a/src/pg-copy/copy-format.lisp b/src/pg-copy/copy-format.lisp index 8441ce9..5903c11 100644 --- a/src/pg-copy/copy-format.lisp +++ b/src/pg-copy/copy-format.lisp @@ -1,7 +1,7 @@ ;;; ;;; Tools to handle PostgreSQL data format ;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) ;;; ;;; Format row to PostgreSQL COPY format, the TEXT variant. 
diff --git a/src/pg-copy/copy-from-queue.lisp b/src/pg-copy/copy-from-queue.lisp index f586c13..4a537f8 100644 --- a/src/pg-copy/copy-from-queue.lisp +++ b/src/pg-copy/copy-from-queue.lisp @@ -1,7 +1,7 @@ ;;; ;;; The PostgreSQL COPY TO implementation, with batches and retries. ;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) ;;; ;;; We receive raw input rows from an lparallel queue, push their content diff --git a/src/pg-copy/copy-retry-batch.lisp b/src/pg-copy/copy-retry-batch.lisp index 9420c1f..693d897 100644 --- a/src/pg-copy/copy-retry-batch.lisp +++ b/src/pg-copy/copy-retry-batch.lisp @@ -1,7 +1,7 @@ ;;; ;;; The PostgreSQL COPY TO implementation, with batches and retries. ;;; -(in-package #:pgloader.copy) +(in-package #:pgloader.pgcopy) ;;; ;;; Compute how many rows we're going to try loading next, depending on diff --git a/src/pg-copy/copy-rows-in-batch.lisp b/src/pg-copy/copy-rows-in-batch.lisp index 5542581..73a175e 100644 --- a/src/pg-copy/copy-rows-in-batch.lisp +++ b/src/pg-copy/copy-rows-in-batch.lisp @@ -1,7 +1,7 @@ ;;; ;;; The PostgreSQL COPY TO implementation, with batches and retries. ;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) (defun batch-rows-to-copy (table columns copy nbcols queue) "Add rows that we pop from QUEUE into a batch, that we then COPY over to diff --git a/src/pg-copy/copy-rows-in-stream.lisp b/src/pg-copy/copy-rows-in-stream.lisp index bbe9eb2..7b1b813 100644 --- a/src/pg-copy/copy-rows-in-stream.lisp +++ b/src/pg-copy/copy-rows-in-stream.lisp @@ -1,7 +1,7 @@ ;;; ;;; The PostgreSQL COPY TO implementation, with batches and retries. 
;;; -(in-package :pgloader.copy) +(in-package :pgloader.pgcopy) (defun stream-rows-to-copy (table columns copy nbcols queue &optional (db pomo:*database*)) diff --git a/src/sources/common/api.lisp b/src/sources/common/api.lisp index d23d0fe..091e3b5 100644 --- a/src/sources/common/api.lisp +++ b/src/sources/common/api.lisp @@ -67,67 +67,7 @@ (:documentation "Return the list of column names for the data sent in the queue.")) -(defgeneric copy-from (source &key) - (:documentation - "Load data from SOURCE into its target as defined by the SOURCE object.")) -;; That one is more an export than a load. It always export to a single very -;; well defined format, the importing utility is defined in -;; src/pgsql-copy-format.lisp - -(defgeneric copy-to (source filename) - (:documentation - "Load data from SOURCE and serialize it into FILENAME, using PostgreSQL - COPY TEXT format.")) - -;; The next generic function is only to get instanciated for sources -;; actually containing more than a single source item (tables, collections, -;; etc) - -(defgeneric copy-database (source - &key - worker-count - concurrency - max-parallel-create-index - truncate - data-only - schema-only - create-tables - include-drop - foreign-keys - create-indexes - reset-sequences - disable-triggers - materialize-views - set-table-oids - including - excluding) - (:documentation - "Auto-discover source schema, convert it to PostgreSQL, migrate the data - from the source definition to PostgreSQL for all the discovered - items (tables, collections, etc), then reset the PostgreSQL sequences - created by SERIAL columns in the first step. - - The target tables are automatically discovered, the only-tables - parameter allows to filter them out.")) - - -;;; -;;; Common API to introspec a data source, when that's possible. Typically -;;; when the source is a database system. 
-;;; - -;; (defgeneric list-all-columns (connection &key) -;; (:documentation "Discover all columns in CONNECTION source.")) - -;; (defgeneric list-all-indexes (connection &key) -;; (:documentation "Discover all indexes in CONNECTION source.")) - -;; (defgeneric list-all-fkeys (connection &key) -;; (:documentation "Discover all foreign keys in CONNECTION source.")) - -;; (defgeneric fetch-metadata (connection &key) -;; (:documentation "Full discovery of the CONNECTION data source.")) ;;; @@ -182,28 +122,8 @@ including excluding)) -(defgeneric prepare-pgsql-database (db-copy catalog - &key - truncate - create-tables - create-schemas - drop-indexes - set-table-oids - materialize-views - foreign-keys - include-drop) - (:documentation "Prepare the target PostgreSQL database.")) - (defgeneric cleanup (db-copy catalog &key materialize-views) (:documentation "Clean-up after prepare-pgsql-database failure.")) -(defgeneric complete-pgsql-database (db-copy catalog pkeys - &key - foreign-keys - create-indexes - create-triggers - reset-sequences) - (:documentation "Alter load duties for database sources copy support.")) - (defgeneric instanciate-table-copy-object (db-copy table) (:documentation "Create a new instance for copying TABLE data.")) diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index 37aaaeb..8694dd1 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -1,42 +1,13 @@ ;;; -;;; Generic API for pgloader sources -;;; Methods for source types with multiple files input +;;; Common methods implementations, default behaviour, for md objects ;;; -(in-package :pgloader.sources) +(in-package #:pgloader.sources) (defmethod parse-header ((copy md-copy) header) "Unsupported by default, to be implemented in each md-copy subclass." (error "Parsing the header of a ~s is not implemented yet." 
(type-of copy))) -(defmethod map-rows ((copy md-copy) &key process-row-fn) - "Load data from a text file in CSV format, with support for advanced - projecting capabilities. See `project-fields' for details. - - Each row is pre-processed then PROCESS-ROW-FN is called with the row as a - list as its only parameter. - - Finally returns how many rows where read and processed." - - (with-connection (cnx (source copy) - :direction :input - :external-format (encoding copy) - :if-does-not-exist nil) - (let ((input (md-strm cnx))) - ;; we handle skipping more than one line here, as cl-copy only knows - ;; about skipping the first line - (loop :repeat (skip-lines copy) :do (read-line input nil nil)) - - ;; we might now have to read the fields from the header line - (when (header copy) - (setf (fields copy) - (parse-header copy (read-line input nil nil))) - - (log-message :debug "Parsed header columns ~s" (fields copy))) - - ;; read in the text file, split it into columns - (process-rows copy input process-row-fn)))) - (defmethod preprocess-row ((copy md-copy)) "The file based readers possibly have extra work to do with user defined fields to columns projections (mapping)." @@ -70,124 +41,30 @@ :skip-lines (skip-lines copy) :header (header copy))) -(defmethod copy-database ((copy md-copy) - &key - (on-error-stop *on-error-stop*) - truncate - disable-triggers - drop-indexes +(defmethod map-rows ((copy md-copy) &key process-row-fn) + "Load data from a text file in CSV format, with support for advanced + projecting capabilities. See `project-fields' for details. - max-parallel-create-index + Each row is pre-processed then PROCESS-ROW-FN is called with the row as a + list as its only parameter. - ;; generic API, but ignored here - (worker-count 4) - (concurrency 1) + Finally returns how many rows where read and processed." 
- data-only - schema-only - create-tables - include-drop - foreign-keys - create-indexes - reset-sequences - materialize-views - set-table-oids - including - excluding) - "Copy the contents of the COPY formated file to PostgreSQL." - (declare (ignore data-only schema-only - create-tables include-drop foreign-keys - create-indexes reset-sequences materialize-views - set-table-oids including excluding)) + (with-connection (cnx (source copy) + :direction :input + :external-format (encoding copy) + :if-does-not-exist nil) + (let ((input (md-strm cnx))) + ;; we handle skipping more than one line here, as cl-copy only knows + ;; about skipping the first line + (loop :repeat (skip-lines copy) :do (read-line input nil nil)) - (let* ((*on-error-stop* on-error-stop) - (pgconn (target-db copy)) - pgsql-catalog) + ;; we might now have to read the fields from the header line + (when (header copy) + (setf (fields copy) + (parse-header copy (read-line input nil nil))) - (handler-case - (with-pgsql-connection (pgconn) - (setf pgsql-catalog - (fetch-pgsql-catalog (db-name pgconn) :table (target copy))) + (log-message :debug "Parsed header columns ~s" (fields copy))) - ;; if the user didn't tell us the column list of the table, now is - ;; a proper time to set it in the copy object - (unless (and (slot-boundp copy 'columns) - (slot-value copy 'columns)) - (setf (columns copy) - (mapcar (lambda (col) - ;; we need to handle the md-copy format for the - ;; column list, which allow for user given - ;; options: each column is a list which car is - ;; the column name. 
- (list (column-name col))) - (table-field-list (first (table-list pgsql-catalog)))))) - - (log-message :data "CATALOG: ~s" pgsql-catalog) - - ;; this sets (table-index-list (target copy)) - (maybe-drop-indexes pgsql-catalog :drop-indexes drop-indexes) - - ;; now is the proper time to truncate, before parallel operations - (when truncate - (truncate-tables pgsql-catalog))) - - (cl-postgres:database-error (e) - (log-message :fatal "Failed to prepare target PostgreSQL table.") - (log-message :fatal "~a" e) - (return-from copy-database))) - - ;; Keep the PostgreSQL table target around in the copy instance, - ;; with the following subtleties to deal with: - ;; 1. the catalog fetching did fill-in PostgreSQL columns as fields - ;; 2. we might target fewer pg columns than the table actually has - (let ((table (first (table-list pgsql-catalog)))) - (setf (table-column-list table) - (loop :for column-name :in (mapcar #'first (columns copy)) - :collect (find column-name (table-field-list table) - :key #'column-name - :test #'string=))) - (setf (target copy) table)) - - ;; expand the specs of our source, we might have to care about several - ;; files actually. 
- (let* ((lp:*kernel* (make-kernel worker-count)) - (channel (lp:make-channel)) - (path-list (expand-spec (source copy))) - (task-count 0)) - (with-stats-collection ("Files Processed" :section :post - :use-result-as-read t - :use-result-as-rows t) - (loop :for path-spec :in path-list - :count t - :do (let ((table-source (clone-copy-for copy path-spec))) - (incf task-count - (copy-from table-source - :concurrency concurrency - :kernel lp:*kernel* - :channel channel - :on-error-stop on-error-stop - :disable-triggers disable-triggers))))) - - ;; end kernel - (with-stats-collection ("COPY Threads Completion" :section :post - :use-result-as-read t - :use-result-as-rows t) - (loop :repeat task-count - :do (handler-case - (destructuring-bind (task table seconds) - (lp:receive-result channel) - (log-message :debug - "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds)) - (condition (e) - (log-message :fatal "~a" e))) - :finally (progn - (lp:end-kernel :wait nil) - (return task-count)))) - (lp:end-kernel :wait t)) - - ;; re-create the indexes from the target table entry - (create-indexes-again (target-db copy) - pgsql-catalog - :max-parallel-create-index max-parallel-create-index - :drop-indexes drop-indexes))) + ;; read in the text file, split it into columns + (process-rows copy input process-row-fn)))) diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index c92ea3e..15361c7 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -1,46 +1,8 @@ ;;; -;;; Generic API for pgloader sources +;;; Common methods implementations, default behaviour. ;;; -(in-package :pgloader.sources) -;;; -;;; Common API implementation -;;; -(defmethod queue-raw-data ((copy copy) rawq concurrency) - "Stream data as read by the map-queue method on the COPY argument into QUEUE, - as given." 
- (log-message :debug "Reader started for ~a" (format-table-name (target copy))) - (let* ((start-time (get-internal-real-time)) - (row-count 0) - (process-row - (if (or (eq :data *log-min-messages*) - (eq :data *client-min-messages*)) - ;; when debugging, use a lambda with debug traces - (lambda (row) - (log-message :data "< ~s" row) - (lq:push-queue row rawq) - (incf row-count)) - - ;; usual non-debug case - (lambda (row) - (lq:push-queue row rawq) - (incf row-count))))) - - ;; signal we are starting - (update-stats :data (target copy) :start start-time) - - ;; call the source-specific method for reading input data - (map-rows copy :process-row-fn process-row) - - ;; process last batches and send them to queues - ;; and mark end of stream - (loop :repeat concurrency :do (lq:push-queue :end-of-data rawq)) - - (let ((seconds (elapsed-time-since start-time))) - (log-message :debug "Reader for ~a is done in ~6$s" - (format-table-name (target copy)) seconds) - (update-stats :data (target copy) :read row-count :rs seconds) - (list :reader (target copy) seconds)))) +(in-package #:pgloader.sources) (defmethod data-is-preformatted-p ((copy copy)) "By default, data is not preformatted." @@ -54,107 +16,22 @@ "Default column list is an empty list." nil) -(defmethod copy-to ((copy copy) pgsql-copy-filename) - "Extract data from COPY file into a PotgreSQL COPY TEXT formated file" - (with-open-file (text-file pgsql-copy-filename - :direction :output - :if-exists :supersede - :external-format :utf-8) - (let ((row-fn (lambda (row) - (format-vector-row text-file row (transforms copy))))) - (map-rows copy :process-row-fn row-fn)))) +(defmethod cleanup ((copy db-copy) (catalog catalog) &key materialize-views) + "In case anything wrong happens at `prepare-pgsql-database' step, this + function will be called to clean-up the mess left behind, if any." 
+ (declare (ignorable materialize-views)) + t) -(defmethod copy-from ((copy copy) - &key - (kernel nil k-s-p) - (channel nil c-s-p) - (worker-count 8) - (concurrency 2) - (multiple-readers nil) - (on-error-stop *on-error-stop*) - disable-triggers) - "Copy data from COPY source into PostgreSQL." - (let* ((table-name (format-table-name (target copy))) - (lp:*kernel* (or kernel (make-kernel worker-count))) - (channel (or channel (lp:make-channel))) - (readers nil) - (task-count 0)) - - (flet ((submit-task (channel function &rest args) - (apply #'lp:submit-task channel function args) - (incf task-count))) - - (lp:task-handler-bind - ((pgloader.copy::copy-init-error - #'(lambda (condition) - ;; everything has been handled already - (lp:invoke-transfer-error condition))) - (on-error-stop - #'(lambda (condition) - ;; everything has been handled already - (lp:invoke-transfer-error condition))) - #+pgloader-image - (error - #'(lambda (condition) - (log-message :error "A thread failed with error: ~a" condition) - (log-message :error "~a" - (trivial-backtrace:print-backtrace condition - :output nil)) - (lp::invoke-transfer-error condition)))) - (log-message :notice "COPY ~a" table-name) - - ;; Check for Read Concurrency Support from our source - (when (and multiple-readers (< 1 concurrency)) - (let ((label "Check Concurrency Support")) - (with-stats-collection (label :section :pre) - (setf readers (concurrency-support copy concurrency)) - (update-stats :pre label :read 1 :rows (if readers 1 0)) - (when readers - (log-message :notice "Multiple Readers Enabled for ~a" - (format-table-name (target copy))))))) - - ;; when reader is non-nil, we have reader concurrency support! 
- (if readers - ;; here we have detected Concurrency Support: we create as many - ;; readers as writers and create associated couples, each couple - ;; shares its own queue - (let ((rawqs - (loop :repeat concurrency :collect - (lq:make-queue :fixed-capacity *prefetch-rows*)))) - (log-message :info "Read Concurrency Enabled for ~s" - (format-table-name (target copy))) - - (loop :for rawq :in rawqs :for reader :in readers :do - ;; each reader pretends to be alone, pass 1 as concurrency - (submit-task channel #'queue-raw-data reader rawq 1) - - (submit-task channel #'pgloader.copy::copy-rows-from-queue - copy rawq - :on-error-stop on-error-stop - :disable-triggers disable-triggers))) - - ;; no Read Concurrency Support detected, start a single reader - ;; task, using a single data queue that is read by multiple - ;; writers. - (let ((rawq - (lq:make-queue :fixed-capacity *prefetch-rows*))) - (submit-task channel #'queue-raw-data copy rawq concurrency) - - ;; start a task to transform the raw data in the copy format - ;; and send that data down to PostgreSQL - (loop :repeat concurrency :do - (submit-task channel #'pgloader.copy::copy-rows-from-queue - copy rawq - :on-error-stop on-error-stop - :disable-triggers disable-triggers)))) - - ;; now wait until both the tasks are over, and kill the kernel - (unless c-s-p - (log-message :debug "waiting for ~d tasks" task-count) - (loop :repeat task-count :do (lp:receive-result channel)) - (log-message :notice "COPY ~s done." table-name) - (unless k-s-p (lp:end-kernel :wait t))) - - ;; return task-count, which is how many tasks we submitted to our - ;; lparallel kernel. - task-count)))) +(defmethod instanciate-table-copy-object ((copy db-copy) (table table)) + "Create an new instance for copying TABLE data." 
+ (let* ((fields (table-field-list table)) + (columns (table-column-list table)) + (transforms (mapcar #'column-transform columns))) + (make-instance (class-of copy) + :source-db (clone-connection (source-db copy)) + :target-db (clone-connection (target-db copy)) + :source table + :target table + :fields fields + :columns columns + :transforms transforms))) diff --git a/src/sources/copy.lisp b/src/sources/copy.lisp index 20ba472..b24e4be 100644 --- a/src/sources/copy.lisp +++ b/src/sources/copy.lisp @@ -1,7 +1,7 @@ ;;; ;;; Read a file format in PostgreSQL COPY TEXT format. ;;; -(in-package :pgloader.copy) +(in-package :pgloader.source.copy) (defclass copy-connection (md-connection) ()) diff --git a/src/sources/csv/csv-database.lisp b/src/sources/csv/csv-database.lisp index 3faef8f..d8523d3 100644 --- a/src/sources/csv/csv-database.lisp +++ b/src/sources/csv/csv-database.lisp @@ -2,7 +2,7 @@ ;;; Experimental code, used to be used a long time ago, before this lisp ;;; code became pgloader. The idea is to use it again sometimes, someway. ;;; -(in-package #:pgloader.csv) +(in-package #:pgloader.source.csv) ;;; ;;; When you exported a whole database as a bunch of CSV files to be found diff --git a/src/sources/csv/csv-guess.lisp b/src/sources/csv/csv-guess.lisp index 2960cab..2b40bd1 100644 --- a/src/sources/csv/csv-guess.lisp +++ b/src/sources/csv/csv-guess.lisp @@ -1,7 +1,7 @@ ;;; ;;; Automatic guess the CSV format parameters ;;; -(in-package #:pgloader.csv) +(in-package #:pgloader.source.csv) (defparameter *separators* '(#\Tab #\, #\; #\| #\% #\^ #\! 
#\$) "Common CSV separators to try when guessing file parameters.") diff --git a/src/sources/csv/csv.lisp b/src/sources/csv/csv.lisp index 1cbabf0..f58007c 100644 --- a/src/sources/csv/csv.lisp +++ b/src/sources/csv/csv.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MySQL data fetching ;;; -(in-package :pgloader.csv) +(in-package :pgloader.source.csv) (defclass csv-connection (md-connection) ()) diff --git a/src/sources/db3/db3-schema.lisp b/src/sources/db3/db3-schema.lisp index 06cbbc9..1be657b 100644 --- a/src/sources/db3/db3-schema.lisp +++ b/src/sources/db3/db3-schema.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle the DBF file format ;;; -(in-package :pgloader.db3) +(in-package :pgloader.source.db3) (defclass dbf-connection (fd-connection) ((db3 :initarg db3 :accessor fd-db3)) diff --git a/src/sources/db3/db3.lisp b/src/sources/db3/db3.lisp index 69b9741..03eb244 100644 --- a/src/sources/db3/db3.lisp +++ b/src/sources/db3/db3.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle the DBF file format ;;; -(in-package :pgloader.db3) +(in-package :pgloader.source.db3) ;;; ;;; Integration with pgloader diff --git a/src/sources/fixed.lisp b/src/sources/fixed.lisp index 92027fe..8d36a8f 100644 --- a/src/sources/fixed.lisp +++ b/src/sources/fixed.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle fixed width files ;;; -(in-package :pgloader.fixed) +(in-package :pgloader.source.fixed) (defclass fixed-connection (md-connection) ()) diff --git a/src/sources/ixf/ixf-schema.lisp b/src/sources/ixf/ixf-schema.lisp index 9c537a6..775c9db 100644 --- a/src/sources/ixf/ixf-schema.lisp +++ b/src/sources/ixf/ixf-schema.lisp @@ -3,7 +3,7 @@ ;;; ;;; http://www-01.ibm.com/support/knowledgecenter/SSEPGG_10.5.0/com.ibm.db2.luw.admin.dm.doc/doc/r0004667.html -(in-package :pgloader.ixf) +(in-package :pgloader.source.ixf) (defclass ixf-connection (fd-connection) () (:documentation "pgloader connection parameters for IXF files.")) diff --git a/src/sources/ixf/ixf.lisp b/src/sources/ixf/ixf.lisp index 275ac6d..676eed7 100644 
--- a/src/sources/ixf/ixf.lisp +++ b/src/sources/ixf/ixf.lisp @@ -3,7 +3,7 @@ ;;; ;;; http://www-01.ibm.com/support/knowledgecenter/SSEPGG_10.5.0/com.ibm.db2.luw.admin.dm.doc/doc/r0004667.html -(in-package :pgloader.ixf) +(in-package :pgloader.source.ixf) ;;; ;;; Integration with pgloader diff --git a/src/sources/mssql/mssql-cast-rules.lisp b/src/sources/mssql/mssql-cast-rules.lisp index d25d029..471740b 100644 --- a/src/sources/mssql/mssql-cast-rules.lisp +++ b/src/sources/mssql/mssql-cast-rules.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MS SQL data type casting rules ;;; -(in-package :pgloader.mssql) +(in-package :pgloader.source.mssql) (defparameter *mssql-default-cast-rules* `((:source (:type "char") :target (:type "text" :drop-typemod t)) diff --git a/src/sources/mssql/mssql-index-filters.lisp b/src/sources/mssql/mssql-index-filters.lisp index 75a9fcd..d05886b 100644 --- a/src/sources/mssql/mssql-index-filters.lisp +++ b/src/sources/mssql/mssql-index-filters.lisp @@ -4,7 +4,7 @@ ;;; clauses. 
;;; -(in-package #:pgloader.mssql.index-filter) +(in-package #:pgloader.source.mssql.index-filter) (defmethod translate-index-filter ((table table) (index index) diff --git a/src/sources/mssql/mssql-schema.lisp b/src/sources/mssql/mssql-schema.lisp index fc39734..97d3d36 100644 --- a/src/sources/mssql/mssql-schema.lisp +++ b/src/sources/mssql/mssql-schema.lisp @@ -2,7 +2,7 @@ ;;; Tools to query the MS SQL Schema to reproduce in PostgreSQL ;;; -(in-package :pgloader.mssql) +(in-package :pgloader.source.mssql) (defvar *mssql-db* nil "The MS SQL database connection handler.") diff --git a/src/sources/mssql/mssql.lisp b/src/sources/mssql/mssql.lisp index 02e67ce..cd71141 100644 --- a/src/sources/mssql/mssql.lisp +++ b/src/sources/mssql/mssql.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle the MS SQL Database ;;; -(in-package :pgloader.mssql) +(in-package :pgloader.source.mssql) (defclass copy-mssql (db-copy) ((encoding :accessor encoding ; allows forcing encoding diff --git a/src/sources/mysql/mysql-cast-rules.lisp b/src/sources/mysql/mysql-cast-rules.lisp index 242ba4a..f6ddcf8 100644 --- a/src/sources/mysql/mysql-cast-rules.lisp +++ b/src/sources/mysql/mysql-cast-rules.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MySQL data type casting rules ;;; -(in-package :pgloader.mysql) +(in-package :pgloader.source.mysql) (defun enum-or-set-name (table-name column-name type ctype typemod) (declare (ignore type ctype typemod)) diff --git a/src/sources/mysql/mysql-connection.lisp b/src/sources/mysql/mysql-connection.lisp index 6db6704..46a0d3f 100644 --- a/src/sources/mysql/mysql-connection.lisp +++ b/src/sources/mysql/mysql-connection.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MySQL connection and querying ;;; -(in-package :pgloader.mysql) +(in-package :pgloader.source.mysql) (defvar *connection* nil "Current MySQL connection") diff --git a/src/sources/mysql/mysql-csv.lisp b/src/sources/mysql/mysql-csv.lisp index 7bec085..f20daf9 100644 --- a/src/sources/mysql/mysql-csv.lisp +++ 
b/src/sources/mysql/mysql-csv.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MySQL data fetching: old code, untested, kept for reference. ;;; -(in-package :pgloader.mysql) +(in-package :pgloader.source.mysql) ;;; ;;; MySQL bulk export to file, in PostgreSQL COPY TEXT format diff --git a/src/sources/mysql/mysql-schema.lisp b/src/sources/mysql/mysql-schema.lisp index 4d9de20..00f4162 100644 --- a/src/sources/mysql/mysql-schema.lisp +++ b/src/sources/mysql/mysql-schema.lisp @@ -2,7 +2,7 @@ ;;; Tools to query the MySQL Schema to reproduce in PostgreSQL ;;; -(in-package :pgloader.mysql) +(in-package :pgloader.source.mysql) ;;; ;;; Those functions are to be called from withing an already established diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 78c5e06..1710c8c 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle MySQL data fetching ;;; -(in-package :pgloader.mysql) +(in-package :pgloader.source.mysql) (defclass copy-mysql (db-copy) ((encoding :accessor encoding ; allows forcing encoding diff --git a/src/sources/sqlite/sqlite-cast-rules.lisp b/src/sources/sqlite/sqlite-cast-rules.lisp index 37a7145..3b2cf50 100644 --- a/src/sources/sqlite/sqlite-cast-rules.lisp +++ b/src/sources/sqlite/sqlite-cast-rules.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle the SQLite Database ;;; -(in-package :pgloader.sqlite) +(in-package :pgloader.source.sqlite) (defvar *sqlite-db* nil "The SQLite database connection handler.") diff --git a/src/sources/sqlite/sqlite-schema.lisp b/src/sources/sqlite/sqlite-schema.lisp index 16ee9db..7174fea 100644 --- a/src/sources/sqlite/sqlite-schema.lisp +++ b/src/sources/sqlite/sqlite-schema.lisp @@ -1,7 +1,7 @@ ;;; ;;; SQLite tools connecting to a database ;;; -(in-package :pgloader.sqlite) +(in-package :pgloader.source.sqlite) (defvar *sqlite-db* nil "The SQLite database connection handler.") diff --git a/src/sources/sqlite/sqlite.lisp b/src/sources/sqlite/sqlite.lisp index 
f6803ab..f6f97de 100644 --- a/src/sources/sqlite/sqlite.lisp +++ b/src/sources/sqlite/sqlite.lisp @@ -2,7 +2,7 @@ ;;; Tools to handle the SQLite Database ;;; -(in-package :pgloader.sqlite) +(in-package :pgloader.source.sqlite) (defclass copy-sqlite (db-copy) ((db :accessor db :initarg :db))