From 96a33de08437332f22c9f731203b00e8f625f1b8 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Mon, 5 Oct 2015 01:25:08 +0200 Subject: [PATCH] Review the stats and reporting code organisation. In order to later be able to have more worker threads sharing the load (multiple readers and/or writers, maybe more specialized threads too), have all the stats be managed centrally by a single thread. We already have a "monitor" thread that gets passed log messages so that the output buffer is not subject to race conditions; extend its use to also deal with statistics messages. In the current code, we send a message each time we read a row. In some future commits we should probably reduce the messaging here to something like one message per batch in the common case. Also, as a nice side effect of the code simplification and refactoring this fixes #283 wherein the before/after sections of individual CSV files within an ARCHIVE command were not counted in the reporting. --- pgloader.asd | 14 ++- src/main.lisp | 17 +-- src/package.lisp | 105 +++++++++------- src/params.lisp | 8 +- src/parsers/command-archive.lisp | 34 ++---- src/parsers/command-copy.lisp | 25 +--- src/parsers/command-csv.lisp | 25 +--- src/parsers/command-dbf.lisp | 19 +-- src/parsers/command-fixed.lisp | 25 +--- src/parsers/command-ixf.lisp | 18 +-- src/parsers/command-mssql.lisp | 18 +-- src/parsers/command-mysql.lisp | 18 +-- src/parsers/command-sql-block.lisp | 4 +- src/parsers/command-sqlite.lisp | 7 +- src/pgsql/pgsql.lisp | 41 +------ src/pgsql/queries.lisp | 13 +- src/pgsql/schema.lisp | 42 +++---- src/queue.lisp | 2 +- src/sources/common/project-fields.lisp | 9 +- src/sources/copy.lisp | 17 +-- src/sources/csv/csv.lisp | 17 +-- src/sources/db3/db3.lisp | 28 +---- src/sources/fixed.lisp | 18 +-- src/sources/ixf/ixf.lisp | 30 ++--- src/sources/mssql/mssql.lisp | 58 +++------ src/sources/mysql/mysql.lisp | 62 +++------- src/sources/sqlite/sqlite.lisp | 52 ++------ src/utils/monitor.lisp | 160 
++++++++++++++++++++++--- src/utils/report.lisp | 126 +++++++++---------- src/utils/state.lisp | 94 +++++++++------ src/utils/threads.lisp | 1 - src/utils/utils.lisp | 40 ------- test/archive.load | 4 +- 33 files changed, 470 insertions(+), 681 deletions(-) diff --git a/pgloader.asd b/pgloader.asd index 3ded7c8..c9da027 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -53,10 +53,14 @@ ((:file "charsets") (:file "threads") (:file "logs") - (:file "monitor" :depends-on ("logs")) + (:file "utils") (:file "state") - (:file "report" :depends-on ("state")) - (:file "utils" :depends-on ("charsets" "monitor")) + (:file "reject" :depends-on ("state")) + (:file "report" :depends-on ("state" "utils")) + (:file "monitor" :depends-on ("logs" + "state" + "reject" + "report")) (:file "archive" :depends-on ("logs")) ;; those are one-package-per-file @@ -178,8 +182,8 @@ ((:file "mysql-cast-rules") (:file "mysql-schema" :depends-on ("mysql-cast-rules")) - (:file "mysql-csv" - :depends-on ("mysql-schema")) + ;; (:file "mysql-csv" + ;; :depends-on ("mysql-schema")) (:file "mysql" :depends-on ("mysql-cast-rules" "mysql-schema")))))) diff --git a/src/main.lisp b/src/main.lisp index 730173b..9d59489 100644 --- a/src/main.lisp +++ b/src/main.lisp @@ -271,7 +271,8 @@ (with-monitor () ;; tell the user where to look for interesting things - (log-message :log "Main logs in '~a'" (probe-file *log-filename*)) + (log-message :log "Main logs in '~a'" + (uiop:native-namestring *log-filename*)) (log-message :log "Data errors in '~a'~%" *root-dir*) ;; load extra lisp code provided for by the user @@ -398,19 +399,7 @@ (parse-commands-from-file source) (parse-commands source))))))) - ;; maybe duplicate the summary to a file - (let* ((summary-stream (when *summary-pathname* - (open *summary-pathname* - :direction :output - :if-exists :rename - :if-does-not-exist :create))) - (*report-stream* (or summary-stream *standard-output*))) - (unwind-protect - ;; run the commands - (loop for func in funcs do 
(funcall func)) - - ;; cleanup - (when summary-stream (close summary-stream))))))) + (loop for func in funcs do (funcall func))))) ;;; diff --git a/src/package.lisp b/src/package.lisp index 990b24c..ddb298d 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -22,32 +22,28 @@ #:start-logger #:stop-logger)) -(defpackage #:pgloader.monitor +(defpackage #:pgloader.state (:use #:cl #:pgloader.params) - (:export #:with-monitor - #:*monitoring-queue* - #:log-message - #:send-event - #:start-monitor - #:stop-monitor)) + (:export #:format-table-name + #:make-pgstate + #:pgstate-tabnames + #:pgstate-tables + #:pgstate-read + #:pgstate-rows + #:pgstate-errs + #:pgstate-secs -(defpackage #:pgloader.utils - (:use #:cl #:pgloader.params) - (:import-from #:alexandria - #:appendf - #:read-file-into-string) - (:import-from #:pgloader.monitor - #:with-monitor - #:*monitoring-queue* - #:log-message) - (:export #:with-monitor ; monitor + #:pgstate-get-label + #:pgstate-new-label + #:pgstate-setf + #:pgstate-incf + #:pgstate-decf - ;; bits from alexandria - #:appendf - #:read-file-into-string - - ;; logs - #:log-message + #:pgtable-initialize-reject-files + #:pgtable-reject-data + #:pgtable-reject-logs + #:report-pgtable-stats + #:report-pgstate-stats ;; report #:report-header @@ -55,28 +51,53 @@ #:report-results #:report-footer #:report-summary - #:report-full-summary - #:with-stats-collection + #:report-full-summary)) + +(defpackage #:pgloader.monitor + (:use #:cl #:pgloader.params #:pgloader.state) + (:export #:with-monitor + #:*monitoring-queue* + #:log-message + #:new-label + #:update-stats + #:process-bad-row + #:with-stats-collection + #:send-event + #:start-monitor + #:stop-monitor + #:timing)) + +(defpackage #:pgloader.utils + (:use #:cl #:pgloader.params #:pgloader.monitor #:pgloader.state) + (:import-from #:alexandria + #:appendf + #:read-file-into-string) + + (:export #:with-monitor ; monitor + #:*monitoring-queue* + #:with-stats-collection + #:timing + + ;; bits from 
alexandria + #:appendf + #:read-file-into-string + + ;; state + #:make-pgstate + #:format-table-name + #:pgstate-tabnames + + ;; events + #:log-message + #:new-label + #:update-stats + #:process-bad-row ;; utils #:format-interval - #:timing #:camelCase-to-colname #:unquote - ;; state - #:format-table-name - #:make-pgstate - #:pgstate-get-table - #:pgstate-add-table - #:pgstate-setf - #:pgstate-incf - #:pgstate-decf - #:pgtable-reject-data - #:pgtable-reject-logs - #:report-pgtable-stats - #:report-pgstate-stats - ;; threads #:make-kernel @@ -243,17 +264,13 @@ #:reset-sequences)) (defpackage #:pgloader.queue - (:use #:cl #:pgloader.params) - (:import-from #:pgloader.monitor - #:log-message) + (:use #:cl #:pgloader.params #:pgloader.monitor) (:import-from #:pgloader.pgsql #:format-vector-row) (:import-from #:pgloader.sources #:map-rows #:transforms #:target) - (:import-from #:pgloader.utils - #:pgstate-incf) (:export #:map-push-queue)) diff --git a/src/params.lisp b/src/params.lisp index 26ad3aa..836168e 100644 --- a/src/params.lisp +++ b/src/params.lisp @@ -21,7 +21,6 @@ #:*copy-batch-size* #:*concurrent-batches* #:*pg-settings* - #:*state* #:*default-tmpdir* #:init-params-from-environment #:getenv-default)) @@ -68,16 +67,11 @@ (defparameter *dry-run* nil "Set to non-nil to only run checks about the load setup.") -;; we can't use pgloader.utils:make-pgstate yet because params is compiled -;; first in the asd definition, we just make the symbol a special variable. 
-(defparameter *state* nil - "State of the current loading.") - (defparameter *fd-path-root* nil "Where to load files from, when loading from an archive or expanding regexps.") (defparameter *root-dir* - #+unix (make-pathname :directory "/tmp/pgloader/") + #+unix (uiop:make-pathname* :directory '(:absolute "tmp/pgloader")) #-unix (uiop:merge-pathnames* "pgloader/" (uiop:ensure-directory-pathname (getenv-default "Temp"))) diff --git a/src/parsers/command-archive.lisp b/src/parsers/command-archive.lisp index 9b3129b..d8b79ab 100644 --- a/src/parsers/command-archive.lisp +++ b/src/parsers/command-archive.lisp @@ -42,32 +42,22 @@ (when (and (or before finally) (null pg-db-conn)) (error "When using a BEFORE LOAD DO or a FINALLY block, you must provide an archive level target database connection.")) `(lambda () - (let* ((start-irt (get-internal-real-time)) - (state-before (pgloader.utils:make-pgstate)) - (*state* (pgloader.utils:make-pgstate)) - ,@(pgsql-connection-bindings pg-db-conn nil) - (state-finally ,(when finally `(pgloader.utils:make-pgstate))) + (let* (,@(pgsql-connection-bindings pg-db-conn nil) (archive-file - ,(destructuring-bind (kind url) source - (ecase kind - (:http `(with-stats-collection - ("download" :state state-before) - (pgloader.archive:http-fetch-file ,url))) - (:filename url)))) - (*fd-path-root* - (with-stats-collection ("extract" :state state-before) - (pgloader.archive:expand-archive archive-file)))) + , (destructuring-bind (kind url) source + (ecase kind + (:http `(with-stats-collection + ("download" :section :pre) + (pgloader.archive:http-fetch-file ,url))) + (:filename url)))) + (*fd-path-root* + (with-stats-collection ("extract" :section :pre) + (pgloader.archive:expand-archive archive-file)))) (progn - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") ;; import from files block ,@(loop for command in commands collect `(funcall ,command)) - ,(sql-code-block pg-db-conn 
'state-finally finally "finally") - - ;; reporting - (report-full-summary "Total import time" *state* - :start-time start-irt - :before state-before - :finally state-finally))))))) + ,(sql-code-block pg-db-conn :post finally "finally"))))))) diff --git a/src/parsers/command-copy.lisp b/src/parsers/command-copy.lisp index 672fa85..ca423d2 100644 --- a/src/parsers/command-copy.lisp +++ b/src/parsers/command-copy.lisp @@ -115,20 +115,13 @@ gucs before after ((:copy-options options))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-idx ,(when (getf options :drop-indexes) - `(pgloader.utils:make-pgstate))) - (state-after ,(when (or after (getf options :drop-indexes)) - `(pgloader.utils:make-pgstate))) - ,@(pgsql-connection-bindings pg-db-conn gucs) + (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,copy-conn))))) (progn - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (let ((truncate ,(getf options :truncate)) (disable-triggers (getf ',options :disable-triggers)) @@ -146,21 +139,11 @@ :drop-indexes :disable-triggers))))) (pgloader.sources:copy-from source - :state-before state-before - :state-after state-after - :state-indexes state-idx :truncate truncate :drop-indexes drop-indexes :disable-triggers disable-triggers)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - ;; reporting - (when summary - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after - :parallel state-idx)))))) + ,(sql-code-block pg-db-conn :post after "after load"))))) (defrule load-copy-file load-copy-file-command (:lambda (command) diff --git a/src/parsers/command-csv.lisp 
b/src/parsers/command-csv.lisp index 5fad5b1..a672c19 100644 --- a/src/parsers/command-csv.lisp +++ b/src/parsers/command-csv.lisp @@ -455,20 +455,13 @@ gucs before after ((:csv-options options))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-idx ,(when (getf options :drop-indexes) - `(pgloader.utils:make-pgstate))) - (state-after ,(when (or after (getf options :drop-indexes)) - `(pgloader.utils:make-pgstate))) - ,@(pgsql-connection-bindings pg-db-conn gucs) + (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,csv-conn))))) (progn - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (let ((truncate (getf ',options :truncate)) (disable-triggers (getf ',options :disable-triggers)) @@ -486,21 +479,11 @@ :drop-indexes :disable-triggers))))) (pgloader.sources:copy-from source - :state-before state-before - :state-after state-after - :state-indexes state-idx :truncate truncate :drop-indexes drop-indexes :disable-triggers disable-triggers)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - ;; reporting - (when summary - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after - :parallel state-idx)))))) + ,(sql-code-block pg-db-conn :post after "after load"))))) (defrule load-csv-file load-csv-file-command (:lambda (command) diff --git a/src/parsers/command-dbf.lisp b/src/parsers/command-dbf.lisp index f46d5dc..35cc4c3 100644 --- a/src/parsers/command-dbf.lisp +++ b/src/parsers/command-dbf.lisp @@ -93,15 +93,11 @@ gucs before after ((:dbf-options options))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (summary (null *state*)) - (*state* 
(or *state* (pgloader.utils:make-pgstate))) - (state-after ,(when after `(pgloader.utils:make-pgstate))) - ,@(pgsql-connection-bindings pg-db-conn gucs) + (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) (table-name ',(pgconn-table-name pg-db-conn)) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,dbf-db-conn)))) (source (make-instance 'pgloader.db3:copy-db3 @@ -110,19 +106,12 @@ :source-db source-db :target table-name))) - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (pgloader.sources:copy-database source - :state-before state-before ,@(remove-batch-control-option options)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - ;; reporting - (when summary - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after))))) + ,(sql-code-block pg-db-conn :post after "after load")))) (defrule load-dbf-file load-dbf-command (:lambda (command) diff --git a/src/parsers/command-fixed.lisp b/src/parsers/command-fixed.lisp index 0485535..af52694 100644 --- a/src/parsers/command-fixed.lisp +++ b/src/parsers/command-fixed.lisp @@ -123,20 +123,13 @@ gucs before after ((:fixed-options options))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-idx ,(when (getf options :drop-indexes) - `(pgloader.utils:make-pgstate))) - (state-after ,(when (or after (getf options :drop-indexes)) - `(pgloader.utils:make-pgstate))) - ,@(pgsql-connection-bindings pg-db-conn gucs) + (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand 
(fetch-file ,fixed-conn))))) (progn - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (let ((truncate ,(getf options :truncate)) (disable-triggers ,(getf options :disable-triggers)) @@ -152,21 +145,11 @@ :skip-lines ,(or (getf options :skip-line) 0)))) (pgloader.sources:copy-from source - :state-before state-before - :state-after state-after - :state-indexes state-idx :truncate truncate :drop-indexes drop-indexes :disable-triggers disable-triggers)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - ;; reporting - (when summary - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after - :parallel state-idx)))))) + ,(sql-code-block pg-db-conn :post after "after load"))))) (defrule load-fixed-cols-file load-fixed-cols-file-command (:lambda (command) diff --git a/src/parsers/command-ixf.lisp b/src/parsers/command-ixf.lisp index ab0d3fa..fbad04c 100644 --- a/src/parsers/command-ixf.lisp +++ b/src/parsers/command-ixf.lisp @@ -49,15 +49,11 @@ gucs before after ((:ixf-options options))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-after ,(when after `(pgloader.utils:make-pgstate))) - ,@(pgsql-connection-bindings pg-db-conn gucs) + (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) (table-name ',(pgconn-table-name pg-db-conn)) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,ixf-db-conn)))) (source (make-instance 'pgloader.ixf:copy-ixf @@ -65,18 +61,12 @@ :source-db source-db :target table-name))) - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (pgloader.sources:copy-database source - 
:state-before state-before ,@(remove-batch-control-option options)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - (when summary - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after))))) + ,(sql-code-block pg-db-conn :post after "after load")))) (defrule load-ixf-file load-ixf-command (:lambda (command) diff --git a/src/parsers/command-mssql.lisp b/src/parsers/command-mssql.lisp index 875d1b9..987713a 100644 --- a/src/parsers/command-mssql.lisp +++ b/src/parsers/command-mssql.lisp @@ -161,11 +161,7 @@ (let (#+sbcl(sb-ext:*muffled-warnings* 'style-warning)) (cffi:load-foreign-library 'mssql::sybdb)) - (let* ((state-before (pgloader.utils:make-pgstate)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-idx (pgloader.utils:make-pgstate)) - (state-after (pgloader.utils:make-pgstate)) - (*default-cast-rules* ',*mssql-default-cast-rules*) + (let* ((*default-cast-rules* ',*mssql-default-cast-rules*) (*cast-rules* ',casts) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) @@ -175,22 +171,14 @@ :target-db ,pg-db-conn :source-db ,ms-db-conn))) - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (pgloader.mssql:copy-database source - :state-before state-before - :state-after state-after - :state-indexes state-idx :including ',including :excluding ',excluding ,@(remove-batch-control-option options)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after - :parallel state-idx)))) + ,(sql-code-block pg-db-conn :post after "after load")))) (defrule load-mssql-database load-mssql-command (:lambda (source) diff --git a/src/parsers/command-mysql.lisp b/src/parsers/command-mysql.lisp index d395d7e..b6dd1e8 100644 --- a/src/parsers/command-mysql.lisp +++ b/src/parsers/command-mysql.lisp @@ -166,11 
+166,7 @@ ((:excluding excl)) ((:decoding decoding-as))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-idx (pgloader.utils:make-pgstate)) - (state-after (pgloader.utils:make-pgstate)) - (*default-cast-rules* ',*mysql-default-cast-rules*) + (let* ((*default-cast-rules* ',*mysql-default-cast-rules*) (*cast-rules* ',casts) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) @@ -180,24 +176,16 @@ :target-db ,pg-db-conn :source-db ,my-db-conn))) - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (pgloader.mysql:copy-database source :including ',incl :excluding ',excl :decoding-as ',decoding-as :materialize-views ',views - :state-before state-before - :state-after state-after - :state-indexes state-idx ,@(remove-batch-control-option options)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after - :parallel state-idx)))) + ,(sql-code-block pg-db-conn :post after "after load")))) (defrule load-mysql-database load-mysql-command (:lambda (source) diff --git a/src/parsers/command-sql-block.lisp b/src/parsers/command-sql-block.lisp index 3c254fd..5ae2f1b 100644 --- a/src/parsers/command-sql-block.lisp +++ b/src/parsers/command-sql-block.lisp @@ -58,12 +58,12 @@ (bind (((_ _ sql-list-of-list) after)) (cons :after (apply #'append sql-list-of-list))))) -(defun sql-code-block (pgconn state commands label) +(defun sql-code-block (pgconn section commands label) "Return lisp code to run COMMANDS against DBNAME, updating STATE." 
(when commands `(with-stats-collection (,label :dbname ,(db-name pgconn) - :state ,state + :section ,section :use-result-as-read t :use-result-as-rows t) (with-pgsql-transaction (:pgconn ,pgconn) diff --git a/src/parsers/command-sqlite.lisp b/src/parsers/command-sqlite.lisp index aa7d65f..e38776f 100644 --- a/src/parsers/command-sqlite.lisp +++ b/src/parsers/command-sqlite.lisp @@ -101,20 +101,17 @@ load database ((:including incl)) ((:excluding excl))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (*state* (pgloader.utils:make-pgstate)) - (*default-cast-rules* ',*sqlite-default-cast-rules*) + (let* ((*default-cast-rules* ',*sqlite-default-cast-rules*) (*cast-rules* ',casts) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,sqlite-db-conn)))) (source (make-instance 'pgloader.sqlite::copy-sqlite :target-db ,pg-db-conn :source-db source-db))) (pgloader.sqlite:copy-database source - :state-before state-before :including ',incl :excluding ',excl ,@(remove-batch-control-option options))))) diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index b9a3a9d..42464a6 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -45,11 +45,10 @@ &key columns (truncate t) - disable-triggers - ((:state *state*) *state*)) + disable-triggers) "Fetch from the QUEUE messages containing how many rows are in the *writer-batch* for us to send down to PostgreSQL, and when that's done - update *state*." + update stats." (when truncate (truncate-tables pgconn (list table-name))) @@ -68,44 +67,10 @@ #+sbcl (when oversized? (sb-ext:gc :full t)) (log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]" unqualified-table-name rows oversized?) 
- (pgstate-incf *state* table-name :rows rows))) + (update-stats :data table-name :rows rows))) (when disable-triggers (enable-triggers unqualified-table-name))))) -;;; -;;; When a batch has been refused by PostgreSQL with a data-exception, that -;;; means it contains non-conforming data. Log the error message in a log -;;; file and the erroneous data in a rejected data file for further -;;; processing. -;;; -(defun process-bad-row (table-name condition row) - "Add the row to the reject file, in PostgreSQL COPY TEXT format" - ;; first, update the stats. - (pgstate-incf *state* table-name :errs 1) - - ;; now, the bad row processing - (let* ((table (pgstate-get-table *state* table-name)) - (data (pgtable-reject-data table)) - (logs (pgtable-reject-logs table))) - - ;; first log the rejected data - (with-open-file (reject-data-file data - :direction :output - :if-exists :append - :if-does-not-exist :create - :external-format :utf-8) - ;; the row has already been processed when we get here - (write-string row reject-data-file)) - - ;; now log the condition signaled to reject the data - (with-open-file (reject-logs-file logs - :direction :output - :if-exists :append - :if-does-not-exist :create - :external-format :utf-8) - ;; the row has already been processed when we get here - (format reject-logs-file "~a~%" condition)))) - ;;; ;;; Compute how many rows we're going to try loading next, depending on ;;; where we are in the batch currently and where is the next-error to be diff --git a/src/pgsql/queries.lisp b/src/pgsql/queries.lisp index e7aaeb2..3758a36 100644 --- a/src/pgsql/queries.lisp +++ b/src/pgsql/queries.lisp @@ -109,22 +109,23 @@ (log-message :debug set) (pomo:execute set)))) -(defun pgsql-connect-and-execute-with-timing (pgconn label sql state &key (count 1)) +(defun pgsql-connect-and-execute-with-timing (pgconn section label sql &key (count 1)) "Run pgsql-execute-with-timing within a newly establised connection." 
(with-pgsql-connection (pgconn) (pomo:with-transaction () - (pgsql-execute-with-timing label sql state :count count)))) + (pgsql-execute-with-timing section label sql :count count)))) -(defun pgsql-execute-with-timing (label sql state &key (count 1)) +(defun pgsql-execute-with-timing (section label sql &key (count 1)) "Execute given SQL and resgister its timing into STATE." (multiple-value-bind (res secs) (timing - (handler-case (pgsql-execute sql) + (handler-case + (pgsql-execute sql) (cl-postgres:database-error (e) (log-message :error "~a" e) - (pgstate-incf state label :errs 1 :rows (- count))))) + (update-stats section label :errs 1 :rows (- count))))) (declare (ignore res)) - (pgstate-incf state label :read count :rows count :secs secs))) + (update-stats section label :read count :rows count :secs secs))) (defun pgsql-execute (sql &key ((:client-min-messages level))) "Execute given SQL in current transaction" diff --git a/src/pgsql/schema.lisp b/src/pgsql/schema.lisp index d24ecc8..dd20849 100644 --- a/src/pgsql/schema.lisp +++ b/src/pgsql/schema.lisp @@ -145,17 +145,17 @@ (defun create-pgsql-fkeys (pgconn all-fkeys &key - state + (section :post) (label "Foreign Keys")) "Actually create the Foreign Key References that where declared in the MySQL database" - (pgstate-add-table state (db-name pgconn) label) + (new-label section label) (loop for (table-name . fkeys) in all-fkeys do (loop for fkey in fkeys for sql = (format-pgsql-create-fkey fkey) do (log-message :notice "~a;" sql) - (pgsql-execute-with-timing "Foreign Keys" sql state)))) + (pgsql-execute-with-timing section label sql)))) ;;; @@ -341,16 +341,11 @@ ;;; Parallel index building. ;;; (defun create-indexes-in-kernel (pgconn indexes kernel channel - &key - state - (label "Create Indexes")) + &key (label "Create Indexes")) "Create indexes for given table in dbname, using given lparallel KERNEL and CHANNEL so that the index build happen in concurrently with the data copying." 
(let* ((lp:*kernel* kernel)) - ;; ensure we have a stats entry - (pgstate-add-table state (db-name pgconn) label) - (loop :for index :in indexes :collect (multiple-value-bind (sql pkey) @@ -362,7 +357,7 @@ #'pgsql-connect-and-execute-with-timing ;; each thread must have its own connection (new-pgsql-connection pgconn) - label sql state) + :post label sql) ;; return the pkey "upgrade" statement pkey)))) @@ -390,18 +385,18 @@ ;;; ;;; Drop indexes before loading ;;; -(defun drop-indexes (state pgsql-index-list) +(defun drop-indexes (section pgsql-index-list) "Drop indexes in PGSQL-INDEX-LIST. A PostgreSQL connection must already be active when calling that function." (loop :for index :in pgsql-index-list :do (let ((sql (format-pgsql-drop-index index))) (log-message :notice "~a" sql) - (pgsql-execute-with-timing "drop indexes" sql state)))) + (pgsql-execute-with-timing section "drop indexes" sql)))) ;;; ;;; Higher level API to care about indexes ;;; -(defun maybe-drop-indexes (target table-name state &key drop-indexes) +(defun maybe-drop-indexes (target table-name &key (section :pre) drop-indexes) "Drop the indexes for TABLE-NAME on TARGET PostgreSQL connection, and returns a list of indexes to create again." (with-pgsql-connection (target) @@ -420,14 +415,13 @@ (indexes ;; drop the indexes now - (with-stats-collection ("drop indexes" :state state) - (drop-indexes state indexes)))) + (with-stats-collection ("drop indexes" :section section) + (drop-indexes section indexes)))) ;; and return the indexes list indexes))) -(defun create-indexes-again (target indexes state state-parallel - &key drop-indexes) +(defun create-indexes-again (target indexes &key (section :post) drop-indexes) "Create the indexes that we dropped previously." 
(when (and indexes drop-indexes) (let* ((*preserve-index-names* t) @@ -438,29 +432,27 @@ (idx-channel (let ((lp:*kernel* idx-kernel)) (lp:make-channel)))) (let ((pkeys - (create-indexes-in-kernel target indexes idx-kernel idx-channel - :state state-parallel))) + (create-indexes-in-kernel target indexes idx-kernel idx-channel))) - (with-stats-collection ("Index Build Completion" :state state) + (with-stats-collection ("Index Build Completion" :section section) (loop :for idx :in indexes :do (lp:receive-result idx-channel))) ;; turn unique indexes into pkeys now (with-pgsql-connection (target) - (with-stats-collection ("Constraints" :state state) + (with-stats-collection ("Constraints" :section section) (loop :for sql :in pkeys :when sql :do (progn (log-message :notice "~a" sql) - (pgsql-execute-with-timing "Constraints" sql state))))))))) + (pgsql-execute-with-timing section "Constraints" sql))))))))) ;;; ;;; Sequences ;;; -(defun reset-sequences (table-names &key pgconn state) +(defun reset-sequences (table-names &key pgconn (section :post)) "Reset all sequences created during this MySQL migration." 
(log-message :notice "Reset sequences") (with-stats-collection ("Reset Sequences" - :dbname (db-name pgconn) :use-result-as-rows t - :state state) + :section section) (reset-all-sequences pgconn :tables table-names))) diff --git a/src/queue.lisp b/src/queue.lisp index 3674e2a..9c829ff 100644 --- a/src/queue.lisp +++ b/src/queue.lisp @@ -55,7 +55,7 @@ (condition (e) (log-message :error "~a" e) - (pgstate-incf *state* (target copy) :errs 1)))) + (update-stats :data (target copy) :errs 1)))) (defun map-push-queue (copy queue &optional pre-formatted) "Apply MAP-ROWS on the COPY instance and a function of ROW that will push diff --git a/src/sources/common/project-fields.lisp b/src/sources/common/project-fields.lisp index 7567425..f666647 100644 --- a/src/sources/common/project-fields.lisp +++ b/src/sources/common/project-fields.lisp @@ -126,7 +126,7 @@ lines we did read in the file." (let ((projection (project-fields :fields fields :columns columns))) (lambda (row) - (pgstate-incf *state* target :read 1) + (update-stats :data target :read 1) ;; cl-csv returns (nil) for an empty line (if (or (null row) (and (null (car row)) (null (cdr row)))) @@ -137,11 +137,8 @@ (handler-case (funcall projection row) (condition (e) - (pgstate-incf *state* target :errs 1) - (log-message :error "Could not read line ~d: ~a" - (pgloader.utils::pgtable-read - (pgstate-get-table *state* target)) - e))))) + (update-stats :data target :errs 1) + (log-message :error "Could not read input: ~a" e))))) (when projected-vector (funcall process-row-fn projected-vector))))))) diff --git a/src/sources/copy.lisp b/src/sources/copy.lisp index 2c4c56c..8a9d229 100644 --- a/src/sources/copy.lisp +++ b/src/sources/copy.lisp @@ -93,7 +93,7 @@ (condition (e) (progn (log-message :error "~a" e) - (pgstate-incf *state* (target copy) :errs 1)))))))))) + (update-stats :data (target copy) :errs 1)))))))))) (defmethod copy-to-queue ((copy copy-copy) queue) "Copy data from given COPY definition into lparallel.queue 
DATAQ" @@ -101,27 +101,18 @@ (defmethod copy-from ((copy copy-copy) &key - state-before - state-after - state-indexes truncate disable-triggers drop-indexes) "Copy data from given COPY file definition into its PostgreSQL target table." - (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (make-kernel 2)) + (let* ((lp:*kernel* (make-kernel 2)) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (indexes (maybe-drop-indexes (target-db copy) (target copy) - state-before :drop-indexes drop-indexes))) - (with-stats-collection ((target copy) - :dbname (db-name (target-db copy)) - :state *state* - :summary summary) + (with-stats-collection ((target copy) :dbname (db-name (target-db copy))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" (target copy)) (lp:submit-task channel #'copy-to-queue copy queue) @@ -144,6 +135,6 @@ finally (lp:end-kernel)))) ;; re-create the indexes - (create-indexes-again (target-db copy) indexes state-after state-indexes + (create-indexes-again (target-db copy) indexes :drop-indexes drop-indexes))) diff --git a/src/sources/csv/csv.lisp b/src/sources/csv/csv.lisp index 7d77697..6d20660 100644 --- a/src/sources/csv/csv.lisp +++ b/src/sources/csv/csv.lisp @@ -126,7 +126,7 @@ (condition (e) (progn (log-message :error "~a" e) - (pgstate-incf *state* (target csv) :errs 1))))))))) + (update-stats :data (target csv) :errs 1))))))))) (defmethod copy-to-queue ((csv copy-csv) queue) "Copy data from given CSV definition into lparallel.queue DATAQ" @@ -134,26 +134,18 @@ (defmethod copy-from ((csv copy-csv) &key - state-before - state-after - state-indexes truncate disable-triggers drop-indexes) "Copy data from given CSV file definition into its PostgreSQL target table." 
- (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (make-kernel 2)) + (let* ((lp:*kernel* (make-kernel 2)) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (indexes (maybe-drop-indexes (target-db csv) (target csv) - state-before :drop-indexes drop-indexes))) - (with-stats-collection ((target csv) - :dbname (db-name (target-db csv)) - :state *state* :summary summary) + (with-stats-collection ((target csv) :dbname (db-name (target-db csv))) (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" (target csv)) (lp:submit-task channel #'copy-to-queue csv queue) @@ -176,5 +168,4 @@ finally (lp:end-kernel)))) ;; re-create the indexes - (create-indexes-again (target-db csv) indexes state-after state-indexes - :drop-indexes drop-indexes))) + (create-indexes-again (target-db csv) indexes :drop-indexes drop-indexes))) diff --git a/src/sources/db3/db3.lisp b/src/sources/db3/db3.lisp index f0af6ae..36d67b6 100644 --- a/src/sources/db3/db3.lisp +++ b/src/sources/db3/db3.lisp @@ -59,21 +59,15 @@ (defmethod copy-to-queue ((db3 copy-db3) queue) "Copy data from DB3 file FILENAME into queue DATAQ" - (let ((read (pgloader.queue:map-push-queue db3 queue))) - (pgstate-incf *state* (target db3) :read read))) + (pgloader.queue:map-push-queue db3 queue)) (defmethod copy-from ((db3 copy-db3) &key (kernel nil k-s-p) truncate disable-triggers) - (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 2))) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*))) - (with-stats-collection ((target db3) - :dbname (db-name (target-db db3)) - :state *state* - :summary summary) + (with-stats-collection ((target db3) :dbname (db-name (target-db db3))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) 
(log-message :notice "COPY \"~a\" from '~a'" (target db3) (source db3)) (lp:submit-task channel #'copy-to-queue db3 queue) @@ -94,7 +88,6 @@ (defmethod copy-database ((db3 copy-db3) &key table-name - state-before data-only schema-only (truncate t) @@ -105,9 +98,7 @@ (reset-sequences t)) "Open the DB3 and stream its content to a PostgreSQL database." (declare (ignore create-indexes reset-sequences)) - (let* ((summary (null *state*)) - (*state* (or *state* (make-pgstate))) - (table-name (or table-name + (let* ((table-name (or table-name (target db3) (source db3)))) @@ -116,9 +107,7 @@ (handler-case (when (and (or create-tables schema-only) (not data-only)) - (with-stats-collection ("create, truncate" - :state state-before - :summary summary) + (with-stats-collection ("create, truncate" :section :pre) (with-pgsql-transaction (:pgconn (target-db db3)) (when create-tables (with-schema (tname table-name) @@ -133,9 +122,4 @@ (return-from copy-database))) (unless schema-only - (copy-from db3 :truncate truncate :disable-triggers disable-triggers)) - - ;; and report the total time spent on the operation - (when summary - (report-full-summary "Total streaming time" *state* - :before state-before)))) + (copy-from db3 :truncate truncate :disable-triggers disable-triggers)))) diff --git a/src/sources/fixed.lisp b/src/sources/fixed.lisp index 54620f5..5b116dc 100644 --- a/src/sources/fixed.lisp +++ b/src/sources/fixed.lisp @@ -79,7 +79,7 @@ (condition (e) (progn (log-message :error "~a" e) - (pgstate-incf *state* (target fixed) :errs 1)))))))))) + (update-stats :data (target fixed) :errs 1)))))))))) (defmethod copy-to-queue ((fixed copy-fixed) queue) "Copy data from given FIXED definition into lparallel.queue DATAQ" @@ -87,27 +87,18 @@ (defmethod copy-from ((fixed copy-fixed) &key - state-before - state-after - state-indexes truncate disable-triggers drop-indexes) "Copy data from given FIXED file definition into its PostgreSQL target table." 
- (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (make-kernel 2)) + (let* ((lp:*kernel* (make-kernel 2)) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (indexes (maybe-drop-indexes (target-db fixed) (target fixed) - state-before :drop-indexes drop-indexes))) - (with-stats-collection ((target fixed) - :dbname (db-name (target-db fixed)) - :state *state* - :summary summary) + (with-stats-collection ((target fixed) :dbname (db-name (target-db fixed))) (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" (target fixed)) (lp:submit-task channel #'copy-to-queue fixed queue) @@ -130,6 +121,5 @@ finally (lp:end-kernel)))) ;; re-create the indexes - (create-indexes-again (target-db fixed) indexes state-after state-indexes - :drop-indexes drop-indexes))) + (create-indexes-again (target-db fixed) indexes :drop-indexes drop-indexes))) diff --git a/src/sources/ixf/ixf.lisp b/src/sources/ixf/ixf.lisp index 8872666..695ec6b 100644 --- a/src/sources/ixf/ixf.lisp +++ b/src/sources/ixf/ixf.lisp @@ -61,28 +61,22 @@ (with-connection (conn (source-db copy-ixf)) (let ((ixf (ixf:make-ixf-file :stream (conn-handle conn))) (row-fn (lambda (row) - (pgstate-incf *state* (target copy-ixf) :read 1) + (update-stats :data (target copy-ixf) :read 1) (funcall process-row-fn row)))) (ixf:read-headers ixf) (ixf:map-data ixf row-fn)))) (defmethod copy-to-queue ((ixf copy-ixf) queue) "Copy data from IXF file FILENAME into queue DATAQ" - (let ((read (pgloader.queue:map-push-queue ixf queue))) - (pgstate-incf *state* (target ixf) :read read))) + (pgloader.queue:map-push-queue ixf queue)) (defmethod copy-from ((ixf copy-ixf) &key (kernel nil k-s-p) truncate disable-triggers) - (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 2))) (channel 
(lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*))) - (with-stats-collection ((target ixf) - :dbname (db-name (target-db ixf)) - :state *state* - :summary summary) + (with-stats-collection ((target ixf) :dbname (db-name (target-db ixf))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY \"~a\" from '~a'" (target ixf) (source ixf)) (lp:submit-task channel #'copy-to-queue ixf queue) @@ -103,7 +97,6 @@ (defmethod copy-database ((ixf copy-ixf) &key table-name - state-before data-only schema-only (truncate t) @@ -114,9 +107,7 @@ (reset-sequences t)) "Open the IXF and stream its content to a PostgreSQL database." (declare (ignore create-indexes reset-sequences)) - (let* ((summary (null *state*)) - (*state* (or *state* (make-pgstate))) - (table-name (or table-name + (let* ((table-name (or table-name (target ixf) (source ixf)))) @@ -125,9 +116,7 @@ (handler-case (when (and (or create-tables schema-only) (not data-only)) - (with-stats-collection ("create, truncate" - :state state-before - :summary summary) + (with-stats-collection ("create, truncate" :section :pre) (with-pgsql-transaction (:pgconn (target-db ixf)) (when create-tables (with-schema (tname table-name) @@ -142,10 +131,5 @@ (return-from copy-database))) (unless schema-only - (copy-from ixf :truncate truncate :disable-triggers disable-triggers)) - - ;; and report the total time spent on the operation - (when summary - (report-full-summary "Total streaming time" *state* - :before state-before)))) + (copy-from ixf :truncate truncate :disable-triggers disable-triggers)))) diff --git a/src/sources/mssql/mssql.lisp b/src/sources/mssql/mssql.lisp index 9daee61..0889e9a 100644 --- a/src/sources/mssql/mssql.lisp +++ b/src/sources/mssql/mssql.lisp @@ -37,7 +37,7 @@ table-name))) (row-fn (lambda (row) - (pgstate-incf *state* (target mssql) :read 1) + (update-stats :data (target mssql) :read 1) (funcall process-row-fn row)))) (log-message :debug "~a" sql) 
(handler-case @@ -45,7 +45,7 @@ ((condition #'(lambda (c) (log-message :error "~a" c) - (pgstate-incf *state* (target mssql) :errs 1) + (update-stats :data (target mssql) :errs 1) (invoke-restart 'mssql::use-nil)))) (mssql::map-query-results sql :row-fn row-fn @@ -53,7 +53,7 @@ (condition (e) (progn (log-message :error "~a" e) - (pgstate-incf *state* (target mssql) :errs 1))))))) + (update-stats :data (target mssql) :errs 1))))))) (defmethod copy-to-queue ((mssql copy-mssql) queue) "Copy data from MSSQL table DBNAME.TABLE-NAME into queue DATAQ" @@ -62,19 +62,14 @@ (defmethod copy-from ((mssql copy-mssql) &key (kernel nil k-s-p) truncate disable-triggers) "Connect in parallel to MSSQL and PostgreSQL and stream the data." - (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 2))) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (table-name (target mssql))) ;; we account stats against the target table-name, because that's all we ;; know on the PostgreSQL thread - (with-stats-collection (table-name - :dbname (db-name (target-db mssql)) - :state *state* - :summary summary) + (with-stats-collection (table-name :dbname (db-name (target-db mssql))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" table-name) ;; read data from Mssql @@ -97,7 +92,6 @@ (defun complete-pgsql-database (pgconn all-columns all-fkeys pkeys &key - state data-only foreign-keys reset-sequences) @@ -111,18 +105,17 @@ ;; (when reset-sequences (let ((table-names (mapcar #'car (qualified-table-name-list all-columns)))) - (reset-sequences table-names :pgconn pgconn :state state))) + (reset-sequences table-names :pgconn pgconn))) ;; ;; Turn UNIQUE indexes into PRIMARY KEYS now ;; (with-pgsql-connection (pgconn) - (pgstate-add-table state (db-name pgconn) "Primary Keys") (loop :for sql :in pkeys :when sql 
:do (progn (log-message :notice "~a" sql) - (pgsql-execute-with-timing "Primary Keys" sql state))) + (pgsql-execute-with-timing :post "Primary Keys" sql))) ;; ;; Foreign Key Constraints @@ -132,22 +125,21 @@ ;; and indexes are imported before doing that. ;; (when (and foreign-keys (not data-only)) - (pgstate-add-table state (db-name pgconn) "Foreign Keys") (loop :for (schema . tables) :in all-fkeys :do (loop :for (table-name . fkeys) :in tables :do (loop :for (fk-name . fkey) :in fkeys :for sql := (format-pgsql-create-fkey fkey) :do (progn (log-message :notice "~a;" sql) - (pgsql-execute-with-timing "Foreign Keys" sql state)))))))) + (pgsql-execute-with-timing :post "Foreign Keys" sql)))))))) -(defun fetch-mssql-metadata (mssql &key state including excluding) +(defun fetch-mssql-metadata (mssql &key including excluding) "MS SQL introspection to prepare the migration." (let (all-columns all-indexes all-fkeys) (with-stats-collection ("fetch meta data" :use-result-as-rows t :use-result-as-read t - :state state) + :section :pre) (with-connection (*mssql-db* (source-db mssql)) (setf all-columns (list-all-columns :including including :excluding excluding)) @@ -172,9 +164,6 @@ (defmethod copy-database ((mssql copy-mssql) &key - state-before - state-after - state-indexes (truncate nil) (disable-triggers nil) (data-only nil) @@ -189,19 +178,13 @@ including excluding) "Stream the given MS SQL database down to PostgreSQL." 
- (let* ((summary (null *state*)) - (*state* (or *state* (make-pgstate))) - (idx-state (or state-indexes (make-pgstate))) - (state-before (or state-before (make-pgstate))) - (state-after (or state-after (make-pgstate))) - (cffi:*default-foreign-encoding* encoding) + (let* ((cffi:*default-foreign-encoding* encoding) (copy-kernel (make-kernel 2)) idx-kernel idx-channel) (destructuring-bind (&key all-columns all-indexes all-fkeys pkeys) ;; to prepare the run we need to fetch MS SQL meta-data (fetch-mssql-metadata mssql - :state state-before :including including :excluding excluding) @@ -220,9 +203,7 @@ (handler-case (cond ((and (or create-tables schema-only) (not data-only)) (log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop) - (with-stats-collection ("create, truncate" - :state state-before - :summary summary) + (with-stats-collection ("create, truncate" :section :pre) (with-pgsql-transaction (:pgconn (target-db mssql)) (loop :for (schema . tables) :in all-columns :do (let ((schema (apply-identifier-case schema))) @@ -305,8 +286,7 @@ (create-indexes-in-kernel (target-db mssql) (mapcar #'cdr indexes-with-names) idx-kernel - idx-channel - :state idx-state))))))) + idx-channel))))))) ;; now end the kernels (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) @@ -314,7 +294,7 @@ ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. 
(when (and create-indexes (not data-only)) - (with-stats-collection ("Index Build Completion" :state *state*) + (with-stats-collection ("Index Build Completion" :section :post) (loop for idx in all-indexes do (lp:receive-result idx-channel)))) (lp:end-kernel)) @@ -323,14 +303,6 @@ ;; (complete-pgsql-database (new-pgsql-connection (target-db mssql)) all-columns all-fkeys pkeys - :state state-after :data-only data-only :foreign-keys foreign-keys - :reset-sequences reset-sequences) - - ;; and report the total time spent on the operation - (when summary - (report-full-summary "Total streaming time" *state* - :before state-before - :finally state-after - :parallel idx-state))))) + :reset-sequences reset-sequences)))) diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 2425fe6..76058c2 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -52,18 +52,18 @@ (sql (format nil "SELECT ~{~a~^, ~} FROM `~a`;" cols table-name)) (row-fn (lambda (row) - (pgstate-incf *state* (target mysql) :read 1) + (update-stats :data (target mysql) :read 1) (funcall process-row-fn row)))) (handler-bind ;; avoid trying to fetch the character at end-of-input position... ((babel-encodings:end-of-input-in-character #'(lambda (c) - (pgstate-incf *state* (target mysql) :errs 1) + (update-stats :data (target mysql) :errs 1) (log-message :error "~a" c) (invoke-restart 'qmynd-impl::use-nil))) (babel-encodings:character-decoding-error #'(lambda (c) - (pgstate-incf *state* (target mysql) :errs 1) + (update-stats :data (target mysql) :errs 1) (let ((encoding (babel-encodings:character-coding-error-encoding c)) (position (babel-encodings:character-coding-error-position c)) (character @@ -106,19 +106,14 @@ (defmethod copy-from ((mysql copy-mysql) &key (kernel nil k-s-p) truncate disable-triggers) "Connect in parallel to MySQL and PostgreSQL and stream the data." 
- (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 2))) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (table-name (target mysql))) ;; we account stats against the target table-name, because that's all we ;; know on the PostgreSQL thread - (with-stats-collection (table-name - :dbname (db-name (target-db mysql)) - :state *state* - :summary summary) + (with-stats-collection (table-name :dbname (db-name (target-db mysql))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" table-name) ;; read data from MySQL @@ -150,7 +145,6 @@ all-columns all-indexes all-fkeys materialize-views view-columns &key - state foreign-keys include-drop) "Prepare the target PostgreSQL database: create tables casting datatypes @@ -165,7 +159,7 @@ (length all-columns) (loop for (name . idxs) in all-indexes sum (length idxs))) - (with-stats-collection ("create, drop" :use-result-as-rows t :state state) + (with-stats-collection ("create, drop" :use-result-as-rows t :section :pre) (with-pgsql-transaction (:pgconn pgconn) ;; we need to first drop the Foreign Key Constraints, so that we ;; can DROP TABLE when asked @@ -188,7 +182,6 @@ (defun complete-pgsql-database (pgconn all-columns all-fkeys pkeys table-comments column-comments &key - state data-only foreign-keys reset-sequences) @@ -201,18 +194,17 @@ ;; while CREATE INDEX statements are in flight (avoid locking). 
;; (when reset-sequences - (reset-sequences (mapcar #'car all-columns) :pgconn pgconn :state state)) + (reset-sequences (mapcar #'car all-columns) :pgconn pgconn)) (with-pgsql-connection (pgconn) ;; ;; Turn UNIQUE indexes into PRIMARY KEYS now ;; - (pgstate-add-table state (db-name pgconn) "Primary Keys") (loop :for sql :in pkeys :when sql :do (progn (log-message :notice "~a" sql) - (pgsql-execute-with-timing "Primary Keys" sql state))) + (pgsql-execute-with-timing :post "Primary Keys" sql))) ;; ;; Foreign Key Constraints @@ -222,19 +214,17 @@ ;; and indexes are imported before doing that. ;; (when (and foreign-keys (not data-only)) - (pgstate-add-table state (db-name pgconn) "Foreign Keys") (loop :for (table-name . fkeys) :in all-fkeys :do (loop :for fkey :in fkeys :for sql := (format-pgsql-create-fkey fkey) :do (progn (log-message :notice "~a;" sql) - (pgsql-execute-with-timing "Foreign Keys" sql state))))) + (pgsql-execute-with-timing :post "Foreign Keys" sql))))) ;; ;; And now, comments on tables and columns. ;; (log-message :notice "Comments") - (pgstate-add-table state (db-name pgconn) "Comments") (let* ((quote ;; just something improbably found in a table comment, to use as ;; dollar quoting, and generated at random at that. 
@@ -256,7 +246,7 @@ quote comment quote) :do (progn (log-message :log "~a" sql) - (pgsql-execute-with-timing "Comments" sql state))) + (pgsql-execute-with-timing :post "Comments" sql))) (loop :for (table-name column-name comment) :in column-comments :for sql := (format nil "comment on column ~a.~a is $~a$~a$~a$" @@ -265,11 +255,10 @@ quote comment quote) :do (progn (log-message :notice "~a;" sql) - (pgsql-execute-with-timing "Comments" sql state)))))) + (pgsql-execute-with-timing :post "Comments" sql)))))) (defun fetch-mysql-metadata (mysql &key - state materialize-views only-tables including @@ -282,7 +271,7 @@ (with-stats-collection ("fetch meta data" :use-result-as-rows t :use-result-as-read t - :state state) + :section :pre) (with-connection (*connection* (source-db mysql)) ;; If asked to MATERIALIZE VIEWS, now is the time to create them in ;; MySQL, when given definitions rather than existing view names. @@ -350,9 +339,6 @@ ;;; (defmethod copy-database ((mysql copy-mysql) &key - state-before - state-after - state-indexes (truncate nil) (disable-triggers nil) (data-only nil) @@ -369,12 +355,7 @@ decoding-as materialize-views) "Export MySQL data and Import it into PostgreSQL" - (let* ((summary (null *state*)) - (*state* (or *state* (make-pgstate))) - (idx-state (or state-indexes (make-pgstate))) - (state-before (or state-before (make-pgstate))) - (state-after (or state-after (make-pgstate))) - (copy-kernel (make-kernel 2)) + (let* ((copy-kernel (make-kernel 2)) idx-kernel idx-channel) (destructuring-bind (&key view-columns all-columns @@ -382,7 +363,6 @@ all-fkeys all-indexes pkeys) ;; to prepare the run, we need to fetch MySQL meta-data (fetch-mysql-metadata mysql - :state state-before :materialize-views materialize-views :only-tables only-tables :including including @@ -409,7 +389,6 @@ all-fkeys materialize-views view-columns - :state state-before :foreign-keys foreign-keys :include-drop include-drop)) (t @@ -479,8 +458,7 @@ (alexandria:appendf pkeys 
(create-indexes-in-kernel (target-db mysql) - indexes idx-kernel idx-channel - :state idx-state)))))) + indexes idx-kernel idx-channel)))))) ;; now end the kernels (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) @@ -488,7 +466,7 @@ ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. (when (and create-indexes (not data-only)) - (with-stats-collection ("Index Build Completion" :state *state*) + (with-stats-collection ("Index Build Completion" :section :post) (loop for idx in all-indexes do (lp:receive-result idx-channel)))) (lp:end-kernel)) @@ -505,14 +483,6 @@ (complete-pgsql-database (new-pgsql-connection (target-db mysql)) all-columns all-fkeys pkeys table-comments column-comments - :state state-after :data-only data-only :foreign-keys foreign-keys - :reset-sequences reset-sequences) - - ;; and report the total time spent on the operation - (when summary - (report-full-summary "Total streaming time" *state* - :before state-before - :finally state-after - :parallel idx-state))))) + :reset-sequences reset-sequences)))) diff --git a/src/sources/sqlite/sqlite.lisp b/src/sources/sqlite/sqlite.lisp index e086e03..c0a7308 100644 --- a/src/sources/sqlite/sqlite.lisp +++ b/src/sources/sqlite/sqlite.lisp @@ -106,14 +106,14 @@ v) counting t into rows do (progn - (pgstate-incf *state* (target sqlite) :read 1) + (update-stats :data (target sqlite) :read 1) (funcall process-row-fn row)) finally (sqlite:finalize-statement statement) (return rows)) (condition (e) (log-message :error "~a" e) - (pgstate-incf *state* (target sqlite) :errs 1))))))) + (update-stats :data (target sqlite) :errs 1))))))) (defmethod copy-to-queue ((sqlite copy-sqlite) queue) @@ -123,16 +123,11 @@ (defmethod copy-from ((sqlite copy-sqlite) &key (kernel nil k-s-p) truncate disable-triggers) "Stream the contents from a SQLite database table down to PostgreSQL." 
- (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 2))) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*))) - (with-stats-collection ((target sqlite) - :dbname (db-name (target-db sqlite)) - :state *state* - :summary summary) + (with-stats-collection ((target sqlite) :dbname (db-name (target-db sqlite))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" (target sqlite)) ;; read data from SQLite @@ -151,17 +146,13 @@ (log-message :info "COPY ~a done." (target sqlite)) (unless k-s-p (lp:end-kernel))))))) -(defun fetch-sqlite-metadata (sqlite - &key - state - including - excluding) +(defun fetch-sqlite-metadata (sqlite &key including excluding) "SQLite introspection to prepare the migration." (let (all-columns all-indexes) (with-stats-collection ("fetch meta data" :use-result-as-rows t :use-result-as-read t - :state state) + :section :pre) (with-connection (conn (source-db sqlite)) (let ((*sqlite-db* (conn-handle conn))) (setf all-columns (filter-column-list (list-all-columns *sqlite-db*) @@ -182,7 +173,6 @@ (defmethod copy-database ((sqlite copy-sqlite) &key - state-before data-only schema-only (truncate nil) @@ -197,20 +187,12 @@ (encoding :utf-8)) "Stream the given SQLite database down to PostgreSQL." 
(declare (ignore only-tables)) - (let* ((summary (null *state*)) - (*state* (or *state* (make-pgstate))) - (state-before (or state-before (make-pgstate))) - (idx-state (make-pgstate)) - (seq-state (make-pgstate)) - (cffi:*default-foreign-encoding* encoding) + (let* ((cffi:*default-foreign-encoding* encoding) (copy-kernel (make-kernel 2)) idx-kernel idx-channel) (destructuring-bind (&key all-columns all-indexes pkeys) - (fetch-sqlite-metadata sqlite - :state state-before - :including including - :excluding excluding) + (fetch-sqlite-metadata sqlite :including including :excluding excluding) (let ((max-indexes (loop for (table . indexes) in all-indexes @@ -227,9 +209,7 @@ (handler-case (cond ((and (or create-tables schema-only) (not data-only)) (log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop) - (with-stats-collection ("create, truncate" - :state state-before - :summary summary) + (with-stats-collection ("create, truncate" :section :pre) (with-pgsql-transaction (:pgconn (target-db sqlite)) (create-tables all-columns :include-drop include-drop)))) @@ -271,8 +251,7 @@ (alexandria:appendf pkeys (create-indexes-in-kernel (target-db sqlite) indexes - idx-kernel idx-channel - :state idx-state)))))) + idx-kernel idx-channel)))))) ;; now end the kernels (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) @@ -280,7 +259,7 @@ ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. (when (and create-indexes (not data-only)) - (with-stats-collection ("index build completion" :state *state*) + (with-stats-collection ("index build completion" :section :post) (loop for idx in all-indexes do (lp:receive-result idx-channel)))) (lp:end-kernel)) @@ -288,12 +267,5 @@ ;; the data. 
(when reset-sequences (reset-sequences (mapcar #'car all-columns) - :pgconn (target-db sqlite) - :state seq-state)) - - ;; and report the total time spent on the operation - (report-full-summary "Total streaming time" *state* - :before state-before - :finally seq-state - :parallel idx-state)))) + :pgconn (target-db sqlite)))))) diff --git a/src/utils/monitor.lisp b/src/utils/monitor.lisp index b3c3f8d..6f08b1e 100644 --- a/src/utils/monitor.lisp +++ b/src/utils/monitor.lisp @@ -18,8 +18,8 @@ (defvar *monitoring-channel* nil "Internal lparallel channel.") -(defvar *sections* nil - "List of currently monitored activities (per category or concurrency.") +(defvar *sections* '(:pre nil :data nil :post nil) + "plist of load sections: :pre, :data and :post.") ;;; @@ -29,8 +29,9 @@ (defstruct stop stop-logger) (defstruct noop) (defstruct log-message category description arguments) -(defstruct new-label dbname section label) +(defstruct new-label section label dbname) (defstruct update-stats section label read rows errs secs rs ws) +(defstruct bad-row section label condition data) (defun log-message (category description &rest arguments) "Send given message into our monitoring queue for processing." @@ -38,6 +39,59 @@ :description description :arguments arguments))) +(defun new-label (section label &optional dbname) + "Send an event to create a new LABEL for registering a shared state under + SECTION." + (send-event (make-new-label :section section :label label :dbname dbname))) + +(defun update-stats (section label &key read rows errs secs rs ws) + "Send an event to update stats for given SECTION and LABEL." + (send-event (make-update-stats :section section + :label label + :read read + :rows rows + :errs errs + :secs secs + :rs rs + :ws ws))) + +(defun process-bad-row (table-name condition data) + "Send an event to log the bad row DATA in the reject and log files for given + TABLE-NAME (a label in section :data), for reason found in CONDITION." 
+ (send-event (make-bad-row :section :data + :label table-name + :condition condition + :data data))) + +;;; +;;; Easier API to manage statistics collection and state updates +;;; +(defmacro with-stats-collection ((table-name + &key + (section :data) + dbname + use-result-as-read + use-result-as-rows) + &body forms) + "Measure time spent in running BODY into STATE, accounting the seconds to + given DBNAME and TABLE-NAME" + (let ((result (gensym "result")) + (secs (gensym "secs"))) + `(prog2 + (new-label ,section ,table-name ,dbname) + (multiple-value-bind (,result ,secs) + (timing ,@forms) + (cond ((and ,use-result-as-read ,use-result-as-rows) + (update-stats ,section ,table-name + :read ,result :rows ,result :secs ,secs)) + (,use-result-as-read + (update-stats ,section ,table-name :read ,result :secs ,secs)) + (,use-result-as-rows + (update-stats ,section ,table-name :rows ,result :secs ,secs)) + (t + (update-stats ,section ,table-name :secs ,secs))) + ,result)))) + ;;; ;;; Now, the monitor thread management @@ -65,7 +119,9 @@ (*client-min-messages* . ,*client-min-messages*) (*monitoring-queue* . ,*monitoring-queue*) (*error-output* . ,*error-output*) - (*standard-output* . ,*standard-output*))) + (*standard-output* . ,*standard-output*) + (*summary-pathname* . ,*summary-pathname*) + (*sections* . ',*sections*))) (lparallel:*kernel* (lp:make-kernel 1 :bindings bindings)) (*monitoring-channel* (lp:make-channel))) @@ -84,24 +140,29 @@ (defmacro with-monitor ((&key (start-logger t)) &body body) "Start and stop the monitor around BODY code. 
The monitor is responsible for processing logs into a central logfile" - `(if ,start-logger - (let* ((*monitoring-queue* (lq:make-queue)) - (*monitoring-channel* (start-monitor :start-logger ,start-logger))) - (unwind-protect - ,@body - (stop-monitor :channel *monitoring-channel* - :stop-logger ,start-logger))) + `(let ((*sections* (list :pre (make-pgstate) + :data (make-pgstate) + :post (make-pgstate)))) + (if ,start-logger + (let* ((*monitoring-queue* (lq:make-queue)) + (*monitoring-channel* (start-monitor :start-logger ,start-logger))) + (unwind-protect + ,@body + (stop-monitor :channel *monitoring-channel* + :stop-logger ,start-logger))) - ;; logger has already been started - (progn ,@body))) + ;; logger has already been started + (progn ,@body)))) (defun monitor (queue) "Receives and process messages from *monitoring-queue*." ;; process messages from the queue - (loop :for event := (multiple-value-bind (event available) - (lq:try-pop-queue queue) - (if available event (make-noop))) + (loop :with start-time := (get-internal-real-time) + + :for event := (multiple-value-bind (event available) + (lq:try-pop-queue queue) + (if available event (make-noop))) :do (typecase event (start (when (start-start-logger event) @@ -110,7 +171,22 @@ (stop (cl-log:log-message :info "Stopping monitor") - (when (stop-stop-logger event) (pgloader.logs:stop-logger))) + + ;; report the summary now + (let* ((summary-stream (when *summary-pathname* + (open *summary-pathname* + :direction :output + :if-exists :rename + :if-does-not-exist :create))) + (*report-stream* (or summary-stream *standard-output*))) + (report-full-summary "Total import time" + *sections* + (elapsed-time-since start-time)) + (when summary-stream (close summary-stream))) + + ;; time to shut down the logger? 
+ (when (stop-stop-logger event) + (pgloader.logs:stop-logger))) (noop (sleep 0.2)) ; avoid buzy looping @@ -123,6 +199,54 @@ (log-message-description event) (log-message-arguments event)) (log-message-description event)))) - (cl-log:log-message (log-message-category event) "~a" mesg)))) + (cl-log:log-message (log-message-category event) "~a" mesg))) + + (new-label + (let ((label + (pgstate-new-label (getf *sections* (new-label-section event)) + (new-label-label event)))) + + (when (eq :data (new-label-section event)) + (pgtable-initialize-reject-files label + (new-label-dbname event))))) + + (update-stats + ;; it only costs an extra hash table lookup... + (pgstate-new-label (getf *sections* (update-stats-section event)) + (update-stats-label event)) + + (pgstate-incf (getf *sections* (update-stats-section event)) + (update-stats-label event) + :read (update-stats-read event) + :rows (update-stats-rows event) + :secs (update-stats-secs event))) + + (bad-row + (%process-bad-row (bad-row-label event) + (bad-row-condition event) + (bad-row-data event)))) :until (typep event 'stop))) + + +;;; +;;; Internal utils +;;; +(defun elapsed-time-since (start) + "Return how many seconds ticked between START and now" + (let ((now (get-internal-real-time))) + (coerce (/ (- now start) internal-time-units-per-second) 'double-float))) + + +;;; +;;; Timing Macro +;;; +(defmacro timing (&body forms) + "return both how much real time was spend in body and its result" + (let ((start (gensym)) + (end (gensym)) + (result (gensym))) + `(let* ((,start (get-internal-real-time)) + (,result (progn ,@forms)) + (,end (get-internal-real-time))) + (values ,result (/ (- ,end ,start) internal-time-units-per-second))))) diff --git a/src/utils/report.lisp b/src/utils/report.lisp index 5d787d3..f3df033 100644 --- a/src/utils/report.lisp +++ b/src/utils/report.lisp @@ -2,7 +2,7 @@ ;;; Pretty print a report while doing bulk operations ;;; -(in-package :pgloader.utils) +(in-package :pgloader.state) 
(defvar *header-line* "~&~v@{~A~:*~} --------- --------- --------- --------------") @@ -63,6 +63,31 @@ "Return the format string to use for a given TYPE of output and KEY." (getf (cadr (assoc type *header-format-strings*)) key)) +;;; +;;; Timing Formating +;;; +(defun format-interval (seconds &optional (stream t)) + "Output the number of seconds in a human friendly way" + (multiple-value-bind (years months days hours mins secs millisecs) + (date:decode-interval (date:encode-interval :second seconds)) + (declare (ignore millisecs)) + (format + stream + "~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~5,3fs" + (< 0 years) years + (< 0 months) months + (< 0 days) days + (< 0 hours) hours + (< 0 mins) mins + (+ secs (- (multiple-value-bind (r q) + (truncate seconds 60) + (declare (ignore r)) + q) + secs))))) + +;;; +;;; Pretty printing reports in several formats +;;; (defun report-header () ;; (apply #'format *report-stream* *header-cols-format* *header-cols-names*) (format *report-stream* @@ -106,7 +131,7 @@ ;;; ;;; Pretty print the whole summary from a state ;;; -(defun report-summary (&key ((:state pgstate) *state*) (header t) footer) +(defun report-summary (pgstate &key (header t) footer) "Report a whole summary." 
(when header (report-header)) (loop @@ -122,34 +147,6 @@ finally (when footer (report-pgstate-stats pgstate footer)))) -(defmacro with-stats-collection ((table-name - &key - dbname - summary - use-result-as-read - use-result-as-rows - ((:state pgstate) *state*)) - &body forms) - "Measure time spent in running BODY into STATE, accounting the seconds to - given DBNAME and TABLE-NAME" - (let ((result (gensym "result")) - (secs (gensym "secs"))) - `(prog2 - (pgstate-add-table ,pgstate ,dbname ,table-name) - (multiple-value-bind (,result ,secs) - (timing ,@forms) - (cond ((and ,use-result-as-read ,use-result-as-rows) - (pgstate-incf ,pgstate ,table-name - :read ,result :rows ,result :secs ,secs)) - (,use-result-as-read - (pgstate-incf ,pgstate ,table-name :read ,result :secs ,secs)) - (,use-result-as-rows - (pgstate-incf ,pgstate ,table-name :rows ,result :secs ,secs)) - (t - (pgstate-incf ,pgstate ,table-name :secs ,secs))) - ,result) - (when ,summary (report-summary))))) - (defun parse-summary-type (&optional (pathname *summary-pathname*)) "Return the summary type we want: human-readable, csv, json." (when pathname @@ -158,25 +155,30 @@ ((string= "copy" (pathname-type pathname)) :copy) (t :human-readable)))) -(defun report-full-summary (legend state - &key before finally parallel start-time) +(defun max-length-table-name (legend data pre post) + "Compute the max length of a table-name in the legend." + (reduce #'max + (mapcar #'length + (mapcar #'format-table-name + (append (pgstate-tabnames data) + (pgstate-tabnames pre) + (pgstate-tabnames post) + (list legend)))))) + +(defun report-full-summary (legend sections total-secs) "Report the full story when given three different sections of reporting." 
- (let* ((stype (or (parse-summary-type *summary-pathname*) + (let* ((data (getf sections :data)) + (pre (getf sections :pre)) + (post (getf sections :post)) + + (stype (or (parse-summary-type *summary-pathname*) :human-readable)) (*header* (get-format-for stype :header)) (*footer* (get-format-for stype :footer)) (*end-of-line-format* (get-format-for stype :end-of-line-format)) (*header-line* (get-format-for stype :header-line)) - (*max-length-table-name* - (reduce #'max - (mapcar #'length - (mapcar #'format-table-name - (append (pgstate-tabnames state) - (when before (pgstate-tabnames before)) - (when finally (pgstate-tabnames finally)) - (when parallel (pgstate-tabnames parallel)) - (list legend)))))) + (*max-length-table-name* (max-length-table-name legend data pre post)) (*header-tname-format* (get-format-for stype :header-tname-format)) (*header-stats-format* (get-format-for stype :header-stats-format)) (*header-cols-format* (get-format-for stype :header-cols-format)) @@ -185,39 +187,19 @@ (when *header* (format *report-stream* *header*)) - ;; BEFORE - (if before - (progn - (report-summary :state before :footer nil) - (format *report-stream* *header-line* *max-length-table-name* "-") - (report-summary :state state :header nil :footer nil)) - ;; no state before - (report-summary :state state :footer nil)) + (when (and pre (pgstate-tabnames pre)) + (report-summary pre :footer nil) + (format *report-stream* *header-line* *max-length-table-name* "-")) - (when (or finally parallel) + (report-summary data :header (null pre) :footer nil) + + (when (and post (pgstate-tabnames post)) (format *report-stream* *header-line* *max-length-table-name* "-") - (when parallel - (report-summary :state parallel :header nil :footer nil)) - (when finally - (report-summary :state finally :header nil :footer nil))) + (report-summary post :header nil :footer nil)) - ;; add to the grand total the other sections, except for the parallel one - (incf (pgloader.utils::pgstate-secs state) - 
(+ (if before (pgloader.utils::pgstate-secs before) 0) - (if finally (pgloader.utils::pgstate-secs finally) 0))) - - ;; if the parallel tasks took longer than the rest cumulated, the total - ;; waiting time actually was parallel - before - (if start-time - (setf (pgloader.utils::pgstate-secs state) - (pgloader.utils::elapsed-time-since start-time)) - (when (and parallel - (< (pgloader.utils::pgstate-secs state) - (pgloader.utils::pgstate-secs parallel))) - (setf (pgloader.utils::pgstate-secs state) - (- (pgloader.utils::pgstate-secs parallel) - (if before (pgloader.utils::pgstate-secs before) 0))))) + ;; replace the grand total now + (setf (pgstate-secs data) total-secs) ;; and report the Grand Total - (report-pgstate-stats state legend))) + (report-pgstate-stats data legend))) diff --git a/src/utils/state.lisp b/src/utils/state.lisp index eec5afd..46232b7 100644 --- a/src/utils/state.lisp +++ b/src/utils/state.lisp @@ -3,7 +3,7 @@ ;;; the load: number of lines read, imported and number of errors found ;;; along the way. ;;; -(in-package :pgloader.utils) +(in-package :pgloader.state) ;;; ;;; Data Structures to maintain information about loading state @@ -26,51 +26,71 @@ (defun format-table-name (table-name) "TABLE-NAME might be a CONS of a schema name and a table name." - (typecase table-name + (etypecase table-name (cons (format nil "~a.~a" (car table-name) (cdr table-name))) (string table-name))) -(defun pgstate-get-table (pgstate name) +(defun relative-pathname (filename type &optional dbname) + "Return the pathname of a file of type TYPE (dat or log) under *ROOT-DIR*" + (let ((dir (if dbname + (uiop:merge-pathnames* + (uiop:make-pathname* :directory `(:relative ,dbname)) + *root-dir*) + *root-dir*))) + (make-pathname :defaults dir :name filename :type type))) + +(defun reject-data-file (table-name dbname) + "Return the pathname to the reject file for STATE entry." 
+ (relative-pathname table-name "dat" dbname)) + +(defun reject-log-file (table-name dbname) + "Return the pathname to the reject file for STATE entry." + (relative-pathname table-name "log" dbname)) + +(defmethod pgtable-initialize-reject-files ((table pgtable) dbname) + "Prepare TABLE for being able to deal with rejected rows (log them)." + (let* ((table-name (format-table-name (pgtable-name table))) + (data-pathname (reject-data-file table-name dbname)) + (logs-pathname (reject-log-file table-name dbname))) + ;; we also use that facility for things that are not tables + ;; such as "fetch" or "before load" or "Create Indexes" + (when dbname + ;; create the per-database directory if it does not exists yet + (ensure-directories-exist (uiop:pathname-directory-pathname data-pathname)) + + ;; rename the existing files if there are some + (when (probe-file data-pathname) + (with-open-file (data data-pathname + :direction :output + :if-exists :rename + :if-does-not-exist nil))) + + (when (probe-file logs-pathname) + (with-open-file (logs logs-pathname + :direction :output + :if-exists :rename + :if-does-not-exist nil))) + + ;; set the properties to the right pathnames + (setf (pgtable-reject-data table) data-pathname + (pgtable-reject-logs table) logs-pathname)))) + +(defun pgstate-get-label (pgstate name) (gethash name (pgstate-tables pgstate))) -(defun pgstate-add-table (pgstate dbname table-name) +(defun pgstate-new-label (pgstate label) "Instanciate a new pgtable structure to hold our stats, and return it." 
- (or (pgstate-get-table pgstate table-name) - (let* ((table (setf (gethash table-name (pgstate-tables pgstate)) - (make-pgtable :name table-name))) - (reject-dir (merge-pathnames (format nil "~a/" dbname) *root-dir*)) - (filename (format-table-name table-name)) - (data-pathname (make-pathname :defaults reject-dir - :name filename :type "dat")) - (logs-pathname (make-pathname :defaults reject-dir - :name filename :type "log"))) + (or (pgstate-get-label pgstate label) + (let* ((pgtable (setf (gethash label (pgstate-tables pgstate)) + (make-pgtable :name label)))) ;; maintain the ordering - (push table-name (pgstate-tabnames pgstate)) + (push label (pgstate-tabnames pgstate)) - ;; create the per-database directory if it does not exists yet - (ensure-directories-exist reject-dir) - - ;; rename the existing files if there are some - (when (probe-file data-pathname) - (with-open-file (data data-pathname - :direction :output - :if-exists :rename - :if-does-not-exist nil))) - - (when (probe-file logs-pathname) - (with-open-file (logs logs-pathname - :direction :output - :if-exists :rename - :if-does-not-exist nil))) - - ;; set the properties to the right pathnames - (setf (pgtable-reject-data table) data-pathname - (pgtable-reject-logs table) logs-pathname) - table))) + pgtable))) (defun pgstate-setf (pgstate name &key read rows errs secs) - (let ((pgtable (pgstate-get-table pgstate name))) + (let ((pgtable (pgstate-get-label pgstate name))) (when read (setf (pgtable-read pgtable) read) (incf (pgstate-read pgstate) read)) @@ -86,7 +106,7 @@ pgtable)) (defun pgstate-incf (pgstate name &key read rows errs secs) - (let ((pgtable (pgstate-get-table pgstate name))) + (let ((pgtable (pgstate-get-label pgstate name))) (when read (incf (pgtable-read pgtable) read) (incf (pgstate-read pgstate) read)) @@ -102,7 +122,7 @@ pgtable)) (defun pgstate-decf (pgstate name &key read rows errs secs) - (let ((pgtable (pgstate-get-table pgstate name))) + (let ((pgtable (pgstate-get-label 
pgstate name))) (when read (decf (pgtable-read pgtable) read) (decf (pgstate-read pgstate) read)) diff --git a/src/utils/threads.lisp b/src/utils/threads.lisp index 60c95a9..fdd8dc9 100644 --- a/src/utils/threads.lisp +++ b/src/utils/threads.lisp @@ -12,7 +12,6 @@ (*copy-batch-size* . ,*copy-batch-size*) (*concurrent-batches* . ,*concurrent-batches*) (*pg-settings* . ',*pg-settings*) - (*state* . ,*state*) (*fd-path-root* . ,*fd-path-root*) (*client-min-messages* . ,*client-min-messages*) (*log-min-messages* . ,*log-min-messages*) diff --git a/src/utils/utils.lisp b/src/utils/utils.lisp index 5774bb2..98b8171 100644 --- a/src/utils/utils.lisp +++ b/src/utils/utils.lisp @@ -3,46 +3,6 @@ ;;; (in-package :pgloader.utils) -;;; -;;; Timing Macro -;;; -(defun elapsed-time-since (start) - "Return how many seconds ticked between START and now" - (let ((now (get-internal-real-time))) - (coerce (/ (- now start) internal-time-units-per-second) 'double-float))) - -(defmacro timing (&body forms) - "return both how much real time was spend in body and its result" - (let ((start (gensym)) - (end (gensym)) - (result (gensym))) - `(let* ((,start (get-internal-real-time)) - (,result (progn ,@forms)) - (,end (get-internal-real-time))) - (values ,result (/ (- ,end ,start) internal-time-units-per-second))))) - -;;; -;;; Timing Formating -;;; -(defun format-interval (seconds &optional (stream t)) - "Output the number of seconds in a human friendly way" - (multiple-value-bind (years months days hours mins secs millisecs) - (date:decode-interval (date:encode-interval :second seconds)) - (declare (ignore millisecs)) - (format - stream - "~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~5,3fs" - (< 0 years) years - (< 0 months) months - (< 0 days) days - (< 0 hours) hours - (< 0 mins) mins - (+ secs (- (multiple-value-bind (r q) - (truncate seconds 60) - (declare (ignore r)) - q) - secs))))) - ;;; ;;; Camel Case converter ;;; diff --git a/test/archive.load 
b/test/archive.load index 90ccc6e..de0f6f5 100644 --- a/test/archive.load +++ b/test/archive.load @@ -59,5 +59,5 @@ LOAD ARCHIVE fields escaped by double-quote, fields terminated by ',' - FINALLY DO - $$ create index blocks_ip4r_idx on geolite.blocks using gist(iprange); $$; + AFTER LOAD DO + $$ create index blocks_ip4r_idx on geolite.blocks using gist(iprange); $$;