Cleanup and timing display improvements.

Have each thread publish its own start-time so that the main thread may
compute time spent in source and target processing, in order to fix the
crude hack of taking (max read-time write-time) in the total time column
of the summary.

We still have some strange artefacts here: we consider that the full
processing time is bound to the writer thread (:target), because it
needs to have the reader done already to be able to COPY the last
batch... but in testing I've seen some :source timings higher than the
:target ones...

Let's solve problems one at a time tho, I guess multi-threading and
accurate wall clock times aren't to be expected to mix and match that
easily anyway (multi cores, single RTC and all that).
This commit is contained in:
Dimitri Fontaine 2015-10-22 22:36:40 +02:00
parent 933d1c8d6b
commit 4f3b3472a2
6 changed files with 50 additions and 29 deletions

View File

@ -66,6 +66,7 @@
#:send-event
#:start-monitor
#:stop-monitor
#:elapsed-time-since
#:timing))
(defpackage #:pgloader.utils
@ -77,6 +78,7 @@
(:export #:with-monitor ; monitor
#:*monitoring-queue*
#:with-stats-collection
#:elapsed-time-since
#:timing
;; bits from alexandria

View File

@ -49,32 +49,33 @@
"Fetch from the QUEUE messages containing how many rows are in the
*writer-batch* for us to send down to PostgreSQL, and when that's done
update stats."
(when truncate
(truncate-tables pgconn (list table-name)))
(let ((start-time (get-internal-real-time)))
(when truncate
(truncate-tables pgconn (list table-name)))
(with-pgsql-connection (pgconn)
(with-schema (unqualified-table-name table-name)
(when disable-triggers (disable-triggers unqualified-table-name))
(log-message :info "pgsql:copy-from-queue: ~a ~a" table-name columns)
(with-pgsql-connection (pgconn)
(with-schema (unqualified-table-name table-name)
(with-disabled-triggers (unqualified-table-name
:disable-triggers disable-triggers)
(log-message :info "pgsql:copy-from-queue: ~a ~a" table-name columns)
(loop
for (mesg batch read oversized?) = (lq:pop-queue queue)
until (eq mesg :end-of-data)
for (rows ws) = (multiple-value-bind (result secs)
(timing
(copy-batch unqualified-table-name columns batch read))
(list result secs))
do (progn
;; The SBCL implementation needs some Garbage Collection
;; decision making help... and now is a pretty good time.
#+sbcl (when oversized? (sb-ext:gc :full t))
(log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]"
unqualified-table-name rows oversized?)
(update-stats :data table-name :rows rows :ws ws)))
(loop
:for (mesg batch read oversized?) := (lq:pop-queue queue)
:until (eq mesg :end-of-data)
:for (rows ws) := (multiple-value-bind (result secs)
(timing
(copy-batch unqualified-table-name
columns batch read))
(list result secs))
:do (progn
;; The SBCL implementation needs some Garbage Collection
;; decision making help... and now is a pretty good time.
#+sbcl (when oversized? (sb-ext:gc :full t))
(log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]"
unqualified-table-name rows oversized?)
(update-stats :data table-name :rows rows :ws ws))))))
(when disable-triggers (enable-triggers unqualified-table-name))))
(cons :target table-name))
(list :target table-name start-time)))
;;;
;;; Compute how many rows we're going to try loading next, depending on

View File

@ -259,6 +259,19 @@
(log-message :info "~a" sql)
(pomo:execute sql)))
(defmacro with-disabled-triggers ((table-name &key disable-triggers)
&body forms)
"Run FORMS with PostgreSQL triggers disabled for TABLE-NAME if
DISABLE-TRIGGERS is T A PostgreSQL connection must be opened already
where this macro is used."
`(if ,disable-triggers
(progn
(disable-triggers ,table-name)
(unwind-protect
(progn ,@forms)
(enable-triggers ,table-name)))
(progn ,@forms)))
;;;
;;; Index support

View File

@ -37,7 +37,7 @@
(with-slots (start data count bytes) *current-batch*
(update-stats :data (target copy)
:read count
:rs (pgloader.monitor::elapsed-time-since start))
:rs (elapsed-time-since start))
(lq:push-queue (list :batch data count oversized?) queue))
(setf *current-batch* (make-batch))))
@ -77,7 +77,7 @@
(log-message :debug "Sending last batch (~d rows)" count)
(update-stats :data (target copy)
:read count
:rs (pgloader.monitor::elapsed-time-since start))
:rs (elapsed-time-since start))
(lq:push-queue (list :batch data count nil) queue))))
;; signal we're done

View File

@ -8,8 +8,9 @@
;;;
(defmethod copy-to-queue ((copy copy) queue)
"Copy data from given COPY definition into lparallel.queue QUEUE"
(pgloader.queue:map-push-queue copy queue)
(cons :source (target copy)))
(let ((start-time (get-internal-real-time)))
(pgloader.queue:map-push-queue copy queue)
(list :source (target copy) start-time)))
(defmethod copy-column-list ((copy copy))
"Default column list is an empty list."

View File

@ -408,10 +408,14 @@
(let ((lp:*kernel* copy-kernel))
(with-stats-collection ("COPY Threads Completion" :section :post)
(loop :for tasks :below (* 2 table-count)
:do (destructuring-bind (task . table-name)
:do (destructuring-bind (task table-name start-time)
(lp:receive-result copy-channel)
(declare (ignorable start-time))
(log-message :debug "Finished processing ~a for ~s"
task table-name)))
task table-name)
(when (eq :target task)
(update-stats :data table-name
:secs (elapsed-time-since start-time)))))
(lp:end-kernel)))
(let ((lp:*kernel* idx-kernel))