From 4f3b3472a20adb83563fb6752a597f4ba01439fe Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Thu, 22 Oct 2015 22:36:40 +0200 Subject: [PATCH] Cleanup and timing display improvements. Have each thread publish its own start-time so that the main thread may compute time spent in source and target processing, in order to fix the crude hack of taking (max read-time write-time) in the total time column of the summary. We still have some strange artefacts here: we consider that the full processing time is bound to the writer thread (:target), because it needs to have the reader done already to be able to COPY the last batch... but in testing I've seen some :source timings higher than the :target ones... Let's solve problems one at a time tho, I guess multi-threading and accurate wall clock times aren't to be expected to mix and match that easily anyway (multi cores, single RTC and all that). --- src/package.lisp | 2 ++ src/pgsql/pgsql.lisp | 47 +++++++++++++++++---------------- src/pgsql/schema.lisp | 13 +++++++++ src/queue.lisp | 4 +-- src/sources/common/methods.lisp | 5 ++-- src/sources/mysql/mysql.lisp | 8 ++++-- 6 files changed, 50 insertions(+), 29 deletions(-) diff --git a/src/package.lisp b/src/package.lisp index d4a15af..ddda5a8 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -66,6 +66,7 @@ #:send-event #:start-monitor #:stop-monitor + #:elapsed-time-since #:timing)) (defpackage #:pgloader.utils @@ -77,6 +78,7 @@ (:export #:with-monitor ; monitor #:*monitoring-queue* #:with-stats-collection + #:elapsed-time-since #:timing ;; bits from alexandria diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index f1475e8..4b90e7b 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -49,32 +49,33 @@ "Fetch from the QUEUE messages containing how many rows are in the *writer-batch* for us to send down to PostgreSQL, and when that's done update stats." - (when truncate - (truncate-tables pgconn (list table-name))) + (let ((start-time (get-internal-real-time))) + (when truncate + (truncate-tables pgconn (list table-name))) - (with-pgsql-connection (pgconn) - (with-schema (unqualified-table-name table-name) - (when disable-triggers (disable-triggers unqualified-table-name)) - (log-message :info "pgsql:copy-from-queue: ~a ~a" table-name columns) + (with-pgsql-connection (pgconn) + (with-schema (unqualified-table-name table-name) + (with-disabled-triggers (unqualified-table-name + :disable-triggers disable-triggers) + (log-message :info "pgsql:copy-from-queue: ~a ~a" table-name columns) - (loop - for (mesg batch read oversized?) = (lq:pop-queue queue) - until (eq mesg :end-of-data) - for (rows ws) = (multiple-value-bind (result secs) - (timing - (copy-batch unqualified-table-name columns batch read)) - (list result secs)) - do (progn - ;; The SBCL implementation needs some Garbage Collection - ;; decision making help... and now is a pretty good time. - #+sbcl (when oversized? (sb-ext:gc :full t)) - (log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]" - unqualified-table-name rows oversized?) - (update-stats :data table-name :rows rows :ws ws))) + (loop + :for (mesg batch read oversized?) := (lq:pop-queue queue) + :until (eq mesg :end-of-data) + :for (rows ws) := (multiple-value-bind (result secs) + (timing + (copy-batch unqualified-table-name + columns batch read)) + (list result secs)) + :do (progn + ;; The SBCL implementation needs some Garbage Collection + ;; decision making help... and now is a pretty good time. + #+sbcl (when oversized? (sb-ext:gc :full t)) + (log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]" + unqualified-table-name rows oversized?) + (update-stats :data table-name :rows rows :ws ws)))))) - (when disable-triggers (enable-triggers unqualified-table-name)))) - - (cons :target table-name)) + (list :target table-name start-time))) ;;; ;;; Compute how many rows we're going to try loading next, depending on diff --git a/src/pgsql/schema.lisp b/src/pgsql/schema.lisp index 155d312..1949749 100644 --- a/src/pgsql/schema.lisp +++ b/src/pgsql/schema.lisp @@ -259,6 +259,19 @@ (log-message :info "~a" sql) (pomo:execute sql))) +(defmacro with-disabled-triggers ((table-name &key disable-triggers) + &body forms) + "Run FORMS with PostgreSQL triggers disabled for TABLE-NAME if + DISABLE-TRIGGERS is T A PostgreSQL connection must be opened already + where this macro is used." + `(if ,disable-triggers + (progn + (disable-triggers ,table-name) + (unwind-protect + (progn ,@forms) + (enable-triggers ,table-name))) + (progn ,@forms))) + ;;; ;;; Index support diff --git a/src/queue.lisp b/src/queue.lisp index e66c99e..fec4251 100644 --- a/src/queue.lisp +++ b/src/queue.lisp @@ -37,7 +37,7 @@ (with-slots (start data count bytes) *current-batch* (update-stats :data (target copy) :read count - :rs (pgloader.monitor::elapsed-time-since start)) + :rs (elapsed-time-since start)) (lq:push-queue (list :batch data count oversized?) queue)) (setf *current-batch* (make-batch)))) @@ -77,7 +77,7 @@ (log-message :debug "Sending last batch (~d rows)" count) (update-stats :data (target copy) :read count - :rs (pgloader.monitor::elapsed-time-since start)) + :rs (elapsed-time-since start)) (lq:push-queue (list :batch data count nil) queue)))) ;; signal we're done diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index 5e4c2b4..8000609 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -8,8 +8,9 @@ ;;; (defmethod copy-to-queue ((copy copy) queue) "Copy data from given COPY definition into lparallel.queue QUEUE" - (pgloader.queue:map-push-queue copy queue) - (cons :source (target copy))) + (let ((start-time (get-internal-real-time))) + (pgloader.queue:map-push-queue copy queue) + (list :source (target copy) start-time))) (defmethod copy-column-list ((copy copy)) "Default column list is an empty list." diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 16747ee..3c8cb77 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -408,10 +408,14 @@ (let ((lp:*kernel* copy-kernel)) (with-stats-collection ("COPY Threads Completion" :section :post) (loop :for tasks :below (* 2 table-count) - :do (destructuring-bind (task . table-name) + :do (destructuring-bind (task table-name start-time) (lp:receive-result copy-channel) + (declare (ignorable start-time)) (log-message :debug "Finished processing ~a for ~s" - task table-name))) + task table-name) + (when (eq :target task) + (update-stats :data table-name + :secs (elapsed-time-since start-time))))) (lp:end-kernel))) (let ((lp:*kernel* idx-kernel))