diff --git a/src/package.lisp b/src/package.lisp index 808557d..25ef561 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -201,6 +201,9 @@ #:pgstate-decf #:pgtable-initialize-reject-files + #:pgtable-secs + #:pgtable-start + #:pgtable-stop #:pgtable-reject-data #:pgtable-reject-logs #:report-pgtable-stats diff --git a/src/pgsql/copy-from-queue.lisp b/src/pgsql/copy-from-queue.lisp index f17add7..7f90bcc 100644 --- a/src/pgsql/copy-from-queue.lisp +++ b/src/pgsql/copy-from-queue.lisp @@ -139,7 +139,9 @@ (update-stats :data table :rows rows) (incf seconds batch-seconds)))))) - (update-stats :data table :ws seconds) + ;; each writer thread sends its own stop timestamp and the monitor keeps + ;; only the latest entry + (update-stats :data table :ws seconds :stop (get-internal-real-time)) (log-message :debug "Writer[~a] for ~a is done in ~6$s" (lp:kernel-worker-index) (format-table-name table) seconds) diff --git a/src/sources/common/db-methods.lisp b/src/sources/common/db-methods.lisp index 5273e6b..304e2c3 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/sources/common/db-methods.lisp @@ -348,8 +348,6 @@ "Finished processing ~a for ~s ~50T~6$s" task (format-table-name table) seconds) (when (eq :writer task) - (update-stats :data table :secs seconds) - ;; ;; Start the CREATE INDEX parallel tasks only when ;; the data has been fully copied over to the diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index 43cc50f..8bf168c 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -165,9 +165,7 @@ (lp:receive-result channel) (log-message :debug "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds) - (when (eq :writer task) - (update-stats :data table :secs seconds))) + task (format-table-name table) seconds)) (condition (e) (log-message :fatal "~a" e)))) (prog1 diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index c2afa04..44b0130 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -27,6 +27,9 @@ (setf bclist (cdr bclist) qclist (cdr qclist)))))) + ;; signal we are starting + (update-stats :data (target copy) :start start-time) + ;; call the source-specific method for reading input data (map-rows copy :process-row-fn process-row) @@ -150,48 +153,47 @@ (fmtqs (loop :repeat concurrency :collect (lq:make-queue :fixed-capacity *concurrent-batches*)))) - (with-stats-collection ((target copy) :dbname (db-name (target-db copy))) - (lp:task-handler-bind - ((error #'(lambda (condition) - (log-message :error "A thread failed with error: ~a" - condition) - #-pgloader-image - (if (member *client-min-messages* (list :debug :data)) - (lp::invoke-debugger condition) - (lp::invoke-transfer-error condition)) - #+pgloader-image - (if (member *client-min-messages* (list :debug :data)) - (log-message :fatal "Backtrace: ~a" - (trivial-backtrace:print-backtrace - condition - :output nil - :verbose t)) - (lp::invoke-transfer-error condition))))) - (log-message :notice "COPY ~s" table-name) + (lp:task-handler-bind + ((error #'(lambda (condition) + (log-message :error "A thread failed with error: ~a" + condition) + #-pgloader-image + (if (member *client-min-messages* (list :debug :data)) + (lp::invoke-debugger condition) + (lp::invoke-transfer-error condition)) + #+pgloader-image + (if (member *client-min-messages* (list :debug :data)) + (log-message :fatal "Backtrace: ~a" + (trivial-backtrace:print-backtrace + condition + :output nil + :verbose t)) + (lp::invoke-transfer-error condition))))) + (log-message :notice "COPY ~s" table-name) - ;; start a task to read data from the source into the queue - (lp:submit-task channel #'queue-raw-data copy rawqs) + ;; start a task to read data from the source into the queue + (lp:submit-task channel #'queue-raw-data copy rawqs) - ;; now start transformer threads to process raw vectors from our - ;; source into preprocessed batches to send down to PostgreSQL - (loop :for rawq :in rawqs - :for fmtq :in fmtqs - :do (lp:submit-task channel #'format-data-to-copy copy rawq fmtq)) + ;; now start transformer threads to process raw vectors from our + ;; source into preprocessed batches to send down to PostgreSQL + (loop :for rawq :in rawqs + :for fmtq :in fmtqs + :do (lp:submit-task channel #'format-data-to-copy copy rawq fmtq)) - (loop :for fmtq :in fmtqs - :do (lp:submit-task channel - #'pgloader.pgsql:copy-from-queue - (clone-connection (target-db copy)) - (target copy) - fmtq - :columns (copy-column-list copy) - :on-error-stop on-error-stop - :disable-triggers disable-triggers)) + (loop :for fmtq :in fmtqs + :do (lp:submit-task channel + #'pgloader.pgsql:copy-from-queue + (clone-connection (target-db copy)) + (target copy) + fmtq + :columns (copy-column-list copy) + :on-error-stop on-error-stop + :disable-triggers disable-triggers)) - ;; now wait until both the tasks are over, and kill the kernel - (unless c-s-p - (log-message :debug "waiting for ~d tasks" (task-count concurrency)) - (loop :repeat (task-count concurrency) - :do (lp:receive-result channel)) - (log-message :notice "COPY ~s done." table-name) - (unless k-s-p (lp:end-kernel :wait t))))))) + ;; now wait until both the tasks are over, and kill the kernel + (unless c-s-p + (log-message :debug "waiting for ~d tasks" (task-count concurrency)) + (loop :repeat (task-count concurrency) + :do (lp:receive-result channel)) + (log-message :notice "COPY ~s done." table-name) + (unless k-s-p (lp:end-kernel :wait t)))))) diff --git a/src/utils/monitor.lisp b/src/utils/monitor.lisp index 80f0dcc..619dcc2 100644 --- a/src/utils/monitor.lisp +++ b/src/utils/monitor.lisp @@ -31,7 +31,7 @@ (defstruct noop) (defstruct log-message category description arguments) (defstruct new-label section label dbname) -(defstruct update-stats section label read rows errs secs rs ws) +(defstruct update-stats section label read rows errs secs rs ws start stop) (defstruct bad-row section label condition data) (defun log-message (category description &rest arguments) @@ -45,7 +45,7 @@ SECTION." (send-event (make-new-label :section section :label label :dbname dbname))) -(defun update-stats (section label &key read rows errs secs rs ws) +(defun update-stats (section label &key read rows errs secs rs ws start stop) "Send an event to update stats for given SECTION and LABEL." (send-event (make-update-stats :section section :label label @@ -54,7 +54,9 @@ :errs errs :secs secs :rs rs - :ws ws))) + :ws ws + :start start + :stop stop))) (defun process-bad-row (table condition data) "Send an event to log the bad row DATA in the reject and log files for given @@ -215,18 +217,39 @@ (new-label-dbname event))))) (update-stats - ;; it only costs an extra hash table lookup... - (pgstate-new-label (getf *sections* (update-stats-section event)) - (update-stats-label event)) + (let* ((pgstate (getf *sections* (update-stats-section event))) + (label (update-stats-label event)) + (table (pgstate-new-label pgstate label))) - (pgstate-incf (getf *sections* (update-stats-section event)) - (update-stats-label event) - :read (update-stats-read event) - :rows (update-stats-rows event) - :secs (update-stats-secs event) - :errs (update-stats-errs event) - :rs (update-stats-rs event) - :ws (update-stats-ws event))) + (pgstate-incf pgstate label + :read (update-stats-read event) + :rows (update-stats-rows event) + :secs (update-stats-secs event) + :errs (update-stats-errs event) + :rs (update-stats-rs event) + :ws (update-stats-ws event)) + + (when (update-stats-start event) + (log-message :debug "start ~a ~30t ~a" + (pgloader.catalog:format-table-name label) + (update-stats-start event)) + (setf (pgtable-start table) (update-stats-start event))) + + ;; each PostgreSQL writer thread will send a stop even, here + ;; we only keep the latest one. + (when (and (update-stats-stop event) + (or (null (pgtable-stop table)) + (< (pgtable-stop table) (update-stats-stop event)))) + (setf (pgtable-stop table) (update-stats-stop event)) + (let ((secs (elapsed-time-since (pgtable-start table) + (pgtable-stop table)))) + (setf (pgtable-secs table) secs) + + (log-message :debug " stop ~a ~30t | ~a .. ~a = ~a" + (pgloader.catalog:format-table-name label) + (pgtable-start table) + (pgtable-stop table) + secs))))) (bad-row (%process-bad-row (bad-row-label event) diff --git a/src/utils/state.lisp b/src/utils/state.lisp index bd2caad..db2292e 100644 --- a/src/utils/state.lisp +++ b/src/utils/state.lisp @@ -16,6 +16,8 @@ (secs 0.0 :type float) ; how many seconds did it take (rs 0.0 :type float) ; seconds spent reading (ws 0.0 :type float) ; seconds spent writing + (start 0 :type integer) ; internal real time when we started + (stop 0 :type integer) ; internal real time when we finished reject-data reject-logs) ; files where to find reject data (defstruct pgstate @@ -80,7 +82,8 @@ "Instanciate a new pgtable structure to hold our stats, and return it." (or (pgstate-get-label pgstate label) (let* ((pgtable (setf (gethash label (pgstate-tables pgstate)) - (make-pgtable :name label)))) + (make-pgtable :name label + :start (get-internal-real-time))))) ;; maintain the ordering (push label (pgstate-tabnames pgstate))