From aa8b756315d1d4c066ba11d001a53218e3d64e07 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Sat, 16 Jan 2016 19:50:21 +0100 Subject: [PATCH] Fix when to create indexes. In the recent refactoring and improvements of parallelism the indexes creation would kick in before we know that the data is done being copied over to the target table. Fix that by maintaining a writers-count hashtable and only starting to create indexes when that count reaches zero, meaning all the concurrent tasks started to handle the COPY of the data are now done. --- pgloader.asd | 23 ++++-- src/package.lisp | 81 +++++++++++++++++++- src/sources/common/db-methods.lisp | 119 +++++++++++++++-------------- src/utils/report.lisp | 40 +++++----- src/utils/schema-structs.lisp | 12 +-- 5 files changed, 181 insertions(+), 94 deletions(-) diff --git a/pgloader.asd b/pgloader.asd index edc35c1..0ab7010 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -55,8 +55,21 @@ (:file "logs") (:file "utils") (:file "state") + + ;; user defined transforms package and pgloader + ;; provided ones + (:file "transforms") + + ;; PostgreSQL related utils + (:file "read-sql-files") + (:file "quoting") + (:file "schema-structs" :depends-on ("quoting")) + + ;; State, monitoring, reporting (:file "reject" :depends-on ("state")) - (:file "report" :depends-on ("state" "utils")) + (:file "report" :depends-on ("state" + "utils" + "schema-structs")) (:file "monitor" :depends-on ("logs" "state" "reject" @@ -64,13 +77,7 @@ (:file "archive" :depends-on ("logs")) ;; generic connection api - (:file "connection" :depends-on ("archive")) - - ;; those are one-package-per-file - (:file "transforms") - (:file "read-sql-files") - (:file "quoting") - (:file "schema-structs" :depends-on ("quoting")))) + (:file "connection" :depends-on ("archive")))) ;; package pgloader.pgsql (:module pgsql diff --git a/src/package.lisp b/src/package.lisp index 5af1e14..5620500 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -23,8 +23,81 @@ #:start-logger #:stop-logger)) -(defpackage #:pgloader.state +(defpackage #:pgloader.schema (:use #:cl #:pgloader.params) + (:export #:catalog + #:schema + #:table + #:column + #:index + #:index + #:fkey + + #:cast ; generic function for sources + + #:make-catalog + #:make-schema + #:make-table + #:create-table + #:make-view + #:make-column + #:make-index + #:make-index + #:make-fkey + + #:catalog-name + #:catalog-schema-list + #:schema-name + #:schema-table-list + #:schema-view-list + #:table-name + #:table-source-name + #:table-schema + #:table-oid + #:table-comment + #:table-field-list + #:table-column-list + #:table-index-list + #:table-fkey-list + #:column-name + #:column-type-name + #:column-type-mod + #:column-type-nullable + #:column-default + #:column-comment + #:column-transform + + #:table-list + #:view-list + #:add-schema + #:find-schema + #:maybe-add-schema + #:add-table + #:find-table + #:maybe-add-table + #:add-view + #:find-view + #:maybe-add-view + #:add-field + #:add-column + #:add-index + #:add-fkey + #:find-fkey + #:maybe-add-fkey + #:count-tables + #:count-views + #:count-indexes + #:count-fkeys + #:max-indexes-per-table + + #:push-to-end + #:with-schema + + #:format-default-value + #:format-column)) + +(defpackage #:pgloader.state + (:use #:cl #:pgloader.params #:pgloader.schema) (:export #:format-table-name #:make-pgstate #:pgstate-tabnames @@ -70,7 +143,9 @@ #:timing)) (defpackage #:pgloader.utils - (:use #:cl #:pgloader.params #:pgloader.monitor #:pgloader.state) + (:use #:cl + #:pgloader.params #:pgloader.schema + #:pgloader.monitor #:pgloader.state) (:import-from #:alexandria #:appendf #:read-file-into-string) @@ -113,7 +188,7 @@ ;; quoting #:apply-identifier-case - ;; Schema structure bits + ;; schema #:catalog #:schema #:table diff --git a/src/sources/common/db-methods.lisp b/src/sources/common/db-methods.lisp index 82f63c8..f72d57b 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/sources/common/db-methods.lisp @@ -109,7 +109,7 @@ :source-db (clone-connection (source-db copy)) :target-db (clone-connection (target-db copy)) :source table - :target (table-name table) + :target table :fields fields :columns columns :transforms transforms))) @@ -155,7 +155,7 @@ :including including :excluding excluding)) pkeys - (table-count 0) + (writers-count (make-hash-table :size (count-tables catalog))) (max-indexes (when create-indexes (max-indexes-per-table catalog))) (idx-kernel (when (and max-indexes (< 0 max-indexes)) @@ -208,33 +208,70 @@ ;; first COPY the data from source to PostgreSQL, using copy-kernel (unless schema-only - (incf table-count) + ;; prepare the writers-count hash-table, as we start + ;; copy-from, we have concurrency tasks writing. + (setf (gethash table writers-count) concurrency) (copy-from table-source :concurrency concurrency :kernel copy-kernel :channel copy-channel - :disable-triggers disable-triggers)) - - ;; Create the indexes for that table in parallel with the next - ;; COPY, and all at once in concurrent threads to benefit from - ;; PostgreSQL synchronous scan ability - ;; - ;; We just push new index build as they come along, if one - ;; index build requires much more time than the others our - ;; index build might get unsync: indexes for different tables - ;; will get built in parallel --- not a big problem. - (when (and create-indexes (not data-only)) - (let* ((*preserve-index-names* (eq :preserve index-names))) - (alexandria:appendf - pkeys - (create-indexes-in-kernel (target-db copy) - table - idx-kernel idx-channel)))))) + :disable-triggers disable-triggers)))) ;; now end the kernels - (end-kernels copy-kernel copy-channel idx-kernel idx-channel - table-count (count-indexes catalog) - :concurrency concurrency) + ;; and each time a table is done, launch its indexing + (unless schema-only + (let ((lp:*kernel* copy-kernel)) + (with-stats-collection ("COPY Threads Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (let ((worker-count (* (hash-table-count writers-count) + (task-count concurrency)))) + (loop :for tasks :below worker-count + :do (destructuring-bind (task table seconds) + (lp:receive-result copy-channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds) + (when (eq :writer task) + (update-stats :data table :secs seconds) + + ;; + ;; Start the CREATE INDEX parallel tasks only when + ;; the data has been fully copied over to the + ;; corresponding table, that's when the writers + ;; count is down to zero. + ;; + (decf (gethash table writers-count)) + (log-message :debug "writers-counts[~a] = ~a" + (format-table-name table) + (gethash table writers-count)) + + (when (and create-indexes + (not data-only) + (zerop (gethash table writers-count))) + (let* ((*preserve-index-names* + (eq :preserve index-names))) + (alexandria:appendf + pkeys + (create-indexes-in-kernel (target-db copy) + table + idx-kernel + idx-channel))))))) + (prog1 + worker-count + (lp:end-kernel :wait nil)))))) + + (when create-indexes + (let ((lp:*kernel* idx-kernel)) + ;; wait until the indexes are done being built... + ;; don't forget accounting for that waiting time. + (with-stats-collection ("Index Build Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (loop :for count :below (count-indexes catalog) + :do (lp:receive-result idx-channel)) + (lp:end-kernel :wait t) + (count-indexes catalog)))) ;; ;; Complete the PostgreSQL database before handing over. @@ -250,39 +287,3 @@ ;; Time to cleanup! ;; (cleanup copy catalog :materialize-views materialize-views))) - - -;;; -;;; Lower level tools -;;; -(defun end-kernels (copy-kernel copy-channel idx-kernel idx-channel - table-count index-count - &key (concurrency 2)) - "Terminate the lparallel kernels, waiting for all threads." - (when copy-kernel - (let ((lp:*kernel* copy-kernel)) - (with-stats-collection ("COPY Threads Completion" :section :post - :use-result-as-read t - :use-result-as-rows t) - (let ((worker-count (* table-count (task-count concurrency)))) - (loop :for tasks :below worker-count - :do (destructuring-bind (task table-name seconds) - (lp:receive-result copy-channel) - (log-message :debug "Finished processing ~a for ~s ~50T~6$s" - task table-name seconds) - (when (eq :writer task) - (update-stats :data table-name :secs seconds)))) - (prog1 - worker-count - (lp:end-kernel :wait nil)))))) - - (when idx-kernel - (let ((lp:*kernel* idx-kernel)) - ;; wait until the indexes are done being built... - ;; don't forget accounting for that waiting time. - (with-stats-collection ("Index Build Completion" :section :post - :use-result-as-read t - :use-result-as-rows t) - (loop :for count :below index-count - :count (lp:receive-result idx-channel))) - (lp:end-kernel)))) diff --git a/src/utils/report.lisp b/src/utils/report.lisp index 570c641..0997ee2 100644 --- a/src/utils/report.lisp +++ b/src/utils/report.lisp @@ -143,23 +143,24 @@ "Report a whole summary." (when header (report-header)) (loop - for table-name in (reverse (pgstate-tabnames pgstate)) - for pgtable = (gethash table-name (pgstate-tables pgstate)) - do - (with-slots (read rows errs secs rs ws) pgtable - (format *report-stream* - *header-tname-format* - *max-length-table-name* - (format-table-name table-name)) - (report-results read rows errs - (cond ((> 0 secs) (format-interval secs nil)) - ((and rs ws (= 0 secs)) - (format-interval (max rs ws) nil)) - (t (format-interval secs nil))) - (when (and rs (not (= rs 0.0))) (format-interval rs nil)) - (when (and ws (not (= ws 0.0))) (format-interval ws nil)))) - finally (when footer - (report-pgstate-stats pgstate footer)))) + :for label :in (reverse (pgstate-tabnames pgstate)) + :for pgtable := (gethash label (pgstate-tables pgstate)) + :do (with-slots (read rows errs secs rs ws) pgtable + (format *report-stream* + *header-tname-format* + *max-length-table-name* + (etypecase label + (string label) + (table (format-table-name label)))) + (report-results read rows errs + (cond ((> 0 secs) (format-interval secs nil)) + ((and rs ws (= 0 secs)) + (format-interval (max rs ws) nil)) + (t (format-interval secs nil))) + (when (and rs (not (= rs 0.0))) (format-interval rs nil)) + (when (and ws (not (= ws 0.0))) (format-interval ws nil)))) + :finally (when footer + (report-pgstate-stats pgstate footer)))) (defun parse-summary-type (&optional (pathname *summary-pathname*)) "Return the summary type we want: human-readable, csv, json." @@ -173,7 +174,10 @@ "Compute the max length of a table-name in the legend." (reduce #'max (mapcar #'length - (mapcar #'format-table-name + (mapcar (lambda (entry) + (etypecase entry + (string entry) + (table (format-table-name entry)))) (append (pgstate-tabnames data) (pgstate-tabnames pre) (pgstate-tabnames post) diff --git a/src/utils/schema-structs.lisp b/src/utils/schema-structs.lisp index b0ab147..a960f4e 100644 --- a/src/utils/schema-structs.lisp +++ b/src/utils/schema-structs.lisp @@ -8,7 +8,7 @@ ;;; Utility function using those definitions are found in schema.lisp in the ;;; same directory. ;;; -(in-package :pgloader.utils) +(in-package :pgloader.schema) (defmacro push-to-end (item place) `(setf ,place (nconc ,place (list ,item)))) @@ -314,14 +314,14 @@ ;;; Not a generic/method because only used for the table object, and we want ;;; to use the usual structure print-method in stack traces. ;;; -(defun format-table-name (table) +(defgeneric format-table-name (object) + (:documentation "Format the OBJECT name for PostgreSQL.")) + +(defmethod format-table-name ((table table)) "TABLE should be a table instance, but for hysterical raisins might be a CONS of a schema name and a table name, or just the table name as a string." - (etypecase table - (table (format nil "~@[~a.~]~a" (table-schema table) (table-name table))) - (cons (format nil "~a.~a" (car table) (cdr table))) - (string table))) + (format nil "~@[~a.~]~a" (table-schema table) (table-name table))) ;;;