Use internal catalog when loading from files.

Replace the ad-hoc code that was used before in the load from file code
path to use our full internal catalog representation, and adjust APIs to
that end.

The goal is to use catalogs everywhere in the PostgreSQL target API,
allowing us to reason explicitly about source and target catalogs;
see #400 for the main use case.
This commit is contained in:
Dimitri Fontaine 2016-08-05 11:42:06 +02:00
parent 42c8012e94
commit 2d47c4f0f5
5 changed files with 145 additions and 111 deletions

View File

@ -226,8 +226,8 @@
;;;
;;; DDL support with stats (timing, object count)
;;;
(defun pgsql-connect-and-execute-with-timing (pgconn section label sql &key (count 1))
(defun pgsql-connect-and-execute-with-timing (pgconn section label sql
&key (count 1))
"Run pgsql-execute-with-timing within a newly establised connection."
(with-pgsql-connection (pgconn)
(pomo:with-transaction ()
@ -252,8 +252,10 @@
(format nil "SET LOCAL client_min_messages TO ~a;"
(symbol-name client-min-messages))))
(log-message :notice "~a" sql)
(pomo:execute sql)
(loop :for sql :in (alexandria::ensure-list sql)
:do (progn
(log-message :notice "~a" sql)
(pomo:execute sql)))
(when client-min-messages
(pomo:execute (format nil "RESET client_min_messages;"))))

View File

@ -159,7 +159,7 @@
(format-table-name table)
(index-columns index)
(index-filter index)))
(format stream
(format nil
;; don't use the index schema name here, PostgreSQL doesn't
;; like it, might be implicit from the table's schema
;; itself...

View File

@ -13,10 +13,11 @@
;; rewrite the table constraint as an including expression
(let ((schema
(or (table-schema table)
(make-schema :name (query-table-schema (table-name table))))))
(setf including
(list (cons (schema-name schema)
(list (table-name table)))))))
(query-table-schema table))))
(setf including
(list (cons (schema-name schema)
(list
(format-table-name-as-including-exp table)))))))
(list-all-columns catalog
:table-type :table
@ -31,17 +32,42 @@
:including including
:excluding excluding))
(log-message :debug "fetch-pgsql-catalog: ~d tables, ~d indexes, ~d fkeys"
(count-tables catalog)
(count-indexes catalog)
(count-fkeys catalog))
(when (and table (/= 1 (count-tables catalog)))
(error "pgloader found ~d target tables for name ~s|:~{~% ~a~}"
(count-tables catalog)
(format-table-name table)
(mapcar #'format-table-name (table-list catalog))))
catalog))
(defun query-table-schema (table-name)
(defun format-table-name-as-including-exp (table)
  "Return TABLE's name as an anchored pattern (\"^name$\") suitable for a
   catalog lookup using the PostgreSQL ~ (regexp match) operator."
  (let ((table-name (table-name table)))
    (format nil "^~a$"
            (cond ((pgloader.quoting::quoted-p table-name)
                   ;; when the table name comes from the user (e.g. in the
                   ;; load file) then we might have to unquote it: the
                   ;; PostgreSQL catalogs does not store object names in
                   ;; their quoted form.
                   ;; strip the surrounding quote characters
                   (subseq table-name 1 (1- (length table-name))))
                  ;; unquoted names are used as-is
                  (t table-name)))))
(defun query-table-schema (table)
"Get PostgreSQL schema name where to locate TABLE-NAME by following the
current search_path rules. A PostgreSQL connection must be opened."
(pomo:query (format nil "
(make-schema :name
(pomo:query (format nil "
select nspname
from pg_namespace n
join pg_class c on n.oid = c.relnamespace
where c.oid = '~a'::regclass;"
table-name) :single))
(table-name table)) :single)))
(defvar *table-type* '((:table . "r")
@ -142,13 +168,13 @@ order by n.nspname, r.relname"
including ; do we print the clause?
(filter-list-to-where-clause including
nil
"n.nspname"
"i.relname")
"rn.nspname"
"r.relname")
excluding ; do we print the clause?
(filter-list-to-where-clause excluding
nil
"n.nspname"
"i.relname")))
"rn.nspname"
"r.relname")))
:do (let* ((schema (find-schema catalog schema-name))
(tschema (find-schema catalog table-schema))
(table (find-table tschema table-name))

View File

@ -188,18 +188,20 @@
(let* ((lp:*kernel* kernel))
(loop
:for index :in (table-index-list table)
:collect (multiple-value-bind (sql pkey)
;; we postpone the pkey upgrade of the index for later.
(format-create-sql index)
:for pkey := (multiple-value-bind (sql pkey)
;; we postpone the pkey upgrade of the index for later.
(format-create-sql index)
(lp:submit-task channel
#'pgsql-connect-and-execute-with-timing
;; each thread must have its own connection
(clone-connection pgconn)
:post label sql)
(lp:submit-task channel
#'pgsql-connect-and-execute-with-timing
;; each thread must have its own connection
(clone-connection pgconn)
:post label sql)
;; return the pkey "upgrade" statement
pkey))))
;; return the pkey "upgrade" statement
pkey)
:when pkey
:collect pkey)))
;;;
;;; Protect from non-unique index names
@ -232,58 +234,64 @@
;;;
;;; Higher level API to care about indexes
;;;
(defun maybe-drop-indexes (target table &key (section :pre) drop-indexes)
(defun maybe-drop-indexes (target catalog &key (section :pre) drop-indexes)
"Drop the indexes for TABLE-NAME on TARGET PostgreSQL connection, and
returns a list of indexes to create again."
(with-pgsql-connection (target)
(let ((indexes (table-index-list table))
;; we get the list of indexes from PostgreSQL catalogs, so don't
;; question their spelling, just quote them.
(*identifier-case* :quote))
(loop :for table :in (table-list catalog)
:do
(let ((indexes (table-index-list table))
;; we get the list of indexes from PostgreSQL catalogs, so don't
;; question their spelling, just quote them.
(*identifier-case* :quote))
(cond ((and indexes (not drop-indexes))
(log-message :warning
"Target table ~s has ~d indexes defined against it."
(format-table-name table) (length indexes))
(log-message :warning
"That could impact loading performance badly.")
(log-message :warning
"Consider the option 'drop indexes'."))
(cond ((and indexes (not drop-indexes))
(log-message :warning
"Target table ~s has ~d indexes defined against it."
(format-table-name table) (length indexes))
(log-message :warning
"That could impact loading performance badly.")
(log-message :warning
"Consider the option 'drop indexes'."))
(indexes
;; drop the indexes now
(with-stats-collection ("drop indexes" :section section)
(drop-indexes section table)))))))
(indexes
;; drop the indexes now
(with-stats-collection ("drop indexes" :section section)
(drop-indexes section table))))))))
(defun create-indexes-again (target table
(defun create-indexes-again (target catalog
&key
max-parallel-create-index
(section :post)
drop-indexes)
"Create the indexes that we dropped previously."
(when (and (table-index-list table) drop-indexes)
(when drop-indexes
(let* ((*preserve-index-names* t)
;; we get the list of indexes from PostgreSQL catalogs, so don't
;; question their spelling, just quote them.
(*identifier-case* :quote)
(idx-kernel (make-kernel (or max-parallel-create-index
(count-indexes table))))
(count-indexes catalog))))
(idx-channel (let ((lp:*kernel* idx-kernel))
(lp:make-channel))))
(let ((pkeys
(create-indexes-in-kernel target table idx-kernel idx-channel)))
(loop :for table :in (table-list catalog)
:when (table-index-list table)
:do
(let ((pkeys
(create-indexes-in-kernel target table idx-kernel idx-channel)))
(with-stats-collection ("Index Build Completion" :section section)
(loop :repeat (count-indexes table)
:do (lp:receive-result idx-channel))
(lp:end-kernel :wait t))
(with-stats-collection ("Index Build Completion" :section section)
(loop :repeat (count-indexes table)
:do (lp:receive-result idx-channel))
(lp:end-kernel :wait t))
;; turn unique indexes into pkeys now
(with-stats-collection ("Constraints" :section section)
(pgsql-connect-and-execute-with-timing target
section
"Constrants"
pkeys)))))))
;; turn unique indexes into pkeys now
(with-pgsql-connection (target)
(with-stats-collection ("Constraints" :section section)
(loop :for sql :in pkeys
:when sql
:do (pgsql-execute-with-timing section "Constraints" sql))))))))
;;;
;;; Sequences

View File

@ -100,63 +100,61 @@
set-table-oids including excluding))
(let* ((pgsql-catalog
(fetch-pgsql-catalog (target-db copy) :table (target copy))))
(when (= 1 (count-tables pgsql-catalog))
;; we found the target table, grab its definition
(setf (target copy)
(first (schema-table-list
(first (catalog-schema-list pgsql-catalog))))))
(handler-case
(fetch-pgsql-catalog (target-db copy) :table (target copy))
(condition (e)
(log-message :fatal "Failed to fetch target PostgreSQL catalogs.")
(log-message :fatal "~a" e)
(return-from copy-database)))))
;; this sets (table-index-list (target copy))
(maybe-drop-indexes (target-db copy)
(target copy)
:drop-indexes drop-indexes))
pgsql-catalog
:drop-indexes drop-indexes)
;; ensure we truncate only once, do it before going parallel
(when truncate
(truncate-tables (target-db copy) pgsql-catalog))
;; ensure we truncate only one
(when truncate
(truncate-tables (clone-connection (target-db copy)) (target copy)))
;; expand the specs of our source, we might have to care about several
;; files actually.
(let* ((lp:*kernel* (make-kernel worker-count))
(channel (lp:make-channel))
(path-list (expand-spec (source copy))))
(loop :for path-spec :in path-list
:do (let ((table-source (clone-copy-for copy path-spec)))
(copy-from table-source
:concurrency concurrency
:kernel lp:*kernel*
:channel channel
:on-error-stop on-error-stop
:truncate nil
:disable-triggers disable-triggers)))
;; expand the specs of our source, we might have to care about several
;; files actually.
(let* ((lp:*kernel* (make-kernel worker-count))
(channel (lp:make-channel))
(path-list (expand-spec (source copy))))
(loop :for path-spec :in path-list
:do (let ((table-source (clone-copy-for copy path-spec)))
(copy-from table-source
:concurrency concurrency
:kernel lp:*kernel*
:channel channel
:on-error-stop on-error-stop
:truncate nil
:disable-triggers disable-triggers)))
;; end kernel
(with-stats-collection ("COPY Threads Completion" :section :post
:use-result-as-read t
:use-result-as-rows t)
(let ((worker-count (* (length path-list)
(task-count concurrency))))
(loop :for tasks :below worker-count
:do (handler-case
(destructuring-bind (task table seconds)
(lp:receive-result channel)
(log-message :debug
"Finished processing ~a for ~s ~50T~6$s"
task (format-table-name table) seconds)
(when (eq :writer task)
(update-stats :data table :secs seconds)))
(condition (e)
(log-message :fatal "~a" e))))
(prog1
worker-count
(lp:end-kernel :wait nil))))
(lp:end-kernel :wait t))
;; end kernel
(with-stats-collection ("COPY Threads Completion" :section :post
:use-result-as-read t
:use-result-as-rows t)
(let ((worker-count (* (length path-list)
(task-count concurrency))))
(loop :for tasks :below worker-count
:do (handler-case
(destructuring-bind (task table seconds)
(lp:receive-result channel)
(log-message :debug
"Finished processing ~a for ~s ~50T~6$s"
task (format-table-name table) seconds)
(when (eq :writer task)
(update-stats :data table :secs seconds)))
(condition (e)
(log-message :fatal "~a" e))))
(prog1
worker-count
(lp:end-kernel :wait nil))))
(lp:end-kernel :wait t))
;; re-create the indexes from the target table entry
(create-indexes-again (target-db copy)
(target copy)
:max-parallel-create-index max-parallel-create-index
:drop-indexes drop-indexes))
;; re-create the indexes from the target table entry
(create-indexes-again (target-db copy)
pgsql-catalog
:max-parallel-create-index max-parallel-create-index
:drop-indexes drop-indexes)))