From 2d47c4f0f51e36d6d42adb213e6e3525a4d789dd Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Fri, 5 Aug 2016 11:42:06 +0200 Subject: [PATCH] Use internal catalog when loading from files. Replace the ad-hoc code that was used before in the load from file code path to use our full internal catalog representation, and adjust APIs to that end. The goal is to use catalogs everywhere in the PostgreSQL target API and allowing to process reason explicitely about source and target catalogs, see #400 for the main use case. --- src/pgsql/connection.lisp | 10 +-- src/pgsql/pgsql-ddl.lisp | 2 +- src/pgsql/pgsql-schema.lisp | 48 ++++++++++--- src/pgsql/schema.lisp | 92 +++++++++++++------------ src/sources/common/md-methods.lisp | 104 ++++++++++++++--------------- 5 files changed, 145 insertions(+), 111 deletions(-) diff --git a/src/pgsql/connection.lisp b/src/pgsql/connection.lisp index d5c7acc..92ee311 100644 --- a/src/pgsql/connection.lisp +++ b/src/pgsql/connection.lisp @@ -226,8 +226,8 @@ ;;; ;;; DDL support with stats (timing, object count) ;;; - -(defun pgsql-connect-and-execute-with-timing (pgconn section label sql &key (count 1)) +(defun pgsql-connect-and-execute-with-timing (pgconn section label sql + &key (count 1)) "Run pgsql-execute-with-timing within a newly establised connection." (with-pgsql-connection (pgconn) (pomo:with-transaction () @@ -252,8 +252,10 @@ (format nil "SET LOCAL client_min_messages TO ~a;" (symbol-name client-min-messages)))) - (log-message :notice "~a" sql) - (pomo:execute sql) + (loop :for sql :in (alexandria::ensure-list sql) + :do (progn + (log-message :notice "~a" sql) + (pomo:execute sql))) (when client-min-messages (pomo:execute (format nil "RESET client_min_messages;")))) diff --git a/src/pgsql/pgsql-ddl.lisp b/src/pgsql/pgsql-ddl.lisp index e104fc1..d07264d 100644 --- a/src/pgsql/pgsql-ddl.lisp +++ b/src/pgsql/pgsql-ddl.lisp @@ -159,7 +159,7 @@ (format-table-name table) (index-columns index) (index-filter index))) - (format stream + (format nil ;; don't use the index schema name here, PostgreSQL doesn't ;; like it, might be implicit from the table's schema ;; itself... diff --git a/src/pgsql/pgsql-schema.lisp b/src/pgsql/pgsql-schema.lisp index 73a5c2c..94de59e 100644 --- a/src/pgsql/pgsql-schema.lisp +++ b/src/pgsql/pgsql-schema.lisp @@ -13,10 +13,11 @@ ;; rewrite the table constraint as an including expression (let ((schema (or (table-schema table) - (make-schema :name (query-table-schema (table-name table)))))) - (setf including - (list (cons (schema-name schema) - (list (table-name table))))))) + (query-table-schema table)))) + (setf including + (list (cons (schema-name schema) + (list + (format-table-name-as-including-exp table))))))) (list-all-columns catalog :table-type :table @@ -31,17 +32,42 @@ :including including :excluding excluding)) + (log-message :debug "fetch-pgsql-catalog: ~d tables, ~d indexes, ~d fkeys" + (count-tables catalog) + (count-indexes catalog) + (count-fkeys catalog)) + + (when (and table (/= 1 (count-tables catalog))) + (error "pgloader found ~d target tables for name ~s|:~{~% ~a~}" + (count-tables catalog) + (format-table-name table) + (mapcar #'format-table-name (table-list catalog)))) + catalog)) -(defun query-table-schema (table-name) +(defun format-table-name-as-including-exp (table) + "Return a table name suitable for a catalog lookup using ~ operator." + (let ((table-name (table-name table))) + (format nil "^~a$" + (cond ((pgloader.quoting::quoted-p table-name) + ;; when the table name comes from the user (e.g. in the + ;; load file) then we might have to unquote it: the + ;; PostgreSQL catalogs does not store object names in + ;; their quoted form. + (subseq table-name 1 (1- (length table-name)))) + + (t table-name))))) + +(defun query-table-schema (table) "Get PostgreSQL schema name where to locate TABLE-NAME by following the current search_path rules. A PostgreSQL connection must be opened." - (pomo:query (format nil " + (make-schema :name + (pomo:query (format nil " select nspname from pg_namespace n join pg_class c on n.oid = c.relnamespace where c.oid = '~a'::regclass;" - table-name) :single)) + (table-name table)) :single))) (defvar *table-type* '((:table . "r") @@ -142,13 +168,13 @@ order by n.nspname, r.relname" including ; do we print the clause? (filter-list-to-where-clause including nil - "n.nspname" - "i.relname") + "rn.nspname" + "r.relname") excluding ; do we print the clause? (filter-list-to-where-clause excluding nil - "n.nspname" - "i.relname"))) + "rn.nspname" + "r.relname"))) :do (let* ((schema (find-schema catalog schema-name)) (tschema (find-schema catalog table-schema)) (table (find-table tschema table-name)) diff --git a/src/pgsql/schema.lisp b/src/pgsql/schema.lisp index 6597a04..59d1f26 100644 --- a/src/pgsql/schema.lisp +++ b/src/pgsql/schema.lisp @@ -188,18 +188,20 @@ (let* ((lp:*kernel* kernel)) (loop :for index :in (table-index-list table) - :collect (multiple-value-bind (sql pkey) - ;; we postpone the pkey upgrade of the index for later. - (format-create-sql index) + :for pkey := (multiple-value-bind (sql pkey) + ;; we postpone the pkey upgrade of the index for later. + (format-create-sql index) - (lp:submit-task channel - #'pgsql-connect-and-execute-with-timing - ;; each thread must have its own connection - (clone-connection pgconn) - :post label sql) + (lp:submit-task channel + #'pgsql-connect-and-execute-with-timing + ;; each thread must have its own connection + (clone-connection pgconn) + :post label sql) - ;; return the pkey "upgrade" statement - pkey)))) + ;; return the pkey "upgrade" statement + pkey) + :when pkey + :collect pkey))) ;;; ;;; Protect from non-unique index names @@ -232,58 +234,64 @@ ;;; ;;; Higher level API to care about indexes ;;; -(defun maybe-drop-indexes (target table &key (section :pre) drop-indexes) +(defun maybe-drop-indexes (target catalog &key (section :pre) drop-indexes) "Drop the indexes for TABLE-NAME on TARGET PostgreSQL connection, and returns a list of indexes to create again." (with-pgsql-connection (target) - (let ((indexes (table-index-list table)) - ;; we get the list of indexes from PostgreSQL catalogs, so don't - ;; question their spelling, just quote them. - (*identifier-case* :quote)) + (loop :for table :in (table-list catalog) + :do + (let ((indexes (table-index-list table)) + ;; we get the list of indexes from PostgreSQL catalogs, so don't + ;; question their spelling, just quote them. + (*identifier-case* :quote)) - (cond ((and indexes (not drop-indexes)) - (log-message :warning - "Target table ~s has ~d indexes defined against it." - (format-table-name table) (length indexes)) - (log-message :warning - "That could impact loading performance badly.") - (log-message :warning - "Consider the option 'drop indexes'.")) + (cond ((and indexes (not drop-indexes)) + (log-message :warning + "Target table ~s has ~d indexes defined against it." + (format-table-name table) (length indexes)) + (log-message :warning + "That could impact loading performance badly.") + (log-message :warning + "Consider the option 'drop indexes'.")) - (indexes - ;; drop the indexes now - (with-stats-collection ("drop indexes" :section section) - (drop-indexes section table))))))) + (indexes + ;; drop the indexes now + (with-stats-collection ("drop indexes" :section section) + (drop-indexes section table)))))))) -(defun create-indexes-again (target table +(defun create-indexes-again (target catalog &key max-parallel-create-index (section :post) drop-indexes) "Create the indexes that we dropped previously." - (when (and (table-index-list table) drop-indexes) + (when drop-indexes (let* ((*preserve-index-names* t) ;; we get the list of indexes from PostgreSQL catalogs, so don't ;; question their spelling, just quote them. (*identifier-case* :quote) (idx-kernel (make-kernel (or max-parallel-create-index - (count-indexes table)))) + (count-indexes catalog)))) (idx-channel (let ((lp:*kernel* idx-kernel)) (lp:make-channel)))) - (let ((pkeys - (create-indexes-in-kernel target table idx-kernel idx-channel))) + (loop :for table :in (table-list catalog) + :when (table-index-list table) + :do + (let ((pkeys + (create-indexes-in-kernel target table idx-kernel idx-channel))) - (with-stats-collection ("Index Build Completion" :section section) - (loop :repeat (count-indexes table) - :do (lp:receive-result idx-channel)) - (lp:end-kernel :wait t)) + (with-stats-collection ("Index Build Completion" :section section) + (loop :repeat (count-indexes table) + :do (lp:receive-result idx-channel)) + (lp:end-kernel :wait t)) + + ;; turn unique indexes into pkeys now + (with-stats-collection ("Constraints" :section section) + (pgsql-connect-and-execute-with-timing target + section + "Constrants" + pkeys))))))) - ;; turn unique indexes into pkeys now - (with-pgsql-connection (target) - (with-stats-collection ("Constraints" :section section) - (loop :for sql :in pkeys - :when sql - :do (pgsql-execute-with-timing section "Constraints" sql)))))))) ;;; ;;; Sequences diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index b999128..8077eef 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -100,63 +100,61 @@ set-table-oids including excluding)) (let* ((pgsql-catalog - (fetch-pgsql-catalog (target-db copy) :table (target copy)))) - - (when (= 1 (count-tables pgsql-catalog)) - ;; we found the target table, grab its definition - (setf (target copy) - (first (schema-table-list - (first (catalog-schema-list pgsql-catalog)))))) + (handler-case + (fetch-pgsql-catalog (target-db copy) :table (target copy)) + (condition (e) + (log-message :fatal "Failed to fetch target PostgreSQL catalogs.") + (log-message :fatal "~a" e) + (return-from copy-database))))) ;; this sets (table-index-list (target copy)) (maybe-drop-indexes (target-db copy) - (target copy) - :drop-indexes drop-indexes)) + pgsql-catalog + :drop-indexes drop-indexes) + ;; ensure we truncate only once, do it before going parallel + (when truncate + (truncate-tables (target-db copy) pgsql-catalog)) - ;; ensure we truncate only one - (when truncate - (truncate-tables (clone-connection (target-db copy)) (target copy))) + ;; expand the specs of our source, we might have to care about several + ;; files actually. + (let* ((lp:*kernel* (make-kernel worker-count)) + (channel (lp:make-channel)) + (path-list (expand-spec (source copy)))) + (loop :for path-spec :in path-list + :do (let ((table-source (clone-copy-for copy path-spec))) + (copy-from table-source + :concurrency concurrency + :kernel lp:*kernel* + :channel channel + :on-error-stop on-error-stop + :truncate nil + :disable-triggers disable-triggers))) - ;; expand the specs of our source, we might have to care about several - ;; files actually. - (let* ((lp:*kernel* (make-kernel worker-count)) - (channel (lp:make-channel)) - (path-list (expand-spec (source copy)))) - (loop :for path-spec :in path-list - :do (let ((table-source (clone-copy-for copy path-spec))) - (copy-from table-source - :concurrency concurrency - :kernel lp:*kernel* - :channel channel - :on-error-stop on-error-stop - :truncate nil - :disable-triggers disable-triggers))) + ;; end kernel + (with-stats-collection ("COPY Threads Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (let ((worker-count (* (length path-list) + (task-count concurrency)))) + (loop :for tasks :below worker-count + :do (handler-case + (destructuring-bind (task table seconds) + (lp:receive-result channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds) + (when (eq :writer task) + (update-stats :data table :secs seconds))) + (condition (e) + (log-message :fatal "~a" e)))) + (prog1 + worker-count + (lp:end-kernel :wait nil)))) + (lp:end-kernel :wait t)) - ;; end kernel - (with-stats-collection ("COPY Threads Completion" :section :post - :use-result-as-read t - :use-result-as-rows t) - (let ((worker-count (* (length path-list) - (task-count concurrency)))) - (loop :for tasks :below worker-count - :do (handler-case - (destructuring-bind (task table seconds) - (lp:receive-result channel) - (log-message :debug - "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds) - (when (eq :writer task) - (update-stats :data table :secs seconds))) - (condition (e) - (log-message :fatal "~a" e)))) - (prog1 - worker-count - (lp:end-kernel :wait nil)))) - (lp:end-kernel :wait t)) - - ;; re-create the indexes from the target table entry - (create-indexes-again (target-db copy) - (target copy) - :max-parallel-create-index max-parallel-create-index - :drop-indexes drop-indexes)) + ;; re-create the indexes from the target table entry + (create-indexes-again (target-db copy) + pgsql-catalog + :max-parallel-create-index max-parallel-create-index + :drop-indexes drop-indexes)))