mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-05 10:56:10 +02:00
Fix index completion management in MySQL and SQLite.
We used to wait for the wrong number of workers, meaning the rest of the code began running before the indexes where all available. A user report where one of the indexes takes a very long time to compute made it obvious. In passing, also improve reporting of those rendez-vous sections.
This commit is contained in:
parent
af9e423f0b
commit
2dd7f68a30
@ -406,22 +406,35 @@
|
||||
|
||||
;; now end the kernels
|
||||
(let ((lp:*kernel* copy-kernel))
|
||||
(with-stats-collection ("COPY Threads Completion" :section :post)
|
||||
(loop :for tasks :below (* 4 table-count)
|
||||
:do (destructuring-bind (task table-name seconds)
|
||||
(lp:receive-result copy-channel)
|
||||
(log-message :info "Finished processing ~a for ~s ~50T~6$s"
|
||||
task table-name seconds)
|
||||
(when (eq :writer task)
|
||||
(update-stats :data table-name :secs seconds))))
|
||||
(lp:end-kernel)))
|
||||
(with-stats-collection ("COPY Threads Completion" :section :post
|
||||
:use-result-as-read t
|
||||
:use-result-as-rows t)
|
||||
(let ((workers-count (* 4 table-count)))
|
||||
(loop :for tasks :below workers-count
|
||||
:do (destructuring-bind (task table-name seconds)
|
||||
(lp:receive-result copy-channel)
|
||||
(log-message :debug "Finished processing ~a for ~s ~50T~6$s"
|
||||
task table-name seconds)
|
||||
(when (eq :writer task)
|
||||
(update-stats :data table-name :secs seconds))))
|
||||
(prog1
|
||||
workers-count
|
||||
(lp:end-kernel)))))
|
||||
|
||||
(let ((lp:*kernel* idx-kernel))
|
||||
;; wait until the indexes are done being built...
|
||||
;; don't forget accounting for that waiting time.
|
||||
(when (and create-indexes (not data-only))
|
||||
(with-stats-collection ("Index Build Completion" :section :post)
|
||||
(loop for idx in all-indexes do (lp:receive-result idx-channel))))
|
||||
(with-stats-collection ("Index Build Completion" :section :post
|
||||
:use-result-as-read t
|
||||
:use-result-as-rows t)
|
||||
(let ((nb-indexes
|
||||
(reduce #'+ all-indexes :key (lambda (entry)
|
||||
(length (cdr entry))))))
|
||||
(log-message :debug "Waiting for ~a index completion" nb-indexes)
|
||||
(loop :for count :below nb-indexes
|
||||
:do (lp:receive-result idx-channel))
|
||||
nb-indexes)))
|
||||
(lp:end-kernel))
|
||||
|
||||
;;
|
||||
|
||||
@ -229,22 +229,34 @@
|
||||
|
||||
;; now end the kernels
|
||||
(let ((lp:*kernel* copy-kernel))
|
||||
(with-stats-collection ("COPY Threads Completion" :section :post)
|
||||
(loop :for tasks :below (* 4 table-count)
|
||||
:do (destructuring-bind (task table-name seconds)
|
||||
(lp:receive-result copy-channel)
|
||||
(log-message :debug "Finished processing ~a for ~s ~50T~6$s"
|
||||
task table-name seconds)
|
||||
(when (eq :writer task)
|
||||
(update-stats :data table-name :secs seconds))))
|
||||
(lp:end-kernel)))
|
||||
(with-stats-collection ("COPY Threads Completion" :section :post
|
||||
:use-result-as-read t
|
||||
:use-result-as-rows t)
|
||||
(let ((workers-count (* 4 table-count)))
|
||||
(loop :for tasks :below workers-count
|
||||
:do (destructuring-bind (task table-name seconds)
|
||||
(lp:receive-result copy-channel)
|
||||
(log-message :debug "Finished processing ~a for ~s ~50T~6$s"
|
||||
task table-name seconds)
|
||||
(when (eq :writer task)
|
||||
(update-stats :data table-name :secs seconds))))
|
||||
(prog1
|
||||
workers-count
|
||||
(lp:end-kernel)))))
|
||||
|
||||
(let ((lp:*kernel* idx-kernel))
|
||||
;; wait until the indexes are done being built...
|
||||
;; don't forget accounting for that waiting time.
|
||||
(when (and create-indexes (not data-only))
|
||||
(with-stats-collection ("Index Build Completion" :section :post)
|
||||
(loop for idx in all-indexes do (lp:receive-result idx-channel))))
|
||||
(with-stats-collection ("Index Build Completion" :section :post
|
||||
:use-result-as-read t
|
||||
:use-result-as-rows t)
|
||||
(let ((nb-indexes
|
||||
(reduce #'+ all-indexes :key (lambda (entry)
|
||||
(length (cdr entry))))))
|
||||
(loop :for count :below nb-indexes
|
||||
:do (lp:receive-result idx-channel))
|
||||
nb-indexes)))
|
||||
(lp:end-kernel))
|
||||
|
||||
;; don't forget to reset sequences, but only when we did actually import
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user