diff --git a/pgloader.asd b/pgloader.asd index 3ded7c8..c9da027 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -53,10 +53,14 @@ ((:file "charsets") (:file "threads") (:file "logs") - (:file "monitor" :depends-on ("logs")) + (:file "utils") (:file "state") - (:file "report" :depends-on ("state")) - (:file "utils" :depends-on ("charsets" "monitor")) + (:file "reject" :depends-on ("state")) + (:file "report" :depends-on ("state" "utils")) + (:file "monitor" :depends-on ("logs" + "state" + "reject" + "report")) (:file "archive" :depends-on ("logs")) ;; those are one-package-per-file @@ -178,8 +182,8 @@ ((:file "mysql-cast-rules") (:file "mysql-schema" :depends-on ("mysql-cast-rules")) - (:file "mysql-csv" - :depends-on ("mysql-schema")) + ;; (:file "mysql-csv" + ;; :depends-on ("mysql-schema")) (:file "mysql" :depends-on ("mysql-cast-rules" "mysql-schema")))))) diff --git a/src/main.lisp b/src/main.lisp index 730173b..9d59489 100644 --- a/src/main.lisp +++ b/src/main.lisp @@ -271,7 +271,8 @@ (with-monitor () ;; tell the user where to look for interesting things - (log-message :log "Main logs in '~a'" (probe-file *log-filename*)) + (log-message :log "Main logs in '~a'" + (uiop:native-namestring *log-filename*)) (log-message :log "Data errors in '~a'~%" *root-dir*) ;; load extra lisp code provided for by the user @@ -398,19 +399,7 @@ (parse-commands-from-file source) (parse-commands source))))))) - ;; maybe duplicate the summary to a file - (let* ((summary-stream (when *summary-pathname* - (open *summary-pathname* - :direction :output - :if-exists :rename - :if-does-not-exist :create))) - (*report-stream* (or summary-stream *standard-output*))) - (unwind-protect - ;; run the commands - (loop for func in funcs do (funcall func)) - - ;; cleanup - (when summary-stream (close summary-stream))))))) + (loop for func in funcs do (funcall func))))) ;;; diff --git a/src/package.lisp b/src/package.lisp index 990b24c..ddb298d 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ 
-22,32 +22,28 @@ #:start-logger #:stop-logger)) -(defpackage #:pgloader.monitor +(defpackage #:pgloader.state (:use #:cl #:pgloader.params) - (:export #:with-monitor - #:*monitoring-queue* - #:log-message - #:send-event - #:start-monitor - #:stop-monitor)) + (:export #:format-table-name + #:make-pgstate + #:pgstate-tabnames + #:pgstate-tables + #:pgstate-read + #:pgstate-rows + #:pgstate-errs + #:pgstate-secs -(defpackage #:pgloader.utils - (:use #:cl #:pgloader.params) - (:import-from #:alexandria - #:appendf - #:read-file-into-string) - (:import-from #:pgloader.monitor - #:with-monitor - #:*monitoring-queue* - #:log-message) - (:export #:with-monitor ; monitor + #:pgstate-get-label + #:pgstate-new-label + #:pgstate-setf + #:pgstate-incf + #:pgstate-decf - ;; bits from alexandria - #:appendf - #:read-file-into-string - - ;; logs - #:log-message + #:pgtable-initialize-reject-files + #:pgtable-reject-data + #:pgtable-reject-logs + #:report-pgtable-stats + #:report-pgstate-stats ;; report #:report-header @@ -55,28 +51,53 @@ #:report-results #:report-footer #:report-summary - #:report-full-summary - #:with-stats-collection + #:report-full-summary)) + +(defpackage #:pgloader.monitor + (:use #:cl #:pgloader.params #:pgloader.state) + (:export #:with-monitor + #:*monitoring-queue* + #:log-message + #:new-label + #:update-stats + #:process-bad-row + #:with-stats-collection + #:send-event + #:start-monitor + #:stop-monitor + #:timing)) + +(defpackage #:pgloader.utils + (:use #:cl #:pgloader.params #:pgloader.monitor #:pgloader.state) + (:import-from #:alexandria + #:appendf + #:read-file-into-string) + + (:export #:with-monitor ; monitor + #:*monitoring-queue* + #:with-stats-collection + #:timing + + ;; bits from alexandria + #:appendf + #:read-file-into-string + + ;; state + #:make-pgstate + #:format-table-name + #:pgstate-tabnames + + ;; events + #:log-message + #:new-label + #:update-stats + #:process-bad-row ;; utils #:format-interval - #:timing #:camelCase-to-colname 
#:unquote - ;; state - #:format-table-name - #:make-pgstate - #:pgstate-get-table - #:pgstate-add-table - #:pgstate-setf - #:pgstate-incf - #:pgstate-decf - #:pgtable-reject-data - #:pgtable-reject-logs - #:report-pgtable-stats - #:report-pgstate-stats - ;; threads #:make-kernel @@ -243,17 +264,13 @@ #:reset-sequences)) (defpackage #:pgloader.queue - (:use #:cl #:pgloader.params) - (:import-from #:pgloader.monitor - #:log-message) + (:use #:cl #:pgloader.params #:pgloader.monitor) (:import-from #:pgloader.pgsql #:format-vector-row) (:import-from #:pgloader.sources #:map-rows #:transforms #:target) - (:import-from #:pgloader.utils - #:pgstate-incf) (:export #:map-push-queue)) diff --git a/src/params.lisp b/src/params.lisp index 26ad3aa..836168e 100644 --- a/src/params.lisp +++ b/src/params.lisp @@ -21,7 +21,6 @@ #:*copy-batch-size* #:*concurrent-batches* #:*pg-settings* - #:*state* #:*default-tmpdir* #:init-params-from-environment #:getenv-default)) @@ -68,16 +67,11 @@ (defparameter *dry-run* nil "Set to non-nil to only run checks about the load setup.") -;; we can't use pgloader.utils:make-pgstate yet because params is compiled -;; first in the asd definition, we just make the symbol a special variable. 
-(defparameter *state* nil - "State of the current loading.") - (defparameter *fd-path-root* nil "Where to load files from, when loading from an archive or expanding regexps.") (defparameter *root-dir* - #+unix (make-pathname :directory "/tmp/pgloader/") + #+unix (uiop:make-pathname* :directory '(:absolute "tmp" "pgloader")) #-unix (uiop:merge-pathnames* "pgloader/" (uiop:ensure-directory-pathname (getenv-default "Temp"))) diff --git a/src/parsers/command-archive.lisp b/src/parsers/command-archive.lisp index 9b3129b..d8b79ab 100644 --- a/src/parsers/command-archive.lisp +++ b/src/parsers/command-archive.lisp @@ -42,32 +42,22 @@ (when (and (or before finally) (null pg-db-conn)) (error "When using a BEFORE LOAD DO or a FINALLY block, you must provide an archive level target database connection.")) `(lambda () - (let* ((start-irt (get-internal-real-time)) - (state-before (pgloader.utils:make-pgstate)) - (*state* (pgloader.utils:make-pgstate)) - ,@(pgsql-connection-bindings pg-db-conn nil) - (state-finally ,(when finally `(pgloader.utils:make-pgstate))) + (let* (,@(pgsql-connection-bindings pg-db-conn nil) (archive-file - ,(destructuring-bind (kind url) source - (ecase kind - (:http `(with-stats-collection - ("download" :state state-before) - (pgloader.archive:http-fetch-file ,url))) - (:filename url)))) - (*fd-path-root* - (with-stats-collection ("extract" :state state-before) - (pgloader.archive:expand-archive archive-file)))) + , (destructuring-bind (kind url) source + (ecase kind + (:http `(with-stats-collection + ("download" :section :pre) + (pgloader.archive:http-fetch-file ,url))) + (:filename url)))) + (*fd-path-root* + (with-stats-collection ("extract" :section :pre) + (pgloader.archive:expand-archive archive-file)))) (progn - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") ;; import from files block ,@(loop for command in commands collect `(funcall ,command)) - ,(sql-code-block pg-db-conn
'state-finally finally "finally") - - ;; reporting - (report-full-summary "Total import time" *state* - :start-time start-irt - :before state-before - :finally state-finally))))))) + ,(sql-code-block pg-db-conn :post finally "finally"))))))) diff --git a/src/parsers/command-copy.lisp b/src/parsers/command-copy.lisp index 672fa85..ca423d2 100644 --- a/src/parsers/command-copy.lisp +++ b/src/parsers/command-copy.lisp @@ -115,20 +115,13 @@ gucs before after ((:copy-options options))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-idx ,(when (getf options :drop-indexes) - `(pgloader.utils:make-pgstate))) - (state-after ,(when (or after (getf options :drop-indexes)) - `(pgloader.utils:make-pgstate))) - ,@(pgsql-connection-bindings pg-db-conn gucs) + (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,copy-conn))))) (progn - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (let ((truncate ,(getf options :truncate)) (disable-triggers (getf ',options :disable-triggers)) @@ -146,21 +139,11 @@ :drop-indexes :disable-triggers))))) (pgloader.sources:copy-from source - :state-before state-before - :state-after state-after - :state-indexes state-idx :truncate truncate :drop-indexes drop-indexes :disable-triggers disable-triggers)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - ;; reporting - (when summary - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after - :parallel state-idx)))))) + ,(sql-code-block pg-db-conn :post after "after load"))))) (defrule load-copy-file load-copy-file-command (:lambda (command) diff --git a/src/parsers/command-csv.lisp 
b/src/parsers/command-csv.lisp index 5fad5b1..a672c19 100644 --- a/src/parsers/command-csv.lisp +++ b/src/parsers/command-csv.lisp @@ -455,20 +455,13 @@ gucs before after ((:csv-options options))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-idx ,(when (getf options :drop-indexes) - `(pgloader.utils:make-pgstate))) - (state-after ,(when (or after (getf options :drop-indexes)) - `(pgloader.utils:make-pgstate))) - ,@(pgsql-connection-bindings pg-db-conn gucs) + (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,csv-conn))))) (progn - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (let ((truncate (getf ',options :truncate)) (disable-triggers (getf ',options :disable-triggers)) @@ -486,21 +479,11 @@ :drop-indexes :disable-triggers))))) (pgloader.sources:copy-from source - :state-before state-before - :state-after state-after - :state-indexes state-idx :truncate truncate :drop-indexes drop-indexes :disable-triggers disable-triggers)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - ;; reporting - (when summary - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after - :parallel state-idx)))))) + ,(sql-code-block pg-db-conn :post after "after load"))))) (defrule load-csv-file load-csv-file-command (:lambda (command) diff --git a/src/parsers/command-dbf.lisp b/src/parsers/command-dbf.lisp index f46d5dc..35cc4c3 100644 --- a/src/parsers/command-dbf.lisp +++ b/src/parsers/command-dbf.lisp @@ -93,15 +93,11 @@ gucs before after ((:dbf-options options))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (summary (null *state*)) - (*state* 
(or *state* (pgloader.utils:make-pgstate))) - (state-after ,(when after `(pgloader.utils:make-pgstate))) - ,@(pgsql-connection-bindings pg-db-conn gucs) + (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) (table-name ',(pgconn-table-name pg-db-conn)) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,dbf-db-conn)))) (source (make-instance 'pgloader.db3:copy-db3 @@ -110,19 +106,12 @@ :source-db source-db :target table-name))) - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (pgloader.sources:copy-database source - :state-before state-before ,@(remove-batch-control-option options)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - ;; reporting - (when summary - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after))))) + ,(sql-code-block pg-db-conn :post after "after load")))) (defrule load-dbf-file load-dbf-command (:lambda (command) diff --git a/src/parsers/command-fixed.lisp b/src/parsers/command-fixed.lisp index 0485535..af52694 100644 --- a/src/parsers/command-fixed.lisp +++ b/src/parsers/command-fixed.lisp @@ -123,20 +123,13 @@ gucs before after ((:fixed-options options))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-idx ,(when (getf options :drop-indexes) - `(pgloader.utils:make-pgstate))) - (state-after ,(when (or after (getf options :drop-indexes)) - `(pgloader.utils:make-pgstate))) - ,@(pgsql-connection-bindings pg-db-conn gucs) + (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand 
(fetch-file ,fixed-conn))))) (progn - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (let ((truncate ,(getf options :truncate)) (disable-triggers ,(getf options :disable-triggers)) @@ -152,21 +145,11 @@ :skip-lines ,(or (getf options :skip-line) 0)))) (pgloader.sources:copy-from source - :state-before state-before - :state-after state-after - :state-indexes state-idx :truncate truncate :drop-indexes drop-indexes :disable-triggers disable-triggers)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - ;; reporting - (when summary - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after - :parallel state-idx)))))) + ,(sql-code-block pg-db-conn :post after "after load"))))) (defrule load-fixed-cols-file load-fixed-cols-file-command (:lambda (command) diff --git a/src/parsers/command-ixf.lisp b/src/parsers/command-ixf.lisp index ab0d3fa..fbad04c 100644 --- a/src/parsers/command-ixf.lisp +++ b/src/parsers/command-ixf.lisp @@ -49,15 +49,11 @@ gucs before after ((:ixf-options options))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-after ,(when after `(pgloader.utils:make-pgstate))) - ,@(pgsql-connection-bindings pg-db-conn gucs) + (let* (,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) ,@(identifier-case-binding options) (table-name ',(pgconn-table-name pg-db-conn)) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,ixf-db-conn)))) (source (make-instance 'pgloader.ixf:copy-ixf @@ -65,18 +61,12 @@ :source-db source-db :target table-name))) - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (pgloader.sources:copy-database source - 
:state-before state-before ,@(remove-batch-control-option options)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - (when summary - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after))))) + ,(sql-code-block pg-db-conn :post after "after load")))) (defrule load-ixf-file load-ixf-command (:lambda (command) diff --git a/src/parsers/command-mssql.lisp b/src/parsers/command-mssql.lisp index 875d1b9..987713a 100644 --- a/src/parsers/command-mssql.lisp +++ b/src/parsers/command-mssql.lisp @@ -161,11 +161,7 @@ (let (#+sbcl(sb-ext:*muffled-warnings* 'style-warning)) (cffi:load-foreign-library 'mssql::sybdb)) - (let* ((state-before (pgloader.utils:make-pgstate)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-idx (pgloader.utils:make-pgstate)) - (state-after (pgloader.utils:make-pgstate)) - (*default-cast-rules* ',*mssql-default-cast-rules*) + (let* ((*default-cast-rules* ',*mssql-default-cast-rules*) (*cast-rules* ',casts) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) @@ -175,22 +171,14 @@ :target-db ,pg-db-conn :source-db ,ms-db-conn))) - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (pgloader.mssql:copy-database source - :state-before state-before - :state-after state-after - :state-indexes state-idx :including ',including :excluding ',excluding ,@(remove-batch-control-option options)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after - :parallel state-idx)))) + ,(sql-code-block pg-db-conn :post after "after load")))) (defrule load-mssql-database load-mssql-command (:lambda (source) diff --git a/src/parsers/command-mysql.lisp b/src/parsers/command-mysql.lisp index d395d7e..b6dd1e8 100644 --- a/src/parsers/command-mysql.lisp +++ b/src/parsers/command-mysql.lisp @@ -166,11 
+166,7 @@ ((:excluding excl)) ((:decoding decoding-as))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (state-idx (pgloader.utils:make-pgstate)) - (state-after (pgloader.utils:make-pgstate)) - (*default-cast-rules* ',*mysql-default-cast-rules*) + (let* ((*default-cast-rules* ',*mysql-default-cast-rules*) (*cast-rules* ',casts) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) @@ -180,24 +176,16 @@ :target-db ,pg-db-conn :source-db ,my-db-conn))) - ,(sql-code-block pg-db-conn 'state-before before "before load") + ,(sql-code-block pg-db-conn :pre before "before load") (pgloader.mysql:copy-database source :including ',incl :excluding ',excl :decoding-as ',decoding-as :materialize-views ',views - :state-before state-before - :state-after state-after - :state-indexes state-idx ,@(remove-batch-control-option options)) - ,(sql-code-block pg-db-conn 'state-after after "after load") - - (report-full-summary "Total import time" *state* - :before state-before - :finally state-after - :parallel state-idx)))) + ,(sql-code-block pg-db-conn :post after "after load")))) (defrule load-mysql-database load-mysql-command (:lambda (source) diff --git a/src/parsers/command-sql-block.lisp b/src/parsers/command-sql-block.lisp index 3c254fd..5ae2f1b 100644 --- a/src/parsers/command-sql-block.lisp +++ b/src/parsers/command-sql-block.lisp @@ -58,12 +58,12 @@ (bind (((_ _ sql-list-of-list) after)) (cons :after (apply #'append sql-list-of-list))))) -(defun sql-code-block (pgconn state commands label) +(defun sql-code-block (pgconn section commands label) "Return lisp code to run COMMANDS against DBNAME, updating STATE." 
(when commands `(with-stats-collection (,label :dbname ,(db-name pgconn) - :state ,state + :section ,section :use-result-as-read t :use-result-as-rows t) (with-pgsql-transaction (:pgconn ,pgconn) diff --git a/src/parsers/command-sqlite.lisp b/src/parsers/command-sqlite.lisp index aa7d65f..e38776f 100644 --- a/src/parsers/command-sqlite.lisp +++ b/src/parsers/command-sqlite.lisp @@ -101,20 +101,17 @@ load database ((:including incl)) ((:excluding excl))) `(lambda () - (let* ((state-before (pgloader.utils:make-pgstate)) - (*state* (pgloader.utils:make-pgstate)) - (*default-cast-rules* ',*sqlite-default-cast-rules*) + (let* ((*default-cast-rules* ',*sqlite-default-cast-rules*) (*cast-rules* ',casts) ,@(pgsql-connection-bindings pg-db-conn gucs) ,@(batch-control-bindings options) - (source-db (with-stats-collection ("fetch" :state state-before) + (source-db (with-stats-collection ("fetch" :section :pre) (expand (fetch-file ,sqlite-db-conn)))) (source (make-instance 'pgloader.sqlite::copy-sqlite :target-db ,pg-db-conn :source-db source-db))) (pgloader.sqlite:copy-database source - :state-before state-before :including ',incl :excluding ',excl ,@(remove-batch-control-option options))))) diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index b9a3a9d..42464a6 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -45,11 +45,10 @@ &key columns (truncate t) - disable-triggers - ((:state *state*) *state*)) + disable-triggers) "Fetch from the QUEUE messages containing how many rows are in the *writer-batch* for us to send down to PostgreSQL, and when that's done - update *state*." + update stats." (when truncate (truncate-tables pgconn (list table-name))) @@ -68,44 +67,10 @@ #+sbcl (when oversized? (sb-ext:gc :full t)) (log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]" unqualified-table-name rows oversized?) 
- (pgstate-incf *state* table-name :rows rows))) + (update-stats :data table-name :rows rows))) (when disable-triggers (enable-triggers unqualified-table-name))))) -;;; -;;; When a batch has been refused by PostgreSQL with a data-exception, that -;;; means it contains non-conforming data. Log the error message in a log -;;; file and the erroneous data in a rejected data file for further -;;; processing. -;;; -(defun process-bad-row (table-name condition row) - "Add the row to the reject file, in PostgreSQL COPY TEXT format" - ;; first, update the stats. - (pgstate-incf *state* table-name :errs 1) - - ;; now, the bad row processing - (let* ((table (pgstate-get-table *state* table-name)) - (data (pgtable-reject-data table)) - (logs (pgtable-reject-logs table))) - - ;; first log the rejected data - (with-open-file (reject-data-file data - :direction :output - :if-exists :append - :if-does-not-exist :create - :external-format :utf-8) - ;; the row has already been processed when we get here - (write-string row reject-data-file)) - - ;; now log the condition signaled to reject the data - (with-open-file (reject-logs-file logs - :direction :output - :if-exists :append - :if-does-not-exist :create - :external-format :utf-8) - ;; the row has already been processed when we get here - (format reject-logs-file "~a~%" condition)))) - ;;; ;;; Compute how many rows we're going to try loading next, depending on ;;; where we are in the batch currently and where is the next-error to be diff --git a/src/pgsql/queries.lisp b/src/pgsql/queries.lisp index e7aaeb2..3758a36 100644 --- a/src/pgsql/queries.lisp +++ b/src/pgsql/queries.lisp @@ -109,22 +109,23 @@ (log-message :debug set) (pomo:execute set)))) -(defun pgsql-connect-and-execute-with-timing (pgconn label sql state &key (count 1)) +(defun pgsql-connect-and-execute-with-timing (pgconn section label sql &key (count 1)) "Run pgsql-execute-with-timing within a newly establised connection." 
(with-pgsql-connection (pgconn) (pomo:with-transaction () - (pgsql-execute-with-timing label sql state :count count)))) + (pgsql-execute-with-timing section label sql :count count)))) -(defun pgsql-execute-with-timing (label sql state &key (count 1)) +(defun pgsql-execute-with-timing (section label sql &key (count 1)) "Execute given SQL and resgister its timing into STATE." (multiple-value-bind (res secs) (timing - (handler-case (pgsql-execute sql) + (handler-case + (pgsql-execute sql) (cl-postgres:database-error (e) (log-message :error "~a" e) - (pgstate-incf state label :errs 1 :rows (- count))))) + (update-stats section label :errs 1 :rows (- count))))) (declare (ignore res)) - (pgstate-incf state label :read count :rows count :secs secs))) + (update-stats section label :read count :rows count :secs secs))) (defun pgsql-execute (sql &key ((:client-min-messages level))) "Execute given SQL in current transaction" diff --git a/src/pgsql/schema.lisp b/src/pgsql/schema.lisp index d24ecc8..dd20849 100644 --- a/src/pgsql/schema.lisp +++ b/src/pgsql/schema.lisp @@ -145,17 +145,17 @@ (defun create-pgsql-fkeys (pgconn all-fkeys &key - state + (section :post) (label "Foreign Keys")) "Actually create the Foreign Key References that where declared in the MySQL database" - (pgstate-add-table state (db-name pgconn) label) + (new-label section label) (loop for (table-name . fkeys) in all-fkeys do (loop for fkey in fkeys for sql = (format-pgsql-create-fkey fkey) do (log-message :notice "~a;" sql) - (pgsql-execute-with-timing "Foreign Keys" sql state)))) + (pgsql-execute-with-timing section label sql)))) ;;; @@ -341,16 +341,11 @@ ;;; Parallel index building. ;;; (defun create-indexes-in-kernel (pgconn indexes kernel channel - &key - state - (label "Create Indexes")) + &key (label "Create Indexes")) "Create indexes for given table in dbname, using given lparallel KERNEL and CHANNEL so that the index build happen in concurrently with the data copying." 
(let* ((lp:*kernel* kernel)) - ;; ensure we have a stats entry - (pgstate-add-table state (db-name pgconn) label) - (loop :for index :in indexes :collect (multiple-value-bind (sql pkey) @@ -362,7 +357,7 @@ #'pgsql-connect-and-execute-with-timing ;; each thread must have its own connection (new-pgsql-connection pgconn) - label sql state) + :post label sql) ;; return the pkey "upgrade" statement pkey)))) @@ -390,18 +385,18 @@ ;;; ;;; Drop indexes before loading ;;; -(defun drop-indexes (state pgsql-index-list) +(defun drop-indexes (section pgsql-index-list) "Drop indexes in PGSQL-INDEX-LIST. A PostgreSQL connection must already be active when calling that function." (loop :for index :in pgsql-index-list :do (let ((sql (format-pgsql-drop-index index))) (log-message :notice "~a" sql) - (pgsql-execute-with-timing "drop indexes" sql state)))) + (pgsql-execute-with-timing section "drop indexes" sql)))) ;;; ;;; Higher level API to care about indexes ;;; -(defun maybe-drop-indexes (target table-name state &key drop-indexes) +(defun maybe-drop-indexes (target table-name &key (section :pre) drop-indexes) "Drop the indexes for TABLE-NAME on TARGET PostgreSQL connection, and returns a list of indexes to create again." (with-pgsql-connection (target) @@ -420,14 +415,13 @@ (indexes ;; drop the indexes now - (with-stats-collection ("drop indexes" :state state) - (drop-indexes state indexes)))) + (with-stats-collection ("drop indexes" :section section) + (drop-indexes section indexes)))) ;; and return the indexes list indexes))) -(defun create-indexes-again (target indexes state state-parallel - &key drop-indexes) +(defun create-indexes-again (target indexes &key (section :post) drop-indexes) "Create the indexes that we dropped previously." 
(when (and indexes drop-indexes) (let* ((*preserve-index-names* t) @@ -438,29 +432,27 @@ (idx-channel (let ((lp:*kernel* idx-kernel)) (lp:make-channel)))) (let ((pkeys - (create-indexes-in-kernel target indexes idx-kernel idx-channel - :state state-parallel))) + (create-indexes-in-kernel target indexes idx-kernel idx-channel))) - (with-stats-collection ("Index Build Completion" :state state) + (with-stats-collection ("Index Build Completion" :section section) (loop :for idx :in indexes :do (lp:receive-result idx-channel))) ;; turn unique indexes into pkeys now (with-pgsql-connection (target) - (with-stats-collection ("Constraints" :state state) + (with-stats-collection ("Constraints" :section section) (loop :for sql :in pkeys :when sql :do (progn (log-message :notice "~a" sql) - (pgsql-execute-with-timing "Constraints" sql state))))))))) + (pgsql-execute-with-timing section "Constraints" sql))))))))) ;;; ;;; Sequences ;;; -(defun reset-sequences (table-names &key pgconn state) +(defun reset-sequences (table-names &key pgconn (section :post)) "Reset all sequences created during this MySQL migration." 
(log-message :notice "Reset sequences") (with-stats-collection ("Reset Sequences" - :dbname (db-name pgconn) :use-result-as-rows t - :state state) + :section section) (reset-all-sequences pgconn :tables table-names))) diff --git a/src/queue.lisp b/src/queue.lisp index 3674e2a..9c829ff 100644 --- a/src/queue.lisp +++ b/src/queue.lisp @@ -55,7 +55,7 @@ (condition (e) (log-message :error "~a" e) - (pgstate-incf *state* (target copy) :errs 1)))) + (update-stats :data (target copy) :errs 1)))) (defun map-push-queue (copy queue &optional pre-formatted) "Apply MAP-ROWS on the COPY instance and a function of ROW that will push diff --git a/src/sources/common/project-fields.lisp b/src/sources/common/project-fields.lisp index 7567425..f666647 100644 --- a/src/sources/common/project-fields.lisp +++ b/src/sources/common/project-fields.lisp @@ -126,7 +126,7 @@ lines we did read in the file." (let ((projection (project-fields :fields fields :columns columns))) (lambda (row) - (pgstate-incf *state* target :read 1) + (update-stats :data target :read 1) ;; cl-csv returns (nil) for an empty line (if (or (null row) (and (null (car row)) (null (cdr row)))) @@ -137,11 +137,8 @@ (handler-case (funcall projection row) (condition (e) - (pgstate-incf *state* target :errs 1) - (log-message :error "Could not read line ~d: ~a" - (pgloader.utils::pgtable-read - (pgstate-get-table *state* target)) - e))))) + (update-stats :data target :errs 1) + (log-message :error "Could not read input: ~a" e))))) (when projected-vector (funcall process-row-fn projected-vector))))))) diff --git a/src/sources/copy.lisp b/src/sources/copy.lisp index 2c4c56c..8a9d229 100644 --- a/src/sources/copy.lisp +++ b/src/sources/copy.lisp @@ -93,7 +93,7 @@ (condition (e) (progn (log-message :error "~a" e) - (pgstate-incf *state* (target copy) :errs 1)))))))))) + (update-stats :data (target copy) :errs 1)))))))))) (defmethod copy-to-queue ((copy copy-copy) queue) "Copy data from given COPY definition into lparallel.queue 
DATAQ" @@ -101,27 +101,18 @@ (defmethod copy-from ((copy copy-copy) &key - state-before - state-after - state-indexes truncate disable-triggers drop-indexes) "Copy data from given COPY file definition into its PostgreSQL target table." - (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (make-kernel 2)) + (let* ((lp:*kernel* (make-kernel 2)) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (indexes (maybe-drop-indexes (target-db copy) (target copy) - state-before :drop-indexes drop-indexes))) - (with-stats-collection ((target copy) - :dbname (db-name (target-db copy)) - :state *state* - :summary summary) + (with-stats-collection ((target copy) :dbname (db-name (target-db copy))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" (target copy)) (lp:submit-task channel #'copy-to-queue copy queue) @@ -144,6 +135,6 @@ finally (lp:end-kernel)))) ;; re-create the indexes - (create-indexes-again (target-db copy) indexes state-after state-indexes + (create-indexes-again (target-db copy) indexes :drop-indexes drop-indexes))) diff --git a/src/sources/csv/csv.lisp b/src/sources/csv/csv.lisp index 7d77697..6d20660 100644 --- a/src/sources/csv/csv.lisp +++ b/src/sources/csv/csv.lisp @@ -126,7 +126,7 @@ (condition (e) (progn (log-message :error "~a" e) - (pgstate-incf *state* (target csv) :errs 1))))))))) + (update-stats :data (target csv) :errs 1))))))))) (defmethod copy-to-queue ((csv copy-csv) queue) "Copy data from given CSV definition into lparallel.queue DATAQ" @@ -134,26 +134,18 @@ (defmethod copy-from ((csv copy-csv) &key - state-before - state-after - state-indexes truncate disable-triggers drop-indexes) "Copy data from given CSV file definition into its PostgreSQL target table." 
- (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (make-kernel 2)) + (let* ((lp:*kernel* (make-kernel 2)) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (indexes (maybe-drop-indexes (target-db csv) (target csv) - state-before :drop-indexes drop-indexes))) - (with-stats-collection ((target csv) - :dbname (db-name (target-db csv)) - :state *state* :summary summary) + (with-stats-collection ((target csv) :dbname (db-name (target-db csv))) (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" (target csv)) (lp:submit-task channel #'copy-to-queue csv queue) @@ -176,5 +168,4 @@ finally (lp:end-kernel)))) ;; re-create the indexes - (create-indexes-again (target-db csv) indexes state-after state-indexes - :drop-indexes drop-indexes))) + (create-indexes-again (target-db csv) indexes :drop-indexes drop-indexes))) diff --git a/src/sources/db3/db3.lisp b/src/sources/db3/db3.lisp index f0af6ae..36d67b6 100644 --- a/src/sources/db3/db3.lisp +++ b/src/sources/db3/db3.lisp @@ -59,21 +59,15 @@ (defmethod copy-to-queue ((db3 copy-db3) queue) "Copy data from DB3 file FILENAME into queue DATAQ" - (let ((read (pgloader.queue:map-push-queue db3 queue))) - (pgstate-incf *state* (target db3) :read read))) + (pgloader.queue:map-push-queue db3 queue)) (defmethod copy-from ((db3 copy-db3) &key (kernel nil k-s-p) truncate disable-triggers) - (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 2))) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*))) - (with-stats-collection ((target db3) - :dbname (db-name (target-db db3)) - :state *state* - :summary summary) + (with-stats-collection ((target db3) :dbname (db-name (target-db db3))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) 
(log-message :notice "COPY \"~a\" from '~a'" (target db3) (source db3)) (lp:submit-task channel #'copy-to-queue db3 queue) @@ -94,7 +88,6 @@ (defmethod copy-database ((db3 copy-db3) &key table-name - state-before data-only schema-only (truncate t) @@ -105,9 +98,7 @@ (reset-sequences t)) "Open the DB3 and stream its content to a PostgreSQL database." (declare (ignore create-indexes reset-sequences)) - (let* ((summary (null *state*)) - (*state* (or *state* (make-pgstate))) - (table-name (or table-name + (let* ((table-name (or table-name (target db3) (source db3)))) @@ -116,9 +107,7 @@ (handler-case (when (and (or create-tables schema-only) (not data-only)) - (with-stats-collection ("create, truncate" - :state state-before - :summary summary) + (with-stats-collection ("create, truncate" :section :pre) (with-pgsql-transaction (:pgconn (target-db db3)) (when create-tables (with-schema (tname table-name) @@ -133,9 +122,4 @@ (return-from copy-database))) (unless schema-only - (copy-from db3 :truncate truncate :disable-triggers disable-triggers)) - - ;; and report the total time spent on the operation - (when summary - (report-full-summary "Total streaming time" *state* - :before state-before)))) + (copy-from db3 :truncate truncate :disable-triggers disable-triggers)))) diff --git a/src/sources/fixed.lisp b/src/sources/fixed.lisp index 54620f5..5b116dc 100644 --- a/src/sources/fixed.lisp +++ b/src/sources/fixed.lisp @@ -79,7 +79,7 @@ (condition (e) (progn (log-message :error "~a" e) - (pgstate-incf *state* (target fixed) :errs 1)))))))))) + (update-stats :data (target fixed) :errs 1)))))))))) (defmethod copy-to-queue ((fixed copy-fixed) queue) "Copy data from given FIXED definition into lparallel.queue DATAQ" @@ -87,27 +87,18 @@ (defmethod copy-from ((fixed copy-fixed) &key - state-before - state-after - state-indexes truncate disable-triggers drop-indexes) "Copy data from given FIXED file definition into its PostgreSQL target table." 
- (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (make-kernel 2)) + (let* ((lp:*kernel* (make-kernel 2)) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (indexes (maybe-drop-indexes (target-db fixed) (target fixed) - state-before :drop-indexes drop-indexes))) - (with-stats-collection ((target fixed) - :dbname (db-name (target-db fixed)) - :state *state* - :summary summary) + (with-stats-collection ((target fixed) :dbname (db-name (target-db fixed))) (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" (target fixed)) (lp:submit-task channel #'copy-to-queue fixed queue) @@ -130,6 +121,5 @@ finally (lp:end-kernel)))) ;; re-create the indexes - (create-indexes-again (target-db fixed) indexes state-after state-indexes - :drop-indexes drop-indexes))) + (create-indexes-again (target-db fixed) indexes :drop-indexes drop-indexes))) diff --git a/src/sources/ixf/ixf.lisp b/src/sources/ixf/ixf.lisp index 8872666..695ec6b 100644 --- a/src/sources/ixf/ixf.lisp +++ b/src/sources/ixf/ixf.lisp @@ -61,28 +61,22 @@ (with-connection (conn (source-db copy-ixf)) (let ((ixf (ixf:make-ixf-file :stream (conn-handle conn))) (row-fn (lambda (row) - (pgstate-incf *state* (target copy-ixf) :read 1) + (update-stats :data (target copy-ixf) :read 1) (funcall process-row-fn row)))) (ixf:read-headers ixf) (ixf:map-data ixf row-fn)))) (defmethod copy-to-queue ((ixf copy-ixf) queue) "Copy data from IXF file FILENAME into queue DATAQ" - (let ((read (pgloader.queue:map-push-queue ixf queue))) - (pgstate-incf *state* (target ixf) :read read))) + (pgloader.queue:map-push-queue ixf queue)) (defmethod copy-from ((ixf copy-ixf) &key (kernel nil k-s-p) truncate disable-triggers) - (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 2))) (channel 
(lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*))) - (with-stats-collection ((target ixf) - :dbname (db-name (target-db ixf)) - :state *state* - :summary summary) + (with-stats-collection ((target ixf) :dbname (db-name (target-db ixf))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY \"~a\" from '~a'" (target ixf) (source ixf)) (lp:submit-task channel #'copy-to-queue ixf queue) @@ -103,7 +97,6 @@ (defmethod copy-database ((ixf copy-ixf) &key table-name - state-before data-only schema-only (truncate t) @@ -114,9 +107,7 @@ (reset-sequences t)) "Open the IXF and stream its content to a PostgreSQL database." (declare (ignore create-indexes reset-sequences)) - (let* ((summary (null *state*)) - (*state* (or *state* (make-pgstate))) - (table-name (or table-name + (let* ((table-name (or table-name (target ixf) (source ixf)))) @@ -125,9 +116,7 @@ (handler-case (when (and (or create-tables schema-only) (not data-only)) - (with-stats-collection ("create, truncate" - :state state-before - :summary summary) + (with-stats-collection ("create, truncate" :section :pre) (with-pgsql-transaction (:pgconn (target-db ixf)) (when create-tables (with-schema (tname table-name) @@ -142,10 +131,5 @@ (return-from copy-database))) (unless schema-only - (copy-from ixf :truncate truncate :disable-triggers disable-triggers)) - - ;; and report the total time spent on the operation - (when summary - (report-full-summary "Total streaming time" *state* - :before state-before)))) + (copy-from ixf :truncate truncate :disable-triggers disable-triggers)))) diff --git a/src/sources/mssql/mssql.lisp b/src/sources/mssql/mssql.lisp index 9daee61..0889e9a 100644 --- a/src/sources/mssql/mssql.lisp +++ b/src/sources/mssql/mssql.lisp @@ -37,7 +37,7 @@ table-name))) (row-fn (lambda (row) - (pgstate-incf *state* (target mssql) :read 1) + (update-stats :data (target mssql) :read 1) (funcall process-row-fn row)))) (log-message :debug "~a" sql) 
(handler-case @@ -45,7 +45,7 @@ ((condition #'(lambda (c) (log-message :error "~a" c) - (pgstate-incf *state* (target mssql) :errs 1) + (update-stats :data (target mssql) :errs 1) (invoke-restart 'mssql::use-nil)))) (mssql::map-query-results sql :row-fn row-fn @@ -53,7 +53,7 @@ (condition (e) (progn (log-message :error "~a" e) - (pgstate-incf *state* (target mssql) :errs 1))))))) + (update-stats :data (target mssql) :errs 1))))))) (defmethod copy-to-queue ((mssql copy-mssql) queue) "Copy data from MSSQL table DBNAME.TABLE-NAME into queue DATAQ" @@ -62,19 +62,14 @@ (defmethod copy-from ((mssql copy-mssql) &key (kernel nil k-s-p) truncate disable-triggers) "Connect in parallel to MSSQL and PostgreSQL and stream the data." - (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 2))) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (table-name (target mssql))) ;; we account stats against the target table-name, because that's all we ;; know on the PostgreSQL thread - (with-stats-collection (table-name - :dbname (db-name (target-db mssql)) - :state *state* - :summary summary) + (with-stats-collection (table-name :dbname (db-name (target-db mssql))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" table-name) ;; read data from Mssql @@ -97,7 +92,6 @@ (defun complete-pgsql-database (pgconn all-columns all-fkeys pkeys &key - state data-only foreign-keys reset-sequences) @@ -111,18 +105,17 @@ ;; (when reset-sequences (let ((table-names (mapcar #'car (qualified-table-name-list all-columns)))) - (reset-sequences table-names :pgconn pgconn :state state))) + (reset-sequences table-names :pgconn pgconn))) ;; ;; Turn UNIQUE indexes into PRIMARY KEYS now ;; (with-pgsql-connection (pgconn) - (pgstate-add-table state (db-name pgconn) "Primary Keys") (loop :for sql :in pkeys :when sql 
:do (progn (log-message :notice "~a" sql) - (pgsql-execute-with-timing "Primary Keys" sql state))) + (pgsql-execute-with-timing :post "Primary Keys" sql))) ;; ;; Foreign Key Constraints @@ -132,22 +125,21 @@ ;; and indexes are imported before doing that. ;; (when (and foreign-keys (not data-only)) - (pgstate-add-table state (db-name pgconn) "Foreign Keys") (loop :for (schema . tables) :in all-fkeys :do (loop :for (table-name . fkeys) :in tables :do (loop :for (fk-name . fkey) :in fkeys :for sql := (format-pgsql-create-fkey fkey) :do (progn (log-message :notice "~a;" sql) - (pgsql-execute-with-timing "Foreign Keys" sql state)))))))) + (pgsql-execute-with-timing :post "Foreign Keys" sql)))))))) -(defun fetch-mssql-metadata (mssql &key state including excluding) +(defun fetch-mssql-metadata (mssql &key including excluding) "MS SQL introspection to prepare the migration." (let (all-columns all-indexes all-fkeys) (with-stats-collection ("fetch meta data" :use-result-as-rows t :use-result-as-read t - :state state) + :section :pre) (with-connection (*mssql-db* (source-db mssql)) (setf all-columns (list-all-columns :including including :excluding excluding)) @@ -172,9 +164,6 @@ (defmethod copy-database ((mssql copy-mssql) &key - state-before - state-after - state-indexes (truncate nil) (disable-triggers nil) (data-only nil) @@ -189,19 +178,13 @@ including excluding) "Stream the given MS SQL database down to PostgreSQL." 
- (let* ((summary (null *state*)) - (*state* (or *state* (make-pgstate))) - (idx-state (or state-indexes (make-pgstate))) - (state-before (or state-before (make-pgstate))) - (state-after (or state-after (make-pgstate))) - (cffi:*default-foreign-encoding* encoding) + (let* ((cffi:*default-foreign-encoding* encoding) (copy-kernel (make-kernel 2)) idx-kernel idx-channel) (destructuring-bind (&key all-columns all-indexes all-fkeys pkeys) ;; to prepare the run we need to fetch MS SQL meta-data (fetch-mssql-metadata mssql - :state state-before :including including :excluding excluding) @@ -220,9 +203,7 @@ (handler-case (cond ((and (or create-tables schema-only) (not data-only)) (log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop) - (with-stats-collection ("create, truncate" - :state state-before - :summary summary) + (with-stats-collection ("create, truncate" :section :pre) (with-pgsql-transaction (:pgconn (target-db mssql)) (loop :for (schema . tables) :in all-columns :do (let ((schema (apply-identifier-case schema))) @@ -305,8 +286,7 @@ (create-indexes-in-kernel (target-db mssql) (mapcar #'cdr indexes-with-names) idx-kernel - idx-channel - :state idx-state))))))) + idx-channel))))))) ;; now end the kernels (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) @@ -314,7 +294,7 @@ ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. 
(when (and create-indexes (not data-only)) - (with-stats-collection ("Index Build Completion" :state *state*) + (with-stats-collection ("Index Build Completion" :section :post) (loop for idx in all-indexes do (lp:receive-result idx-channel)))) (lp:end-kernel)) @@ -323,14 +303,6 @@ ;; (complete-pgsql-database (new-pgsql-connection (target-db mssql)) all-columns all-fkeys pkeys - :state state-after :data-only data-only :foreign-keys foreign-keys - :reset-sequences reset-sequences) - - ;; and report the total time spent on the operation - (when summary - (report-full-summary "Total streaming time" *state* - :before state-before - :finally state-after - :parallel idx-state))))) + :reset-sequences reset-sequences)))) diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 2425fe6..76058c2 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -52,18 +52,18 @@ (sql (format nil "SELECT ~{~a~^, ~} FROM `~a`;" cols table-name)) (row-fn (lambda (row) - (pgstate-incf *state* (target mysql) :read 1) + (update-stats :data (target mysql) :read 1) (funcall process-row-fn row)))) (handler-bind ;; avoid trying to fetch the character at end-of-input position... ((babel-encodings:end-of-input-in-character #'(lambda (c) - (pgstate-incf *state* (target mysql) :errs 1) + (update-stats :data (target mysql) :errs 1) (log-message :error "~a" c) (invoke-restart 'qmynd-impl::use-nil))) (babel-encodings:character-decoding-error #'(lambda (c) - (pgstate-incf *state* (target mysql) :errs 1) + (update-stats :data (target mysql) :errs 1) (let ((encoding (babel-encodings:character-coding-error-encoding c)) (position (babel-encodings:character-coding-error-position c)) (character @@ -106,19 +106,14 @@ (defmethod copy-from ((mysql copy-mysql) &key (kernel nil k-s-p) truncate disable-triggers) "Connect in parallel to MySQL and PostgreSQL and stream the data." 
- (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 2))) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (table-name (target mysql))) ;; we account stats against the target table-name, because that's all we ;; know on the PostgreSQL thread - (with-stats-collection (table-name - :dbname (db-name (target-db mysql)) - :state *state* - :summary summary) + (with-stats-collection (table-name :dbname (db-name (target-db mysql))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" table-name) ;; read data from MySQL @@ -150,7 +145,6 @@ all-columns all-indexes all-fkeys materialize-views view-columns &key - state foreign-keys include-drop) "Prepare the target PostgreSQL database: create tables casting datatypes @@ -165,7 +159,7 @@ (length all-columns) (loop for (name . idxs) in all-indexes sum (length idxs))) - (with-stats-collection ("create, drop" :use-result-as-rows t :state state) + (with-stats-collection ("create, drop" :use-result-as-rows t :section :pre) (with-pgsql-transaction (:pgconn pgconn) ;; we need to first drop the Foreign Key Constraints, so that we ;; can DROP TABLE when asked @@ -188,7 +182,6 @@ (defun complete-pgsql-database (pgconn all-columns all-fkeys pkeys table-comments column-comments &key - state data-only foreign-keys reset-sequences) @@ -201,18 +194,17 @@ ;; while CREATE INDEX statements are in flight (avoid locking). 
;; (when reset-sequences - (reset-sequences (mapcar #'car all-columns) :pgconn pgconn :state state)) + (reset-sequences (mapcar #'car all-columns) :pgconn pgconn)) (with-pgsql-connection (pgconn) ;; ;; Turn UNIQUE indexes into PRIMARY KEYS now ;; - (pgstate-add-table state (db-name pgconn) "Primary Keys") (loop :for sql :in pkeys :when sql :do (progn (log-message :notice "~a" sql) - (pgsql-execute-with-timing "Primary Keys" sql state))) + (pgsql-execute-with-timing :post "Primary Keys" sql))) ;; ;; Foreign Key Constraints @@ -222,19 +214,17 @@ ;; and indexes are imported before doing that. ;; (when (and foreign-keys (not data-only)) - (pgstate-add-table state (db-name pgconn) "Foreign Keys") (loop :for (table-name . fkeys) :in all-fkeys :do (loop :for fkey :in fkeys :for sql := (format-pgsql-create-fkey fkey) :do (progn (log-message :notice "~a;" sql) - (pgsql-execute-with-timing "Foreign Keys" sql state))))) + (pgsql-execute-with-timing :post "Foreign Keys" sql))))) ;; ;; And now, comments on tables and columns. ;; (log-message :notice "Comments") - (pgstate-add-table state (db-name pgconn) "Comments") (let* ((quote ;; just something improbably found in a table comment, to use as ;; dollar quoting, and generated at random at that. 
@@ -256,7 +246,7 @@ quote comment quote) :do (progn (log-message :log "~a" sql) - (pgsql-execute-with-timing "Comments" sql state))) + (pgsql-execute-with-timing :post "Comments" sql))) (loop :for (table-name column-name comment) :in column-comments :for sql := (format nil "comment on column ~a.~a is $~a$~a$~a$" @@ -265,11 +255,10 @@ quote comment quote) :do (progn (log-message :notice "~a;" sql) - (pgsql-execute-with-timing "Comments" sql state)))))) + (pgsql-execute-with-timing :post "Comments" sql)))))) (defun fetch-mysql-metadata (mysql &key - state materialize-views only-tables including @@ -282,7 +271,7 @@ (with-stats-collection ("fetch meta data" :use-result-as-rows t :use-result-as-read t - :state state) + :section :pre) (with-connection (*connection* (source-db mysql)) ;; If asked to MATERIALIZE VIEWS, now is the time to create them in ;; MySQL, when given definitions rather than existing view names. @@ -350,9 +339,6 @@ ;;; (defmethod copy-database ((mysql copy-mysql) &key - state-before - state-after - state-indexes (truncate nil) (disable-triggers nil) (data-only nil) @@ -369,12 +355,7 @@ decoding-as materialize-views) "Export MySQL data and Import it into PostgreSQL" - (let* ((summary (null *state*)) - (*state* (or *state* (make-pgstate))) - (idx-state (or state-indexes (make-pgstate))) - (state-before (or state-before (make-pgstate))) - (state-after (or state-after (make-pgstate))) - (copy-kernel (make-kernel 2)) + (let* ((copy-kernel (make-kernel 2)) idx-kernel idx-channel) (destructuring-bind (&key view-columns all-columns @@ -382,7 +363,6 @@ all-fkeys all-indexes pkeys) ;; to prepare the run, we need to fetch MySQL meta-data (fetch-mysql-metadata mysql - :state state-before :materialize-views materialize-views :only-tables only-tables :including including @@ -409,7 +389,6 @@ all-fkeys materialize-views view-columns - :state state-before :foreign-keys foreign-keys :include-drop include-drop)) (t @@ -479,8 +458,7 @@ (alexandria:appendf pkeys 
(create-indexes-in-kernel (target-db mysql) - indexes idx-kernel idx-channel - :state idx-state)))))) + indexes idx-kernel idx-channel)))))) ;; now end the kernels (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) @@ -488,7 +466,7 @@ ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. (when (and create-indexes (not data-only)) - (with-stats-collection ("Index Build Completion" :state *state*) + (with-stats-collection ("Index Build Completion" :section :post) (loop for idx in all-indexes do (lp:receive-result idx-channel)))) (lp:end-kernel)) @@ -505,14 +483,6 @@ (complete-pgsql-database (new-pgsql-connection (target-db mysql)) all-columns all-fkeys pkeys table-comments column-comments - :state state-after :data-only data-only :foreign-keys foreign-keys - :reset-sequences reset-sequences) - - ;; and report the total time spent on the operation - (when summary - (report-full-summary "Total streaming time" *state* - :before state-before - :finally state-after - :parallel idx-state))))) + :reset-sequences reset-sequences)))) diff --git a/src/sources/sqlite/sqlite.lisp b/src/sources/sqlite/sqlite.lisp index e086e03..c0a7308 100644 --- a/src/sources/sqlite/sqlite.lisp +++ b/src/sources/sqlite/sqlite.lisp @@ -106,14 +106,14 @@ v) counting t into rows do (progn - (pgstate-incf *state* (target sqlite) :read 1) + (update-stats :data (target sqlite) :read 1) (funcall process-row-fn row)) finally (sqlite:finalize-statement statement) (return rows)) (condition (e) (log-message :error "~a" e) - (pgstate-incf *state* (target sqlite) :errs 1))))))) + (update-stats :data (target sqlite) :errs 1))))))) (defmethod copy-to-queue ((sqlite copy-sqlite) queue) @@ -123,16 +123,11 @@ (defmethod copy-from ((sqlite copy-sqlite) &key (kernel nil k-s-p) truncate disable-triggers) "Stream the contents from a SQLite database table down to PostgreSQL." 
- (let* ((summary (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* (or kernel (make-kernel 2))) + (let* ((lp:*kernel* (or kernel (make-kernel 2))) (channel (lp:make-channel)) (queue (lq:make-queue :fixed-capacity *concurrent-batches*))) - (with-stats-collection ((target sqlite) - :dbname (db-name (target-db sqlite)) - :state *state* - :summary summary) + (with-stats-collection ((target sqlite) :dbname (db-name (target-db sqlite))) (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" (target sqlite)) ;; read data from SQLite @@ -151,17 +146,13 @@ (log-message :info "COPY ~a done." (target sqlite)) (unless k-s-p (lp:end-kernel))))))) -(defun fetch-sqlite-metadata (sqlite - &key - state - including - excluding) +(defun fetch-sqlite-metadata (sqlite &key including excluding) "SQLite introspection to prepare the migration." (let (all-columns all-indexes) (with-stats-collection ("fetch meta data" :use-result-as-rows t :use-result-as-read t - :state state) + :section :pre) (with-connection (conn (source-db sqlite)) (let ((*sqlite-db* (conn-handle conn))) (setf all-columns (filter-column-list (list-all-columns *sqlite-db*) @@ -182,7 +173,6 @@ (defmethod copy-database ((sqlite copy-sqlite) &key - state-before data-only schema-only (truncate nil) @@ -197,20 +187,12 @@ (encoding :utf-8)) "Stream the given SQLite database down to PostgreSQL." 
(declare (ignore only-tables)) - (let* ((summary (null *state*)) - (*state* (or *state* (make-pgstate))) - (state-before (or state-before (make-pgstate))) - (idx-state (make-pgstate)) - (seq-state (make-pgstate)) - (cffi:*default-foreign-encoding* encoding) + (let* ((cffi:*default-foreign-encoding* encoding) (copy-kernel (make-kernel 2)) idx-kernel idx-channel) (destructuring-bind (&key all-columns all-indexes pkeys) - (fetch-sqlite-metadata sqlite - :state state-before - :including including - :excluding excluding) + (fetch-sqlite-metadata sqlite :including including :excluding excluding) (let ((max-indexes (loop for (table . indexes) in all-indexes @@ -227,9 +209,7 @@ (handler-case (cond ((and (or create-tables schema-only) (not data-only)) (log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop) - (with-stats-collection ("create, truncate" - :state state-before - :summary summary) + (with-stats-collection ("create, truncate" :section :pre) (with-pgsql-transaction (:pgconn (target-db sqlite)) (create-tables all-columns :include-drop include-drop)))) @@ -271,8 +251,7 @@ (alexandria:appendf pkeys (create-indexes-in-kernel (target-db sqlite) indexes - idx-kernel idx-channel - :state idx-state)))))) + idx-kernel idx-channel)))))) ;; now end the kernels (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) @@ -280,7 +259,7 @@ ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. (when (and create-indexes (not data-only)) - (with-stats-collection ("index build completion" :state *state*) + (with-stats-collection ("index build completion" :section :post) (loop for idx in all-indexes do (lp:receive-result idx-channel)))) (lp:end-kernel)) @@ -288,12 +267,5 @@ ;; the data. 
(when reset-sequences (reset-sequences (mapcar #'car all-columns) - :pgconn (target-db sqlite) - :state seq-state)) - - ;; and report the total time spent on the operation - (report-full-summary "Total streaming time" *state* - :before state-before - :finally seq-state - :parallel idx-state)))) + :pgconn (target-db sqlite)))))) diff --git a/src/utils/monitor.lisp b/src/utils/monitor.lisp index b3c3f8d..6f08b1e 100644 --- a/src/utils/monitor.lisp +++ b/src/utils/monitor.lisp @@ -18,8 +18,8 @@ (defvar *monitoring-channel* nil "Internal lparallel channel.") -(defvar *sections* nil - "List of currently monitored activities (per category or concurrency.") +(defvar *sections* '(:pre nil :data nil :post nil) + "plist of load sections: :pre, :data and :post.") ;;; @@ -29,8 +29,9 @@ (defstruct stop stop-logger) (defstruct noop) (defstruct log-message category description arguments) -(defstruct new-label dbname section label) +(defstruct new-label section label dbname) (defstruct update-stats section label read rows errs secs rs ws) +(defstruct bad-row section label condition data) (defun log-message (category description &rest arguments) "Send given message into our monitoring queue for processing." @@ -38,6 +39,59 @@ :description description :arguments arguments))) +(defun new-label (section label &optional dbname) + "Send an event to create a new LABEL for registering a shared state under + SECTION." + (send-event (make-new-label :section section :label label :dbname dbname))) + +(defun update-stats (section label &key read rows errs secs rs ws) + "Send an event to update stats for given SECTION and LABEL." + (send-event (make-update-stats :section section + :label label + :read read + :rows rows + :errs errs + :secs secs + :rs rs + :ws ws))) + +(defun process-bad-row (table-name condition data) + "Send an event to log the bad row DATA in the reject and log files for given + TABLE-NAME (a label in section :data), for reason found in CONDITION." 
+ (send-event (make-bad-row :section :data + :label table-name + :condition condition + :data data))) + +;;; +;;; Easier API to manage statistics collection and state updates +;;; +(defmacro with-stats-collection ((table-name + &key + (section :data) + dbname + use-result-as-read + use-result-as-rows) + &body forms) + "Measure time spent in running BODY into STATE, accounting the seconds to + given DBNAME and TABLE-NAME" + (let ((result (gensym "result")) + (secs (gensym "secs"))) + `(prog2 + (new-label ,section ,table-name ,dbname) + (multiple-value-bind (,result ,secs) + (timing ,@forms) + (cond ((and ,use-result-as-read ,use-result-as-rows) + (update-stats ,section ,table-name + :read ,result :rows ,result :secs ,secs)) + (,use-result-as-read + (update-stats ,section ,table-name :read ,result :secs ,secs)) + (,use-result-as-rows + (update-stats ,section ,table-name :rows ,result :secs ,secs)) + (t + (update-stats ,section ,table-name :secs ,secs))) + ,result)))) + ;;; ;;; Now, the monitor thread management @@ -65,7 +119,9 @@ (*client-min-messages* . ,*client-min-messages*) (*monitoring-queue* . ,*monitoring-queue*) (*error-output* . ,*error-output*) - (*standard-output* . ,*standard-output*))) + (*standard-output* . ,*standard-output*) + (*summary-pathname* . ,*summary-pathname*) + (*sections* . ',*sections*))) (lparallel:*kernel* (lp:make-kernel 1 :bindings bindings)) (*monitoring-channel* (lp:make-channel))) @@ -84,24 +140,29 @@ (defmacro with-monitor ((&key (start-logger t)) &body body) "Start and stop the monitor around BODY code. 
The monitor is responsible for processing logs into a central logfile" - `(if ,start-logger - (let* ((*monitoring-queue* (lq:make-queue)) - (*monitoring-channel* (start-monitor :start-logger ,start-logger))) - (unwind-protect - ,@body - (stop-monitor :channel *monitoring-channel* - :stop-logger ,start-logger))) + `(let ((*sections* (list :pre (make-pgstate) + :data (make-pgstate) + :post (make-pgstate)))) + (if ,start-logger + (let* ((*monitoring-queue* (lq:make-queue)) + (*monitoring-channel* (start-monitor :start-logger ,start-logger))) + (unwind-protect + ,@body + (stop-monitor :channel *monitoring-channel* + :stop-logger ,start-logger))) - ;; logger has already been started - (progn ,@body))) + ;; logger has already been started + (progn ,@body)))) (defun monitor (queue) "Receives and process messages from *monitoring-queue*." ;; process messages from the queue - (loop :for event := (multiple-value-bind (event available) - (lq:try-pop-queue queue) - (if available event (make-noop))) + (loop :with start-time := (get-internal-real-time) + + :for event := (multiple-value-bind (event available) + (lq:try-pop-queue queue) + (if available event (make-noop))) :do (typecase event (start (when (start-start-logger event) @@ -110,7 +171,22 @@ (stop (cl-log:log-message :info "Stopping monitor") - (when (stop-stop-logger event) (pgloader.logs:stop-logger))) + + ;; report the summary now + (let* ((summary-stream (when *summary-pathname* + (open *summary-pathname* + :direction :output + :if-exists :rename + :if-does-not-exist :create))) + (*report-stream* (or summary-stream *standard-output*))) + (report-full-summary "Total import time" + *sections* + (elapsed-time-since start-time)) + (when summary-stream (close summary-stream))) + + ;; time to shut down the logger? 
+ (when (stop-stop-logger event) + (pgloader.logs:stop-logger))) (noop (sleep 0.2)) ; avoid buzy looping @@ -123,6 +199,54 @@ (log-message-description event) (log-message-arguments event)) (log-message-description event)))) - (cl-log:log-message (log-message-category event) "~a" mesg)))) + (cl-log:log-message (log-message-category event) "~a" mesg))) + + (new-label + (let ((label + (pgstate-new-label (getf *sections* (new-label-section event)) + (new-label-label event)))) + + (when (eq :data (new-label-section event)) + (pgtable-initialize-reject-files label + (new-label-dbname event))))) + + (update-stats + ;; it only costs an extra hash table lookup... + (pgstate-new-label (getf *sections* (update-stats-section event)) + (update-stats-label event)) + + (pgstate-incf (getf *sections* (update-stats-section event)) + (update-stats-label event) + :read (update-stats-read event) + :rows (update-stats-rows event) + :secs (update-stats-secs event))) + + (bad-row + (%process-bad-row (bad-row-label event) + (bad-row-condition event) + (bad-row-data event)))) :until (typep event 'stop))) + + +;;; +;;; Internal utils +;;; +(defun elapsed-time-since (start) + "Return how many seconds ticked between START and now" + (let ((now (get-internal-real-time))) + (coerce (/ (- now start) internal-time-units-per-second) 'double-float))) + + +;;; +;;; Timing Macro +;;; +(defmacro timing (&body forms) + "return both how much real time was spend in body and its result" + (let ((start (gensym)) + (end (gensym)) + (result (gensym))) + `(let* ((,start (get-internal-real-time)) + (,result (progn ,@forms)) + (,end (get-internal-real-time))) + (values ,result (/ (- ,end ,start) internal-time-units-per-second))))) diff --git a/src/utils/report.lisp b/src/utils/report.lisp index 5d787d3..f3df033 100644 --- a/src/utils/report.lisp +++ b/src/utils/report.lisp @@ -2,7 +2,7 @@ ;;; Pretty print a report while doing bulk operations ;;; -(in-package :pgloader.utils) +(in-package :pgloader.state) 
(defvar *header-line* "~&~v@{~A~:*~} --------- --------- --------- --------------") @@ -63,6 +63,31 @@ "Return the format string to use for a given TYPE of output and KEY." (getf (cadr (assoc type *header-format-strings*)) key)) +;;; +;;; Timing Formating +;;; +(defun format-interval (seconds &optional (stream t)) + "Output the number of seconds in a human friendly way" + (multiple-value-bind (years months days hours mins secs millisecs) + (date:decode-interval (date:encode-interval :second seconds)) + (declare (ignore millisecs)) + (format + stream + "~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~5,3fs" + (< 0 years) years + (< 0 months) months + (< 0 days) days + (< 0 hours) hours + (< 0 mins) mins + (+ secs (- (multiple-value-bind (r q) + (truncate seconds 60) + (declare (ignore r)) + q) + secs))))) + +;;; +;;; Pretty printing reports in several formats +;;; (defun report-header () ;; (apply #'format *report-stream* *header-cols-format* *header-cols-names*) (format *report-stream* @@ -106,7 +131,7 @@ ;;; ;;; Pretty print the whole summary from a state ;;; -(defun report-summary (&key ((:state pgstate) *state*) (header t) footer) +(defun report-summary (pgstate &key (header t) footer) "Report a whole summary." 
(when header (report-header)) (loop @@ -122,34 +147,6 @@ finally (when footer (report-pgstate-stats pgstate footer)))) -(defmacro with-stats-collection ((table-name - &key - dbname - summary - use-result-as-read - use-result-as-rows - ((:state pgstate) *state*)) - &body forms) - "Measure time spent in running BODY into STATE, accounting the seconds to - given DBNAME and TABLE-NAME" - (let ((result (gensym "result")) - (secs (gensym "secs"))) - `(prog2 - (pgstate-add-table ,pgstate ,dbname ,table-name) - (multiple-value-bind (,result ,secs) - (timing ,@forms) - (cond ((and ,use-result-as-read ,use-result-as-rows) - (pgstate-incf ,pgstate ,table-name - :read ,result :rows ,result :secs ,secs)) - (,use-result-as-read - (pgstate-incf ,pgstate ,table-name :read ,result :secs ,secs)) - (,use-result-as-rows - (pgstate-incf ,pgstate ,table-name :rows ,result :secs ,secs)) - (t - (pgstate-incf ,pgstate ,table-name :secs ,secs))) - ,result) - (when ,summary (report-summary))))) - (defun parse-summary-type (&optional (pathname *summary-pathname*)) "Return the summary type we want: human-readable, csv, json." (when pathname @@ -158,25 +155,30 @@ ((string= "copy" (pathname-type pathname)) :copy) (t :human-readable)))) -(defun report-full-summary (legend state - &key before finally parallel start-time) +(defun max-length-table-name (legend data pre post) + "Compute the max length of a table-name in the legend." + (reduce #'max + (mapcar #'length + (mapcar #'format-table-name + (append (pgstate-tabnames data) + (pgstate-tabnames pre) + (pgstate-tabnames post) + (list legend)))))) + +(defun report-full-summary (legend sections total-secs) "Report the full story when given three different sections of reporting." 
- (let* ((stype (or (parse-summary-type *summary-pathname*) + (let* ((data (getf sections :data)) + (pre (getf sections :pre)) + (post (getf sections :post)) + + (stype (or (parse-summary-type *summary-pathname*) :human-readable)) (*header* (get-format-for stype :header)) (*footer* (get-format-for stype :footer)) (*end-of-line-format* (get-format-for stype :end-of-line-format)) (*header-line* (get-format-for stype :header-line)) - (*max-length-table-name* - (reduce #'max - (mapcar #'length - (mapcar #'format-table-name - (append (pgstate-tabnames state) - (when before (pgstate-tabnames before)) - (when finally (pgstate-tabnames finally)) - (when parallel (pgstate-tabnames parallel)) - (list legend)))))) + (*max-length-table-name* (max-length-table-name legend data pre post)) (*header-tname-format* (get-format-for stype :header-tname-format)) (*header-stats-format* (get-format-for stype :header-stats-format)) (*header-cols-format* (get-format-for stype :header-cols-format)) @@ -185,39 +187,19 @@ (when *header* (format *report-stream* *header*)) - ;; BEFORE - (if before - (progn - (report-summary :state before :footer nil) - (format *report-stream* *header-line* *max-length-table-name* "-") - (report-summary :state state :header nil :footer nil)) - ;; no state before - (report-summary :state state :footer nil)) + (when (and pre (pgstate-tabnames pre)) + (report-summary pre :footer nil) + (format *report-stream* *header-line* *max-length-table-name* "-")) - (when (or finally parallel) + (report-summary data :header (null pre) :footer nil) + + (when (and post (pgstate-tabnames post)) (format *report-stream* *header-line* *max-length-table-name* "-") - (when parallel - (report-summary :state parallel :header nil :footer nil)) - (when finally - (report-summary :state finally :header nil :footer nil))) + (report-summary post :header nil :footer nil)) - ;; add to the grand total the other sections, except for the parallel one - (incf (pgloader.utils::pgstate-secs state) - 
(+ (if before (pgloader.utils::pgstate-secs before) 0) - (if finally (pgloader.utils::pgstate-secs finally) 0))) - - ;; if the parallel tasks took longer than the rest cumulated, the total - ;; waiting time actually was parallel - before - (if start-time - (setf (pgloader.utils::pgstate-secs state) - (pgloader.utils::elapsed-time-since start-time)) - (when (and parallel - (< (pgloader.utils::pgstate-secs state) - (pgloader.utils::pgstate-secs parallel))) - (setf (pgloader.utils::pgstate-secs state) - (- (pgloader.utils::pgstate-secs parallel) - (if before (pgloader.utils::pgstate-secs before) 0))))) + ;; replace the grand total now + (setf (pgstate-secs data) total-secs) ;; and report the Grand Total - (report-pgstate-stats state legend))) + (report-pgstate-stats data legend))) diff --git a/src/utils/state.lisp b/src/utils/state.lisp index eec5afd..46232b7 100644 --- a/src/utils/state.lisp +++ b/src/utils/state.lisp @@ -3,7 +3,7 @@ ;;; the load: number of lines read, imported and number of errors found ;;; along the way. ;;; -(in-package :pgloader.utils) +(in-package :pgloader.state) ;;; ;;; Data Structures to maintain information about loading state @@ -26,51 +26,71 @@ (defun format-table-name (table-name) "TABLE-NAME might be a CONS of a schema name and a table name." - (typecase table-name + (etypecase table-name (cons (format nil "~a.~a" (car table-name) (cdr table-name))) (string table-name))) -(defun pgstate-get-table (pgstate name) +(defun relative-pathname (filename type &optional dbname) + "Return the pathname of a file of type TYPE (dat or log) under *ROOT-DIR*" + (let ((dir (if dbname + (uiop:merge-pathnames* + (uiop:make-pathname* :directory `(:relative ,dbname)) + *root-dir*) + *root-dir*))) + (make-pathname :defaults dir :name filename :type type))) + +(defun reject-data-file (table-name dbname) + "Return the pathname to the reject file for STATE entry." 
+  (relative-pathname table-name "dat" dbname))
+
+(defun reject-log-file (table-name dbname)
+  "Return the pathname to the reject log file for STATE entry."
+  (relative-pathname table-name "log" dbname))
+
+(defmethod pgtable-initialize-reject-files ((table pgtable) dbname)
+  "Prepare TABLE for being able to deal with rejected rows (log them)."
+  (let* ((table-name (format-table-name (pgtable-name table)))
+         (data-pathname (reject-data-file table-name dbname))
+         (logs-pathname (reject-log-file table-name dbname)))
+    ;; we also use that facility for things that are not tables
+    ;; such as "fetch" or "before load" or "Create Indexes"
+    (when dbname
+      ;; create the per-database directory if it does not exist yet
+      (ensure-directories-exist (uiop:pathname-directory-pathname data-pathname))
+
+      ;; rename the existing files if there are some
+      (when (probe-file data-pathname)
+        (with-open-file (data data-pathname
+                              :direction :output
+                              :if-exists :rename
+                              :if-does-not-exist nil)))
+
+      (when (probe-file logs-pathname)
+        (with-open-file (logs logs-pathname
+                              :direction :output
+                              :if-exists :rename
+                              :if-does-not-exist nil)))
+
+      ;; set the properties to the right pathnames
+      (setf (pgtable-reject-data table) data-pathname
+            (pgtable-reject-logs table) logs-pathname))))
+
+(defun pgstate-get-label (pgstate name)
   (gethash name (pgstate-tables pgstate)))
 
-(defun pgstate-add-table (pgstate dbname table-name)
+(defun pgstate-new-label (pgstate label)
   "Instanciate a new pgtable structure to hold our stats, and return it."
- (or (pgstate-get-table pgstate table-name) - (let* ((table (setf (gethash table-name (pgstate-tables pgstate)) - (make-pgtable :name table-name))) - (reject-dir (merge-pathnames (format nil "~a/" dbname) *root-dir*)) - (filename (format-table-name table-name)) - (data-pathname (make-pathname :defaults reject-dir - :name filename :type "dat")) - (logs-pathname (make-pathname :defaults reject-dir - :name filename :type "log"))) + (or (pgstate-get-label pgstate label) + (let* ((pgtable (setf (gethash label (pgstate-tables pgstate)) + (make-pgtable :name label)))) ;; maintain the ordering - (push table-name (pgstate-tabnames pgstate)) + (push label (pgstate-tabnames pgstate)) - ;; create the per-database directory if it does not exists yet - (ensure-directories-exist reject-dir) - - ;; rename the existing files if there are some - (when (probe-file data-pathname) - (with-open-file (data data-pathname - :direction :output - :if-exists :rename - :if-does-not-exist nil))) - - (when (probe-file logs-pathname) - (with-open-file (logs logs-pathname - :direction :output - :if-exists :rename - :if-does-not-exist nil))) - - ;; set the properties to the right pathnames - (setf (pgtable-reject-data table) data-pathname - (pgtable-reject-logs table) logs-pathname) - table))) + pgtable))) (defun pgstate-setf (pgstate name &key read rows errs secs) - (let ((pgtable (pgstate-get-table pgstate name))) + (let ((pgtable (pgstate-get-label pgstate name))) (when read (setf (pgtable-read pgtable) read) (incf (pgstate-read pgstate) read)) @@ -86,7 +106,7 @@ pgtable)) (defun pgstate-incf (pgstate name &key read rows errs secs) - (let ((pgtable (pgstate-get-table pgstate name))) + (let ((pgtable (pgstate-get-label pgstate name))) (when read (incf (pgtable-read pgtable) read) (incf (pgstate-read pgstate) read)) @@ -102,7 +122,7 @@ pgtable)) (defun pgstate-decf (pgstate name &key read rows errs secs) - (let ((pgtable (pgstate-get-table pgstate name))) + (let ((pgtable (pgstate-get-label 
pgstate name))) (when read (decf (pgtable-read pgtable) read) (decf (pgstate-read pgstate) read)) diff --git a/src/utils/threads.lisp b/src/utils/threads.lisp index 60c95a9..fdd8dc9 100644 --- a/src/utils/threads.lisp +++ b/src/utils/threads.lisp @@ -12,7 +12,6 @@ (*copy-batch-size* . ,*copy-batch-size*) (*concurrent-batches* . ,*concurrent-batches*) (*pg-settings* . ',*pg-settings*) - (*state* . ,*state*) (*fd-path-root* . ,*fd-path-root*) (*client-min-messages* . ,*client-min-messages*) (*log-min-messages* . ,*log-min-messages*) diff --git a/src/utils/utils.lisp b/src/utils/utils.lisp index 5774bb2..98b8171 100644 --- a/src/utils/utils.lisp +++ b/src/utils/utils.lisp @@ -3,46 +3,6 @@ ;;; (in-package :pgloader.utils) -;;; -;;; Timing Macro -;;; -(defun elapsed-time-since (start) - "Return how many seconds ticked between START and now" - (let ((now (get-internal-real-time))) - (coerce (/ (- now start) internal-time-units-per-second) 'double-float))) - -(defmacro timing (&body forms) - "return both how much real time was spend in body and its result" - (let ((start (gensym)) - (end (gensym)) - (result (gensym))) - `(let* ((,start (get-internal-real-time)) - (,result (progn ,@forms)) - (,end (get-internal-real-time))) - (values ,result (/ (- ,end ,start) internal-time-units-per-second))))) - -;;; -;;; Timing Formating -;;; -(defun format-interval (seconds &optional (stream t)) - "Output the number of seconds in a human friendly way" - (multiple-value-bind (years months days hours mins secs millisecs) - (date:decode-interval (date:encode-interval :second seconds)) - (declare (ignore millisecs)) - (format - stream - "~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~5,3fs" - (< 0 years) years - (< 0 months) months - (< 0 days) days - (< 0 hours) hours - (< 0 mins) mins - (+ secs (- (multiple-value-bind (r q) - (truncate seconds 60) - (declare (ignore r)) - q) - secs))))) - ;;; ;;; Camel Case converter ;;; diff --git a/test/archive.load 
b/test/archive.load index 90ccc6e..de0f6f5 100644 --- a/test/archive.load +++ b/test/archive.load @@ -59,5 +59,5 @@ LOAD ARCHIVE fields escaped by double-quote, fields terminated by ',' - FINALLY DO - $$ create index blocks_ip4r_idx on geolite.blocks using gist(iprange); $$; + AFTER LOAD DO + $$ create index blocks_ip4r_idx on geolite.blocks using gist(iprange); $$;