Fix a hang scenario in schema-only.

The parallelism in pgloader is now smart enough to begin fetching data from
the next table while the previous one is still not done being written down
to PostgreSQL, but when doing so I introduced a bug in the way indexes are
taken care of.

Specifically, in schema-only mode of operations, we would wait for indexes
we skipped creating. The skipping is the bug here, so make sure we create
indexes even when we don't want to copy any data over.
This commit is contained in:
Dimitri Fontaine 2017-02-25 17:09:07 +01:00
parent 2f7169e286
commit 96b2af6b2a

View File

@ -200,6 +200,29 @@
(when alter-table
(alter-table catalog alter-table)))
(defun create-indexes (copy table kernel channel
&key
data-only
create-tables
create-indexes
(index-names :uniquify))
"Launch the CREATE INDEX in parralel for given table. Returns a list of
the UNIQUE indexes that are created here and need to then be upgraded as
PRIMARY KEYS."
(when (and create-indexes (not data-only))
(let* ((*preserve-index-names*
(or (eq :preserve index-names)
;; if we didn't create the tables, we
;; are re-installing the pre-existing
;; indexes
(not create-tables))))
;; that returns Primary Keys to be upgraded from the UNIQUE indexes
;; that we just prepared in this step.
(create-indexes-in-kernel (target-db copy)
table
kernel
channel))))
;;;
;;; Generic enough implementation of the copy-database method.
@ -300,16 +323,26 @@
:do (let ((table-source (instanciate-table-copy-object copy table)))
;; first COPY the data from source to PostgreSQL, using copy-kernel
(unless schema-only
;; prepare the writers-count hash-table, as we start
;; copy-from, we have concurrency tasks writing.
(setf (gethash table writers-count) concurrency)
(copy-from table-source
:concurrency concurrency
:kernel copy-kernel
:channel copy-channel
:on-error-stop on-error-stop
:disable-triggers disable-triggers))))
(if schema-only
;; start indexing straight away then
(alexandria:appendf
pkeys
(create-indexes copy table idx-kernel idx-channel
:data-only data-only
:create-tables create-tables
:create-indexes create-indexes
:index-names index-names))
;; prepare the writers-count hash-table, as we start
;; copy-from, we have concurrency tasks writing.
(progn
(setf (gethash table writers-count) concurrency)
(copy-from table-source
:concurrency concurrency
:kernel copy-kernel
:channel copy-channel
:on-error-stop on-error-stop
:disable-triggers disable-triggers)))))
;; now end the kernels
;; and each time a table is done, launch its indexing
@ -318,46 +351,39 @@
(with-stats-collection ("COPY Threads Completion" :section :post
:use-result-as-read t
:use-result-as-rows t)
(let ((worker-count (* (hash-table-count writers-count)
(task-count concurrency))))
(loop :for tasks :below worker-count
:do (destructuring-bind (task table seconds)
(lp:receive-result copy-channel)
(log-message :debug
"Finished processing ~a for ~s ~50T~6$s"
task (format-table-name table) seconds)
(when (eq :writer task)
(update-stats :data table :secs seconds)
(let ((worker-count (* (hash-table-count writers-count)
(task-count concurrency))))
(loop :for tasks :below worker-count
:do (destructuring-bind (task table seconds)
(lp:receive-result copy-channel)
(log-message :debug
"Finished processing ~a for ~s ~50T~6$s"
task (format-table-name table) seconds)
(when (eq :writer task)
(update-stats :data table :secs seconds)
;;
;; Start the CREATE INDEX parallel tasks only when
;; the data has been fully copied over to the
;; corresponding table, that's when the writers
;; count is down to zero.
;;
(decf (gethash table writers-count))
(log-message :debug "writers-counts[~a] = ~a"
(format-table-name table)
(gethash table writers-count))
;;
;; Start the CREATE INDEX parallel tasks only when
;; the data has been fully copied over to the
;; corresponding table, that's when the writers
;; count is down to zero.
;;
(decf (gethash table writers-count))
(log-message :debug "writers-counts[~a] = ~a"
(format-table-name table)
(gethash table writers-count))
(when (and create-indexes
(not data-only)
(zerop (gethash table writers-count)))
(let* ((*preserve-index-names*
(or (eq :preserve index-names)
;; if we didn't create the tables, we
;; are re-installing the pre-existing
;; indexes
(not create-tables))))
(alexandria:appendf
pkeys
(create-indexes-in-kernel (target-db copy)
table
idx-kernel
idx-channel)))))))
(prog1
worker-count
(lp:end-kernel :wait nil))))))
(when (zerop (gethash table writers-count))
(alexandria:appendf
pkeys
(create-indexes copy table idx-kernel idx-channel
:data-only data-only
:create-tables create-tables
:create-indexes create-indexes
:index-names index-names))))))
(prog1
worker-count
(lp:end-kernel :wait nil))))))
(when create-indexes
(let ((lp:*kernel* idx-kernel))
@ -366,8 +392,8 @@
(with-stats-collection ("Index Build Completion" :section :post
:use-result-as-read t
:use-result-as-rows t)
(loop :for count :below (count-indexes catalog)
:do (lp:receive-result idx-channel))
(loop :for count :below (count-indexes catalog)
:do (lp:receive-result idx-channel))
(lp:end-kernel :wait t)
(count-indexes catalog))))