diff --git a/src/pgsql/connection.lisp b/src/pgsql/connection.lisp index d5c7acc..92ee311 100644 --- a/src/pgsql/connection.lisp +++ b/src/pgsql/connection.lisp @@ -226,8 +226,8 @@ ;;; ;;; DDL support with stats (timing, object count) ;;; - -(defun pgsql-connect-and-execute-with-timing (pgconn section label sql &key (count 1)) +(defun pgsql-connect-and-execute-with-timing (pgconn section label sql + &key (count 1)) "Run pgsql-execute-with-timing within a newly establised connection." (with-pgsql-connection (pgconn) (pomo:with-transaction () @@ -252,8 +252,10 @@ (format nil "SET LOCAL client_min_messages TO ~a;" (symbol-name client-min-messages)))) - (log-message :notice "~a" sql) - (pomo:execute sql) + (loop :for sql :in (alexandria::ensure-list sql) + :do (progn + (log-message :notice "~a" sql) + (pomo:execute sql))) (when client-min-messages (pomo:execute (format nil "RESET client_min_messages;")))) diff --git a/src/pgsql/pgsql-ddl.lisp b/src/pgsql/pgsql-ddl.lisp index e104fc1..d07264d 100644 --- a/src/pgsql/pgsql-ddl.lisp +++ b/src/pgsql/pgsql-ddl.lisp @@ -159,7 +159,7 @@ (format-table-name table) (index-columns index) (index-filter index))) - (format stream + (format nil ;; don't use the index schema name here, PostgreSQL doesn't ;; like it, might be implicit from the table's schema ;; itself... diff --git a/src/pgsql/pgsql-schema.lisp b/src/pgsql/pgsql-schema.lisp index 73a5c2c..94de59e 100644 --- a/src/pgsql/pgsql-schema.lisp +++ b/src/pgsql/pgsql-schema.lisp @@ -13,10 +13,11 @@ ;; rewrite the table constraint as an including expression (let ((schema (or (table-schema table) - (make-schema :name (query-table-schema (table-name table)))))) - (setf including - (list (cons (schema-name schema) - (list (table-name table))))))) + (query-table-schema table)))) + (setf including + (list (cons (schema-name schema) + (list + (format-table-name-as-including-exp table))))))) (list-all-columns catalog :table-type :table @@ -31,17 +32,42 @@ :including including :excluding excluding)) + (log-message :debug "fetch-pgsql-catalog: ~d tables, ~d indexes, ~d fkeys" + (count-tables catalog) + (count-indexes catalog) + (count-fkeys catalog)) + + (when (and table (/= 1 (count-tables catalog))) + (error "pgloader found ~d target tables for name ~s|:~{~% ~a~}" + (count-tables catalog) + (format-table-name table) + (mapcar #'format-table-name (table-list catalog)))) + catalog)) -(defun query-table-schema (table-name) +(defun format-table-name-as-including-exp (table) + "Return a table name suitable for a catalog lookup using ~ operator." + (let ((table-name (table-name table))) + (format nil "^~a$" + (cond ((pgloader.quoting::quoted-p table-name) + ;; when the table name comes from the user (e.g. in the + ;; load file) then we might have to unquote it: the + ;; PostgreSQL catalogs does not store object names in + ;; their quoted form. + (subseq table-name 1 (1- (length table-name)))) + + (t table-name))))) + +(defun query-table-schema (table) "Get PostgreSQL schema name where to locate TABLE-NAME by following the current search_path rules. A PostgreSQL connection must be opened." - (pomo:query (format nil " + (make-schema :name + (pomo:query (format nil " select nspname from pg_namespace n join pg_class c on n.oid = c.relnamespace where c.oid = '~a'::regclass;" - table-name) :single)) + (table-name table)) :single))) (defvar *table-type* '((:table . "r") @@ -142,13 +168,13 @@ order by n.nspname, r.relname" including ; do we print the clause? (filter-list-to-where-clause including nil - "n.nspname" - "i.relname") + "rn.nspname" + "r.relname") excluding ; do we print the clause? (filter-list-to-where-clause excluding nil - "n.nspname" - "i.relname"))) + "rn.nspname" + "r.relname"))) :do (let* ((schema (find-schema catalog schema-name)) (tschema (find-schema catalog table-schema)) (table (find-table tschema table-name)) diff --git a/src/pgsql/schema.lisp b/src/pgsql/schema.lisp index 6597a04..59d1f26 100644 --- a/src/pgsql/schema.lisp +++ b/src/pgsql/schema.lisp @@ -188,18 +188,20 @@ (let* ((lp:*kernel* kernel)) (loop :for index :in (table-index-list table) - :collect (multiple-value-bind (sql pkey) - ;; we postpone the pkey upgrade of the index for later. - (format-create-sql index) + :for pkey := (multiple-value-bind (sql pkey) + ;; we postpone the pkey upgrade of the index for later. + (format-create-sql index) - (lp:submit-task channel - #'pgsql-connect-and-execute-with-timing - ;; each thread must have its own connection - (clone-connection pgconn) - :post label sql) + (lp:submit-task channel + #'pgsql-connect-and-execute-with-timing + ;; each thread must have its own connection + (clone-connection pgconn) + :post label sql) - ;; return the pkey "upgrade" statement - pkey)))) + ;; return the pkey "upgrade" statement + pkey) + :when pkey + :collect pkey))) ;;; ;;; Protect from non-unique index names @@ -232,58 +234,64 @@ ;;; ;;; Higher level API to care about indexes ;;; -(defun maybe-drop-indexes (target table &key (section :pre) drop-indexes) +(defun maybe-drop-indexes (target catalog &key (section :pre) drop-indexes) "Drop the indexes for TABLE-NAME on TARGET PostgreSQL connection, and returns a list of indexes to create again." (with-pgsql-connection (target) - (let ((indexes (table-index-list table)) - ;; we get the list of indexes from PostgreSQL catalogs, so don't - ;; question their spelling, just quote them. - (*identifier-case* :quote)) + (loop :for table :in (table-list catalog) + :do + (let ((indexes (table-index-list table)) + ;; we get the list of indexes from PostgreSQL catalogs, so don't + ;; question their spelling, just quote them. + (*identifier-case* :quote)) - (cond ((and indexes (not drop-indexes)) - (log-message :warning - "Target table ~s has ~d indexes defined against it." - (format-table-name table) (length indexes)) - (log-message :warning - "That could impact loading performance badly.") - (log-message :warning - "Consider the option 'drop indexes'.")) + (cond ((and indexes (not drop-indexes)) + (log-message :warning + "Target table ~s has ~d indexes defined against it." + (format-table-name table) (length indexes)) + (log-message :warning + "That could impact loading performance badly.") + (log-message :warning + "Consider the option 'drop indexes'.")) - (indexes - ;; drop the indexes now - (with-stats-collection ("drop indexes" :section section) - (drop-indexes section table))))))) + (indexes + ;; drop the indexes now + (with-stats-collection ("drop indexes" :section section) + (drop-indexes section table)))))))) -(defun create-indexes-again (target table +(defun create-indexes-again (target catalog &key max-parallel-create-index (section :post) drop-indexes) "Create the indexes that we dropped previously." - (when (and (table-index-list table) drop-indexes) + (when drop-indexes (let* ((*preserve-index-names* t) ;; we get the list of indexes from PostgreSQL catalogs, so don't ;; question their spelling, just quote them. (*identifier-case* :quote) (idx-kernel (make-kernel (or max-parallel-create-index - (count-indexes table)))) + (count-indexes catalog)))) (idx-channel (let ((lp:*kernel* idx-kernel)) (lp:make-channel)))) - (let ((pkeys - (create-indexes-in-kernel target table idx-kernel idx-channel))) + (loop :for table :in (table-list catalog) + :when (table-index-list table) + :do + (let ((pkeys + (create-indexes-in-kernel target table idx-kernel idx-channel))) - (with-stats-collection ("Index Build Completion" :section section) - (loop :repeat (count-indexes table) - :do (lp:receive-result idx-channel)) - (lp:end-kernel :wait t)) + (with-stats-collection ("Index Build Completion" :section section) + (loop :repeat (count-indexes table) + :do (lp:receive-result idx-channel)) + (lp:end-kernel :wait t)) + + ;; turn unique indexes into pkeys now + (with-stats-collection ("Constraints" :section section) + (pgsql-connect-and-execute-with-timing target + section + "Constrants" + pkeys))))))) - ;; turn unique indexes into pkeys now - (with-pgsql-connection (target) - (with-stats-collection ("Constraints" :section section) - (loop :for sql :in pkeys - :when sql - :do (pgsql-execute-with-timing section "Constraints" sql)))))))) ;;; ;;; Sequences diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index b999128..8077eef 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -100,63 +100,61 @@ set-table-oids including excluding)) (let* ((pgsql-catalog - (fetch-pgsql-catalog (target-db copy) :table (target copy)))) - - (when (= 1 (count-tables pgsql-catalog)) - ;; we found the target table, grab its definition - (setf (target copy) - (first (schema-table-list - (first (catalog-schema-list pgsql-catalog)))))) + (handler-case + (fetch-pgsql-catalog (target-db copy) :table (target copy)) + (condition (e) + (log-message :fatal "Failed to fetch target PostgreSQL catalogs.") + (log-message :fatal "~a" e) + (return-from copy-database))))) ;; this sets (table-index-list (target copy)) (maybe-drop-indexes (target-db copy) - (target copy) - :drop-indexes drop-indexes)) + pgsql-catalog + :drop-indexes drop-indexes) + ;; ensure we truncate only once, do it before going parallel + (when truncate + (truncate-tables (target-db copy) pgsql-catalog)) - ;; ensure we truncate only one - (when truncate - (truncate-tables (clone-connection (target-db copy)) (target copy))) + ;; expand the specs of our source, we might have to care about several + ;; files actually. + (let* ((lp:*kernel* (make-kernel worker-count)) + (channel (lp:make-channel)) + (path-list (expand-spec (source copy)))) + (loop :for path-spec :in path-list + :do (let ((table-source (clone-copy-for copy path-spec))) + (copy-from table-source + :concurrency concurrency + :kernel lp:*kernel* + :channel channel + :on-error-stop on-error-stop + :truncate nil + :disable-triggers disable-triggers))) - ;; expand the specs of our source, we might have to care about several - ;; files actually. - (let* ((lp:*kernel* (make-kernel worker-count)) - (channel (lp:make-channel)) - (path-list (expand-spec (source copy)))) - (loop :for path-spec :in path-list - :do (let ((table-source (clone-copy-for copy path-spec))) - (copy-from table-source - :concurrency concurrency - :kernel lp:*kernel* - :channel channel - :on-error-stop on-error-stop - :truncate nil - :disable-triggers disable-triggers))) + ;; end kernel + (with-stats-collection ("COPY Threads Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (let ((worker-count (* (length path-list) + (task-count concurrency)))) + (loop :for tasks :below worker-count + :do (handler-case + (destructuring-bind (task table seconds) + (lp:receive-result channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds) + (when (eq :writer task) + (update-stats :data table :secs seconds))) + (condition (e) + (log-message :fatal "~a" e)))) + (prog1 + worker-count + (lp:end-kernel :wait nil)))) + (lp:end-kernel :wait t)) - ;; end kernel - (with-stats-collection ("COPY Threads Completion" :section :post - :use-result-as-read t - :use-result-as-rows t) - (let ((worker-count (* (length path-list) - (task-count concurrency)))) - (loop :for tasks :below worker-count - :do (handler-case - (destructuring-bind (task table seconds) - (lp:receive-result channel) - (log-message :debug - "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds) - (when (eq :writer task) - (update-stats :data table :secs seconds))) - (condition (e) - (log-message :fatal "~a" e)))) - (prog1 - worker-count - (lp:end-kernel :wait nil)))) - (lp:end-kernel :wait t)) - - ;; re-create the indexes from the target table entry - (create-indexes-again (target-db copy) - (target copy) - :max-parallel-create-index max-parallel-create-index - :drop-indexes drop-indexes)) + ;; re-create the indexes from the target table entry + (create-indexes-again (target-db copy) + pgsql-catalog + :max-parallel-create-index max-parallel-create-index + :drop-indexes drop-indexes)))