Fix when to create indexes.

In the recent refactoring and improvements of parallelism the indexes
creation would kick in before we know that the data is done being copied
over to the target table.

Fix that by maintaining a writers-count hashtable and only starting to
create indexes when that count reaches zero, meaning all the concurrent
tasks started to handle the COPY of the data are now done.
This commit is contained in:
Dimitri Fontaine 2016-01-16 19:50:21 +01:00
parent dcc8eb6d61
commit aa8b756315
5 changed files with 181 additions and 94 deletions

View File

@ -55,8 +55,21 @@
(:file "logs")
(:file "utils")
(:file "state")
;; user defined transforms package and pgloader
;; provided ones
(:file "transforms")
;; PostgreSQL related utils
(:file "read-sql-files")
(:file "quoting")
(:file "schema-structs" :depends-on ("quoting"))
;; State, monitoring, reporting
(:file "reject" :depends-on ("state"))
(:file "report" :depends-on ("state" "utils"))
(:file "report" :depends-on ("state"
"utils"
"schema-structs"))
(:file "monitor" :depends-on ("logs"
"state"
"reject"
@ -64,13 +77,7 @@
(:file "archive" :depends-on ("logs"))
;; generic connection api
(:file "connection" :depends-on ("archive"))
;; those are one-package-per-file
(:file "transforms")
(:file "read-sql-files")
(:file "quoting")
(:file "schema-structs" :depends-on ("quoting"))))
(:file "connection" :depends-on ("archive"))))
;; package pgloader.pgsql
(:module pgsql

View File

@ -23,8 +23,81 @@
#:start-logger
#:stop-logger))
(defpackage #:pgloader.state
(defpackage #:pgloader.schema
(:use #:cl #:pgloader.params)
(:export #:catalog
#:schema
#:table
#:column
#:index
#:index
#:fkey
#:cast ; generic function for sources
#:make-catalog
#:make-schema
#:make-table
#:create-table
#:make-view
#:make-column
#:make-index
#:make-index
#:make-fkey
#:catalog-name
#:catalog-schema-list
#:schema-name
#:schema-table-list
#:schema-view-list
#:table-name
#:table-source-name
#:table-schema
#:table-oid
#:table-comment
#:table-field-list
#:table-column-list
#:table-index-list
#:table-fkey-list
#:column-name
#:column-type-name
#:column-type-mod
#:column-type-nullable
#:column-default
#:column-comment
#:column-transform
#:table-list
#:view-list
#:add-schema
#:find-schema
#:maybe-add-schema
#:add-table
#:find-table
#:maybe-add-table
#:add-view
#:find-view
#:maybe-add-view
#:add-field
#:add-column
#:add-index
#:add-fkey
#:find-fkey
#:maybe-add-fkey
#:count-tables
#:count-views
#:count-indexes
#:count-fkeys
#:max-indexes-per-table
#:push-to-end
#:with-schema
#:format-default-value
#:format-column))
(defpackage #:pgloader.state
(:use #:cl #:pgloader.params #:pgloader.schema)
(:export #:format-table-name
#:make-pgstate
#:pgstate-tabnames
@ -70,7 +143,9 @@
#:timing))
(defpackage #:pgloader.utils
(:use #:cl #:pgloader.params #:pgloader.monitor #:pgloader.state)
(:use #:cl
#:pgloader.params #:pgloader.schema
#:pgloader.monitor #:pgloader.state)
(:import-from #:alexandria
#:appendf
#:read-file-into-string)
@ -113,7 +188,7 @@
;; quoting
#:apply-identifier-case
;; Schema structure bits
;; schema
#:catalog
#:schema
#:table

View File

@ -109,7 +109,7 @@
:source-db (clone-connection (source-db copy))
:target-db (clone-connection (target-db copy))
:source table
:target (table-name table)
:target table
:fields fields
:columns columns
:transforms transforms)))
@ -155,7 +155,7 @@
:including including
:excluding excluding))
pkeys
(table-count 0)
(writers-count (make-hash-table :size (count-tables catalog)))
(max-indexes (when create-indexes
(max-indexes-per-table catalog)))
(idx-kernel (when (and max-indexes (< 0 max-indexes))
@ -208,33 +208,70 @@
;; first COPY the data from source to PostgreSQL, using copy-kernel
(unless schema-only
(incf table-count)
;; prepare the writers-count hash-table, as we start
;; copy-from, we have concurrency tasks writing.
(setf (gethash table writers-count) concurrency)
(copy-from table-source
:concurrency concurrency
:kernel copy-kernel
:channel copy-channel
:disable-triggers disable-triggers))
;; Create the indexes for that table in parallel with the next
;; COPY, and all at once in concurrent threads to benefit from
;; PostgreSQL synchronous scan ability
;;
;; We just push new index build as they come along, if one
;; index build requires much more time than the others our
;; index build might get unsync: indexes for different tables
;; will get built in parallel --- not a big problem.
(when (and create-indexes (not data-only))
(let* ((*preserve-index-names* (eq :preserve index-names)))
(alexandria:appendf
pkeys
(create-indexes-in-kernel (target-db copy)
table
idx-kernel idx-channel))))))
:disable-triggers disable-triggers))))
;; now end the kernels
(end-kernels copy-kernel copy-channel idx-kernel idx-channel
table-count (count-indexes catalog)
:concurrency concurrency)
;; and each time a table is done, launch its indexing
(unless schema-only
(let ((lp:*kernel* copy-kernel))
(with-stats-collection ("COPY Threads Completion" :section :post
:use-result-as-read t
:use-result-as-rows t)
(let ((worker-count (* (hash-table-count writers-count)
(task-count concurrency))))
(loop :for tasks :below worker-count
:do (destructuring-bind (task table seconds)
(lp:receive-result copy-channel)
(log-message :debug
"Finished processing ~a for ~s ~50T~6$s"
task (format-table-name table) seconds)
(when (eq :writer task)
(update-stats :data table :secs seconds)
;;
;; Start the CREATE INDEX parallel tasks only when
;; the data has been fully copied over to the
;; corresponding table, that's when the writers
;; count is down to zero.
;;
(decf (gethash table writers-count))
(log-message :debug "writers-counts[~a] = ~a"
(format-table-name table)
(gethash table writers-count))
(when (and create-indexes
(not data-only)
(zerop (gethash table writers-count)))
(let* ((*preserve-index-names*
(eq :preserve index-names)))
(alexandria:appendf
pkeys
(create-indexes-in-kernel (target-db copy)
table
idx-kernel
idx-channel)))))))
(prog1
worker-count
(lp:end-kernel :wait nil))))))
(when create-indexes
(let ((lp:*kernel* idx-kernel))
;; wait until the indexes are done being built...
;; don't forget accounting for that waiting time.
(with-stats-collection ("Index Build Completion" :section :post
:use-result-as-read t
:use-result-as-rows t)
(loop :for count :below (count-indexes catalog)
:do (lp:receive-result idx-channel))
(lp:end-kernel :wait t)
(count-indexes catalog))))
;;
;; Complete the PostgreSQL database before handing over.
@ -250,39 +287,3 @@
;; Time to cleanup!
;;
(cleanup copy catalog :materialize-views materialize-views)))
;;;
;;; Lower level tools
;;;
(defun end-kernels (copy-kernel copy-channel idx-kernel idx-channel
table-count index-count
&key (concurrency 2))
"Terminate the lparallel kernels, waiting for all threads."
(when copy-kernel
(let ((lp:*kernel* copy-kernel))
(with-stats-collection ("COPY Threads Completion" :section :post
:use-result-as-read t
:use-result-as-rows t)
(let ((worker-count (* table-count (task-count concurrency))))
(loop :for tasks :below worker-count
:do (destructuring-bind (task table-name seconds)
(lp:receive-result copy-channel)
(log-message :debug "Finished processing ~a for ~s ~50T~6$s"
task table-name seconds)
(when (eq :writer task)
(update-stats :data table-name :secs seconds))))
(prog1
worker-count
(lp:end-kernel :wait nil))))))
(when idx-kernel
(let ((lp:*kernel* idx-kernel))
;; wait until the indexes are done being built...
;; don't forget accounting for that waiting time.
(with-stats-collection ("Index Build Completion" :section :post
:use-result-as-read t
:use-result-as-rows t)
(loop :for count :below index-count
:count (lp:receive-result idx-channel)))
(lp:end-kernel))))

View File

@ -143,23 +143,24 @@
"Report a whole summary."
(when header (report-header))
(loop
for table-name in (reverse (pgstate-tabnames pgstate))
for pgtable = (gethash table-name (pgstate-tables pgstate))
do
(with-slots (read rows errs secs rs ws) pgtable
(format *report-stream*
*header-tname-format*
*max-length-table-name*
(format-table-name table-name))
(report-results read rows errs
(cond ((> 0 secs) (format-interval secs nil))
((and rs ws (= 0 secs))
(format-interval (max rs ws) nil))
(t (format-interval secs nil)))
(when (and rs (not (= rs 0.0))) (format-interval rs nil))
(when (and ws (not (= ws 0.0))) (format-interval ws nil))))
finally (when footer
(report-pgstate-stats pgstate footer))))
:for label :in (reverse (pgstate-tabnames pgstate))
:for pgtable := (gethash label (pgstate-tables pgstate))
:do (with-slots (read rows errs secs rs ws) pgtable
(format *report-stream*
*header-tname-format*
*max-length-table-name*
(etypecase label
(string label)
(table (format-table-name label))))
(report-results read rows errs
(cond ((> 0 secs) (format-interval secs nil))
((and rs ws (= 0 secs))
(format-interval (max rs ws) nil))
(t (format-interval secs nil)))
(when (and rs (not (= rs 0.0))) (format-interval rs nil))
(when (and ws (not (= ws 0.0))) (format-interval ws nil))))
:finally (when footer
(report-pgstate-stats pgstate footer))))
(defun parse-summary-type (&optional (pathname *summary-pathname*))
"Return the summary type we want: human-readable, csv, json."
@ -173,7 +174,10 @@
"Compute the max length of a table-name in the legend."
(reduce #'max
(mapcar #'length
(mapcar #'format-table-name
(mapcar (lambda (entry)
(etypecase entry
(string entry)
(table (format-table-name entry))))
(append (pgstate-tabnames data)
(pgstate-tabnames pre)
(pgstate-tabnames post)

View File

@ -8,7 +8,7 @@
;;; Utility function using those definitions are found in schema.lisp in the
;;; same directory.
;;;
(in-package :pgloader.utils)
(in-package :pgloader.schema)
(defmacro push-to-end (item place)
`(setf ,place (nconc ,place (list ,item))))
@ -314,14 +314,14 @@
;;; Not a generic/method because only used for the table object, and we want
;;; to use the usual structure print-method in stack traces.
;;;
(defun format-table-name (table)
(defgeneric format-table-name (object)
(:documentation "Format the OBJECT name for PostgreSQL."))
(defmethod format-table-name ((table table))
"TABLE should be a table instance, but for hysterical raisins might be a
CONS of a schema name and a table name, or just the table name as a
string."
(etypecase table
(table (format nil "~@[~a.~]~a" (table-schema table) (table-name table)))
(cons (format nil "~a.~a" (car table) (cdr table)))
(string table)))
(format nil "~@[~a.~]~a" (table-schema table) (table-name table)))
;;;