Fix incorrect per-table total time metrics.

The concurrent nature of pgloader made it non-obvious where to implement
the timers properly, and as a result the tracking of how long it took to
actually transfer the data was... just wrong.

Rather than trying to measure the time spent in any particular piece of the
code, we now emit "start" and "stop" stats messages to the monitor thread at
the right places (which are way easier to find, in the worker threads) and
have the monitor figure out how long it really took.

Fix #506.
This commit is contained in:
Dimitri Fontaine 2017-04-30 17:25:47 +02:00
parent 20ea1d78c4
commit 8254d63453
7 changed files with 91 additions and 62 deletions

View File

@ -201,6 +201,9 @@
#:pgstate-decf
#:pgtable-initialize-reject-files
#:pgtable-secs
#:pgtable-start
#:pgtable-stop
#:pgtable-reject-data
#:pgtable-reject-logs
#:report-pgtable-stats

View File

@ -139,7 +139,9 @@
(update-stats :data table :rows rows)
(incf seconds batch-seconds))))))
(update-stats :data table :ws seconds)
;; each writer thread sends its own stop timestamp and the monitor keeps
;; only the latest entry
(update-stats :data table :ws seconds :stop (get-internal-real-time))
(log-message :debug "Writer[~a] for ~a is done in ~6$s"
(lp:kernel-worker-index)
(format-table-name table) seconds)

View File

@ -348,8 +348,6 @@
"Finished processing ~a for ~s ~50T~6$s"
task (format-table-name table) seconds)
(when (eq :writer task)
(update-stats :data table :secs seconds)
;;
;; Start the CREATE INDEX parallel tasks only when
;; the data has been fully copied over to the

View File

@ -165,9 +165,7 @@
(lp:receive-result channel)
(log-message :debug
"Finished processing ~a for ~s ~50T~6$s"
task (format-table-name table) seconds)
(when (eq :writer task)
(update-stats :data table :secs seconds)))
task (format-table-name table) seconds))
(condition (e)
(log-message :fatal "~a" e))))
(prog1

View File

@ -27,6 +27,9 @@
(setf bclist (cdr bclist)
qclist (cdr qclist))))))
;; signal we are starting
(update-stats :data (target copy) :start start-time)
;; call the source-specific method for reading input data
(map-rows copy :process-row-fn process-row)
@ -150,48 +153,47 @@
(fmtqs (loop :repeat concurrency :collect
(lq:make-queue :fixed-capacity *concurrent-batches*))))
(with-stats-collection ((target copy) :dbname (db-name (target-db copy)))
(lp:task-handler-bind
((error #'(lambda (condition)
(log-message :error "A thread failed with error: ~a"
condition)
#-pgloader-image
(if (member *client-min-messages* (list :debug :data))
(lp::invoke-debugger condition)
(lp::invoke-transfer-error condition))
#+pgloader-image
(if (member *client-min-messages* (list :debug :data))
(log-message :fatal "Backtrace: ~a"
(trivial-backtrace:print-backtrace
condition
:output nil
:verbose t))
(lp::invoke-transfer-error condition)))))
(log-message :notice "COPY ~s" table-name)
(lp:task-handler-bind
((error #'(lambda (condition)
(log-message :error "A thread failed with error: ~a"
condition)
#-pgloader-image
(if (member *client-min-messages* (list :debug :data))
(lp::invoke-debugger condition)
(lp::invoke-transfer-error condition))
#+pgloader-image
(if (member *client-min-messages* (list :debug :data))
(log-message :fatal "Backtrace: ~a"
(trivial-backtrace:print-backtrace
condition
:output nil
:verbose t))
(lp::invoke-transfer-error condition)))))
(log-message :notice "COPY ~s" table-name)
;; start a task to read data from the source into the queue
(lp:submit-task channel #'queue-raw-data copy rawqs)
;; start a task to read data from the source into the queue
(lp:submit-task channel #'queue-raw-data copy rawqs)
;; now start transformer threads to process raw vectors from our
;; source into preprocessed batches to send down to PostgreSQL
(loop :for rawq :in rawqs
:for fmtq :in fmtqs
:do (lp:submit-task channel #'format-data-to-copy copy rawq fmtq))
;; now start transformer threads to process raw vectors from our
;; source into preprocessed batches to send down to PostgreSQL
(loop :for rawq :in rawqs
:for fmtq :in fmtqs
:do (lp:submit-task channel #'format-data-to-copy copy rawq fmtq))
(loop :for fmtq :in fmtqs
:do (lp:submit-task channel
#'pgloader.pgsql:copy-from-queue
(clone-connection (target-db copy))
(target copy)
fmtq
:columns (copy-column-list copy)
:on-error-stop on-error-stop
:disable-triggers disable-triggers))
(loop :for fmtq :in fmtqs
:do (lp:submit-task channel
#'pgloader.pgsql:copy-from-queue
(clone-connection (target-db copy))
(target copy)
fmtq
:columns (copy-column-list copy)
:on-error-stop on-error-stop
:disable-triggers disable-triggers))
;; now wait until both the tasks are over, and kill the kernel
(unless c-s-p
(log-message :debug "waiting for ~d tasks" (task-count concurrency))
(loop :repeat (task-count concurrency)
:do (lp:receive-result channel))
(log-message :notice "COPY ~s done." table-name)
(unless k-s-p (lp:end-kernel :wait t)))))))
;; now wait until both the tasks are over, and kill the kernel
(unless c-s-p
(log-message :debug "waiting for ~d tasks" (task-count concurrency))
(loop :repeat (task-count concurrency)
:do (lp:receive-result channel))
(log-message :notice "COPY ~s done." table-name)
(unless k-s-p (lp:end-kernel :wait t))))))

View File

@ -31,7 +31,7 @@
(defstruct noop)
(defstruct log-message category description arguments)
(defstruct new-label section label dbname)
(defstruct update-stats section label read rows errs secs rs ws)
(defstruct update-stats section label read rows errs secs rs ws start stop)
(defstruct bad-row section label condition data)
(defun log-message (category description &rest arguments)
@ -45,7 +45,7 @@
SECTION."
(send-event (make-new-label :section section :label label :dbname dbname)))
(defun update-stats (section label &key read rows errs secs rs ws)
(defun update-stats (section label &key read rows errs secs rs ws start stop)
"Send an event to update stats for given SECTION and LABEL."
(send-event (make-update-stats :section section
:label label
@ -54,7 +54,9 @@
:errs errs
:secs secs
:rs rs
:ws ws)))
:ws ws
:start start
:stop stop)))
(defun process-bad-row (table condition data)
"Send an event to log the bad row DATA in the reject and log files for given
@ -215,18 +217,39 @@
(new-label-dbname event)))))
(update-stats
;; it only costs an extra hash table lookup...
(pgstate-new-label (getf *sections* (update-stats-section event))
(update-stats-label event))
(let* ((pgstate (getf *sections* (update-stats-section event)))
(label (update-stats-label event))
(table (pgstate-new-label pgstate label)))
(pgstate-incf (getf *sections* (update-stats-section event))
(update-stats-label event)
:read (update-stats-read event)
:rows (update-stats-rows event)
:secs (update-stats-secs event)
:errs (update-stats-errs event)
:rs (update-stats-rs event)
:ws (update-stats-ws event)))
(pgstate-incf pgstate label
:read (update-stats-read event)
:rows (update-stats-rows event)
:secs (update-stats-secs event)
:errs (update-stats-errs event)
:rs (update-stats-rs event)
:ws (update-stats-ws event))
(when (update-stats-start event)
(log-message :debug "start ~a ~30t ~a"
(pgloader.catalog:format-table-name label)
(update-stats-start event))
(setf (pgtable-start table) (update-stats-start event)))
;; each PostgreSQL writer thread will send a stop event; here
;; we only keep the latest one.
(when (and (update-stats-stop event)
(or (null (pgtable-stop table))
(< (pgtable-stop table) (update-stats-stop event))))
(setf (pgtable-stop table) (update-stats-stop event))
(let ((secs (elapsed-time-since (pgtable-start table)
(pgtable-stop table))))
(setf (pgtable-secs table) secs)
(log-message :debug " stop ~a ~30t | ~a .. ~a = ~a"
(pgloader.catalog:format-table-name label)
(pgtable-start table)
(pgtable-stop table)
secs)))))
(bad-row
(%process-bad-row (bad-row-label event)

View File

@ -16,6 +16,8 @@
(secs 0.0 :type float) ; how many seconds did it take
(rs 0.0 :type float) ; seconds spent reading
(ws 0.0 :type float) ; seconds spent writing
(start 0 :type integer) ; internal real time when we started
(stop 0 :type integer) ; internal real time when we finished
reject-data reject-logs) ; files where to find reject data
(defstruct pgstate
@ -80,7 +82,8 @@
"Instanciate a new pgtable structure to hold our stats, and return it."
(or (pgstate-get-label pgstate label)
(let* ((pgtable (setf (gethash label (pgstate-tables pgstate))
(make-pgtable :name label))))
(make-pgtable :name label
:start (get-internal-real-time)))))
;; maintain the ordering
(push label (pgstate-tabnames pgstate))