diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 646986c..ec25427 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -406,22 +406,35 @@ ;; now end the kernels (let ((lp:*kernel* copy-kernel)) - (with-stats-collection ("COPY Threads Completion" :section :post) - (loop :for tasks :below (* 4 table-count) - :do (destructuring-bind (task table-name seconds) - (lp:receive-result copy-channel) - (log-message :info "Finished processing ~a for ~s ~50T~6$s" - task table-name seconds) - (when (eq :writer task) - (update-stats :data table-name :secs seconds)))) - (lp:end-kernel))) + (with-stats-collection ("COPY Threads Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (let ((workers-count (* 4 table-count))) + (loop :for tasks :below workers-count + :do (destructuring-bind (task table-name seconds) + (lp:receive-result copy-channel) + (log-message :debug "Finished processing ~a for ~s ~50T~6$s" + task table-name seconds) + (when (eq :writer task) + (update-stats :data table-name :secs seconds)))) + (prog1 + workers-count + (lp:end-kernel))))) (let ((lp:*kernel* idx-kernel)) ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. (when (and create-indexes (not data-only)) - (with-stats-collection ("Index Build Completion" :section :post) - (loop for idx in all-indexes do (lp:receive-result idx-channel)))) + (with-stats-collection ("Index Build Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (let ((nb-indexes + (reduce #'+ all-indexes :key (lambda (entry) + (length (cdr entry)))))) + (log-message :debug "Waiting for ~a index completion" nb-indexes) + (loop :for count :below nb-indexes + :do (lp:receive-result idx-channel)) + nb-indexes))) (lp:end-kernel)) ;; diff --git a/src/sources/sqlite/sqlite.lisp b/src/sources/sqlite/sqlite.lisp index 0838e34..e46cfa2 100644 --- a/src/sources/sqlite/sqlite.lisp +++ b/src/sources/sqlite/sqlite.lisp @@ -229,22 +229,34 @@ ;; now end the kernels (let ((lp:*kernel* copy-kernel)) - (with-stats-collection ("COPY Threads Completion" :section :post) - (loop :for tasks :below (* 4 table-count) - :do (destructuring-bind (task table-name seconds) - (lp:receive-result copy-channel) - (log-message :debug "Finished processing ~a for ~s ~50T~6$s" - task table-name seconds) - (when (eq :writer task) - (update-stats :data table-name :secs seconds)))) - (lp:end-kernel))) + (with-stats-collection ("COPY Threads Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (let ((workers-count (* 4 table-count))) + (loop :for tasks :below workers-count + :do (destructuring-bind (task table-name seconds) + (lp:receive-result copy-channel) + (log-message :debug "Finished processing ~a for ~s ~50T~6$s" + task table-name seconds) + (when (eq :writer task) + (update-stats :data table-name :secs seconds)))) + (prog1 + workers-count + (lp:end-kernel))))) (let ((lp:*kernel* idx-kernel)) ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. (when (and create-indexes (not data-only)) - (with-stats-collection ("Index Build Completion" :section :post) - (loop for idx in all-indexes do (lp:receive-result idx-channel)))) + (with-stats-collection ("Index Build Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (let ((nb-indexes + (reduce #'+ all-indexes :key (lambda (entry) + (length (cdr entry)))))) + (loop :for count :below nb-indexes + :do (lp:receive-result idx-channel)) + nb-indexes))) (lp:end-kernel)) ;; don't forget to reset sequences, but only when we did actually import