From 8254d63453d4100c00b9a3c6f4b2eda48b80976b Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Sun, 30 Apr 2017 17:25:47 +0200 Subject: [PATCH] Fix incorrect per-table total time metrics. The concurrency nature of pgloader made it non obvious where to implement the timers properly, and as a result the tracking of how long it took to actually transfer the data was... just wrong. Rather than trying to measure the time spent in any particular piece of the code, we now emit "start" and "stop" stats messages to the monitor thread at the right places (which are way easier to find, in the worker threads) and have the monitor figure out how long it took really. Fix #506. --- src/package.lisp | 3 ++ src/pgsql/copy-from-queue.lisp | 4 +- src/sources/common/db-methods.lisp | 2 - src/sources/common/md-methods.lisp | 4 +- src/sources/common/methods.lisp | 84 +++++++++++++++--------------- src/utils/monitor.lisp | 51 +++++++++++++----- src/utils/state.lisp | 5 +- 7 files changed, 91 insertions(+), 62 deletions(-) diff --git a/src/package.lisp b/src/package.lisp index 808557d..25ef561 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -201,6 +201,9 @@ #:pgstate-decf #:pgtable-initialize-reject-files + #:pgtable-secs + #:pgtable-start + #:pgtable-stop #:pgtable-reject-data #:pgtable-reject-logs #:report-pgtable-stats diff --git a/src/pgsql/copy-from-queue.lisp b/src/pgsql/copy-from-queue.lisp index f17add7..7f90bcc 100644 --- a/src/pgsql/copy-from-queue.lisp +++ b/src/pgsql/copy-from-queue.lisp @@ -139,7 +139,9 @@ (update-stats :data table :rows rows) (incf seconds batch-seconds)))))) - (update-stats :data table :ws seconds) + ;; each writer thread sends its own stop timestamp and the monitor keeps + ;; only the latest entry + (update-stats :data table :ws seconds :stop (get-internal-real-time)) (log-message :debug "Writer[~a] for ~a is done in ~6$s" (lp:kernel-worker-index) (format-table-name table) seconds) diff --git a/src/sources/common/db-methods.lisp b/src/sources/common/db-methods.lisp index 5273e6b..304e2c3 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/sources/common/db-methods.lisp @@ -348,8 +348,6 @@ "Finished processing ~a for ~s ~50T~6$s" task (format-table-name table) seconds) (when (eq :writer task) - (update-stats :data table :secs seconds) - ;; ;; Start the CREATE INDEX parallel tasks only when ;; the data has been fully copied over to the diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index 43cc50f..8bf168c 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -165,9 +165,7 @@ (lp:receive-result channel) (log-message :debug "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds) - (when (eq :writer task) - (update-stats :data table :secs seconds))) + task (format-table-name table) seconds)) (condition (e) (log-message :fatal "~a" e)))) (prog1 diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index c2afa04..44b0130 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -27,6 +27,9 @@ (setf bclist (cdr bclist) qclist (cdr qclist)))))) + ;; signal we are starting + (update-stats :data (target copy) :start start-time) + ;; call the source-specific method for reading input data (map-rows copy :process-row-fn process-row) @@ -150,48 +153,47 @@ (fmtqs (loop :repeat concurrency :collect (lq:make-queue :fixed-capacity *concurrent-batches*)))) - (with-stats-collection ((target copy) :dbname (db-name (target-db copy))) - (lp:task-handler-bind - ((error #'(lambda (condition) - (log-message :error "A thread failed with error: ~a" - condition) - #-pgloader-image - (if (member *client-min-messages* (list :debug :data)) - (lp::invoke-debugger condition) - (lp::invoke-transfer-error condition)) - #+pgloader-image - (if (member *client-min-messages* (list :debug :data)) - (log-message :fatal "Backtrace: ~a" - (trivial-backtrace:print-backtrace - condition - :output nil - :verbose t)) - (lp::invoke-transfer-error condition))))) - (log-message :notice "COPY ~s" table-name) + (lp:task-handler-bind + ((error #'(lambda (condition) + (log-message :error "A thread failed with error: ~a" + condition) + #-pgloader-image + (if (member *client-min-messages* (list :debug :data)) + (lp::invoke-debugger condition) + (lp::invoke-transfer-error condition)) + #+pgloader-image + (if (member *client-min-messages* (list :debug :data)) + (log-message :fatal "Backtrace: ~a" + (trivial-backtrace:print-backtrace + condition + :output nil + :verbose t)) + (lp::invoke-transfer-error condition))))) + (log-message :notice "COPY ~s" table-name) - ;; start a task to read data from the source into the queue - (lp:submit-task channel #'queue-raw-data copy rawqs) + ;; start a task to read data from the source into the queue + (lp:submit-task channel #'queue-raw-data copy rawqs) - ;; now start transformer threads to process raw vectors from our - ;; source into preprocessed batches to send down to PostgreSQL - (loop :for rawq :in rawqs - :for fmtq :in fmtqs - :do (lp:submit-task channel #'format-data-to-copy copy rawq fmtq)) + ;; now start transformer threads to process raw vectors from our + ;; source into preprocessed batches to send down to PostgreSQL + (loop :for rawq :in rawqs + :for fmtq :in fmtqs + :do (lp:submit-task channel #'format-data-to-copy copy rawq fmtq)) - (loop :for fmtq :in fmtqs - :do (lp:submit-task channel - #'pgloader.pgsql:copy-from-queue - (clone-connection (target-db copy)) - (target copy) - fmtq - :columns (copy-column-list copy) - :on-error-stop on-error-stop - :disable-triggers disable-triggers)) + (loop :for fmtq :in fmtqs + :do (lp:submit-task channel + #'pgloader.pgsql:copy-from-queue + (clone-connection (target-db copy)) + (target copy) + fmtq + :columns (copy-column-list copy) + :on-error-stop on-error-stop + :disable-triggers disable-triggers)) - ;; now wait until both the tasks are over, and kill the kernel - (unless c-s-p - (log-message :debug "waiting for ~d tasks" (task-count concurrency)) - (loop :repeat (task-count concurrency) - :do (lp:receive-result channel)) - (log-message :notice "COPY ~s done." table-name) - (unless k-s-p (lp:end-kernel :wait t))))))) + ;; now wait until both the tasks are over, and kill the kernel + (unless c-s-p + (log-message :debug "waiting for ~d tasks" (task-count concurrency)) + (loop :repeat (task-count concurrency) + :do (lp:receive-result channel)) + (log-message :notice "COPY ~s done." table-name) + (unless k-s-p (lp:end-kernel :wait t)))))) diff --git a/src/utils/monitor.lisp b/src/utils/monitor.lisp index 80f0dcc..619dcc2 100644 --- a/src/utils/monitor.lisp +++ b/src/utils/monitor.lisp @@ -31,7 +31,7 @@ (defstruct noop) (defstruct log-message category description arguments) (defstruct new-label section label dbname) -(defstruct update-stats section label read rows errs secs rs ws) +(defstruct update-stats section label read rows errs secs rs ws start stop) (defstruct bad-row section label condition data) (defun log-message (category description &rest arguments) @@ -45,7 +45,7 @@ SECTION." (send-event (make-new-label :section section :label label :dbname dbname))) -(defun update-stats (section label &key read rows errs secs rs ws) +(defun update-stats (section label &key read rows errs secs rs ws start stop) "Send an event to update stats for given SECTION and LABEL." (send-event (make-update-stats :section section :label label @@ -54,7 +54,9 @@ :errs errs :secs secs :rs rs - :ws ws))) + :ws ws + :start start + :stop stop))) (defun process-bad-row (table condition data) "Send an event to log the bad row DATA in the reject and log files for given @@ -215,18 +217,39 @@ (new-label-dbname event))))) (update-stats - ;; it only costs an extra hash table lookup... - (pgstate-new-label (getf *sections* (update-stats-section event)) - (update-stats-label event)) + (let* ((pgstate (getf *sections* (update-stats-section event))) + (label (update-stats-label event)) + (table (pgstate-new-label pgstate label))) - (pgstate-incf (getf *sections* (update-stats-section event)) - (update-stats-label event) - :read (update-stats-read event) - :rows (update-stats-rows event) - :secs (update-stats-secs event) - :errs (update-stats-errs event) - :rs (update-stats-rs event) - :ws (update-stats-ws event))) + (pgstate-incf pgstate label + :read (update-stats-read event) + :rows (update-stats-rows event) + :secs (update-stats-secs event) + :errs (update-stats-errs event) + :rs (update-stats-rs event) + :ws (update-stats-ws event)) + + (when (update-stats-start event) + (log-message :debug "start ~a ~30t ~a" + (pgloader.catalog:format-table-name label) + (update-stats-start event)) + (setf (pgtable-start table) (update-stats-start event))) + + ;; each PostgreSQL writer thread will send a stop even, here + ;; we only keep the latest one. + (when (and (update-stats-stop event) + (or (null (pgtable-stop table)) + (< (pgtable-stop table) (update-stats-stop event)))) + (setf (pgtable-stop table) (update-stats-stop event)) + (let ((secs (elapsed-time-since (pgtable-start table) + (pgtable-stop table)))) + (setf (pgtable-secs table) secs) + + (log-message :debug " stop ~a ~30t | ~a .. ~a = ~a" + (pgloader.catalog:format-table-name label) + (pgtable-start table) + (pgtable-stop table) + secs))))) (bad-row (%process-bad-row (bad-row-label event) diff --git a/src/utils/state.lisp b/src/utils/state.lisp index bd2caad..db2292e 100644 --- a/src/utils/state.lisp +++ b/src/utils/state.lisp @@ -16,6 +16,8 @@ (secs 0.0 :type float) ; how many seconds did it take (rs 0.0 :type float) ; seconds spent reading (ws 0.0 :type float) ; seconds spent writing + (start 0 :type integer) ; internal real time when we started + (stop 0 :type integer) ; internal real time when we finished reject-data reject-logs) ; files where to find reject data (defstruct pgstate @@ -80,7 +82,8 @@ "Instanciate a new pgtable structure to hold our stats, and return it." (or (pgstate-get-label pgstate label) (let* ((pgtable (setf (gethash label (pgstate-tables pgstate)) - (make-pgtable :name label)))) + (make-pgtable :name label + :start (get-internal-real-time))))) ;; maintain the ordering (push label (pgstate-tabnames pgstate))