Review the stats and reporting code organisation.

In order to later be able to have more worker threads sharing the
load (multiple readers and/or writers, maybe more specialized threads
too), have all the stats be managed centrally by a single thread. We
already have a "monitor" thread that gets passed log messages so that the
output buffer is not subject to race conditions, extend its use to also
deal with statistics messages.

In the current code, we send a message each time we read a row. In some
future commits we should probably reduce the messaging here to something
like one message per batch in the common case.

Also, as a nice side effect of the code simplification and refactoring
this fixes #283 wherein the before/after sections of individual CSV
files within an ARCHIVE command were not counted in the reporting.
This commit is contained in:
Dimitri Fontaine 2015-10-05 01:25:08 +02:00
parent bc9d2d8962
commit 96a33de084
33 changed files with 470 additions and 681 deletions

View File

@ -53,10 +53,14 @@
((:file "charsets")
(:file "threads")
(:file "logs")
(:file "monitor" :depends-on ("logs"))
(:file "utils")
(:file "state")
(:file "report" :depends-on ("state"))
(:file "utils" :depends-on ("charsets" "monitor"))
(:file "reject" :depends-on ("state"))
(:file "report" :depends-on ("state" "utils"))
(:file "monitor" :depends-on ("logs"
"state"
"reject"
"report"))
(:file "archive" :depends-on ("logs"))
;; those are one-package-per-file
@ -178,8 +182,8 @@
((:file "mysql-cast-rules")
(:file "mysql-schema"
:depends-on ("mysql-cast-rules"))
(:file "mysql-csv"
:depends-on ("mysql-schema"))
;; (:file "mysql-csv"
;; :depends-on ("mysql-schema"))
(:file "mysql"
:depends-on ("mysql-cast-rules"
"mysql-schema"))))))

View File

@ -271,7 +271,8 @@
(with-monitor ()
;; tell the user where to look for interesting things
(log-message :log "Main logs in '~a'" (probe-file *log-filename*))
(log-message :log "Main logs in '~a'"
(uiop:native-namestring *log-filename*))
(log-message :log "Data errors in '~a'~%" *root-dir*)
;; load extra lisp code provided for by the user
@ -398,19 +399,7 @@
(parse-commands-from-file source)
(parse-commands source)))))))
;; maybe duplicate the summary to a file
(let* ((summary-stream (when *summary-pathname*
(open *summary-pathname*
:direction :output
:if-exists :rename
:if-does-not-exist :create)))
(*report-stream* (or summary-stream *standard-output*)))
(unwind-protect
;; run the commands
(loop for func in funcs do (funcall func))
;; cleanup
(when summary-stream (close summary-stream)))))))
(loop for func in funcs do (funcall func)))))
;;;

View File

@ -22,32 +22,28 @@
#:start-logger
#:stop-logger))
(defpackage #:pgloader.monitor
(defpackage #:pgloader.state
(:use #:cl #:pgloader.params)
(:export #:with-monitor
#:*monitoring-queue*
#:log-message
#:send-event
#:start-monitor
#:stop-monitor))
(:export #:format-table-name
#:make-pgstate
#:pgstate-tabnames
#:pgstate-tables
#:pgstate-read
#:pgstate-rows
#:pgstate-errs
#:pgstate-secs
(defpackage #:pgloader.utils
(:use #:cl #:pgloader.params)
(:import-from #:alexandria
#:appendf
#:read-file-into-string)
(:import-from #:pgloader.monitor
#:with-monitor
#:*monitoring-queue*
#:log-message)
(:export #:with-monitor ; monitor
#:pgstate-get-label
#:pgstate-new-label
#:pgstate-setf
#:pgstate-incf
#:pgstate-decf
;; bits from alexandria
#:appendf
#:read-file-into-string
;; logs
#:log-message
#:pgtable-initialize-reject-files
#:pgtable-reject-data
#:pgtable-reject-logs
#:report-pgtable-stats
#:report-pgstate-stats
;; report
#:report-header
@ -55,28 +51,53 @@
#:report-results
#:report-footer
#:report-summary
#:report-full-summary
#:with-stats-collection
#:report-full-summary))
(defpackage #:pgloader.monitor
(:use #:cl #:pgloader.params #:pgloader.state)
(:export #:with-monitor
#:*monitoring-queue*
#:log-message
#:new-label
#:update-stats
#:process-bad-row
#:with-stats-collection
#:send-event
#:start-monitor
#:stop-monitor
#:timing))
(defpackage #:pgloader.utils
(:use #:cl #:pgloader.params #:pgloader.monitor #:pgloader.state)
(:import-from #:alexandria
#:appendf
#:read-file-into-string)
(:export #:with-monitor ; monitor
#:*monitoring-queue*
#:with-stats-collection
#:timing
;; bits from alexandria
#:appendf
#:read-file-into-string
;; state
#:make-pgstate
#:format-table-name
#:pgstate-tabnames
;; events
#:log-message
#:new-label
#:update-stats
#:process-bad-row
;; utils
#:format-interval
#:timing
#:camelCase-to-colname
#:unquote
;; state
#:format-table-name
#:make-pgstate
#:pgstate-get-table
#:pgstate-add-table
#:pgstate-setf
#:pgstate-incf
#:pgstate-decf
#:pgtable-reject-data
#:pgtable-reject-logs
#:report-pgtable-stats
#:report-pgstate-stats
;; threads
#:make-kernel
@ -243,17 +264,13 @@
#:reset-sequences))
(defpackage #:pgloader.queue
(:use #:cl #:pgloader.params)
(:import-from #:pgloader.monitor
#:log-message)
(:use #:cl #:pgloader.params #:pgloader.monitor)
(:import-from #:pgloader.pgsql
#:format-vector-row)
(:import-from #:pgloader.sources
#:map-rows
#:transforms
#:target)
(:import-from #:pgloader.utils
#:pgstate-incf)
(:export #:map-push-queue))

View File

@ -21,7 +21,6 @@
#:*copy-batch-size*
#:*concurrent-batches*
#:*pg-settings*
#:*state*
#:*default-tmpdir*
#:init-params-from-environment
#:getenv-default))
@ -68,16 +67,11 @@
(defparameter *dry-run* nil
"Set to non-nil to only run checks about the load setup.")
;; we can't use pgloader.utils:make-pgstate yet because params is compiled
;; first in the asd definition, we just make the symbol a special variable.
(defparameter *state* nil
"State of the current loading.")
(defparameter *fd-path-root* nil
"Where to load files from, when loading from an archive or expanding regexps.")
(defparameter *root-dir*
#+unix (make-pathname :directory "/tmp/pgloader/")
#+unix (uiop:make-pathname* :directory '(:absolute "tmp/pgloader"))
#-unix (uiop:merge-pathnames*
"pgloader/"
(uiop:ensure-directory-pathname (getenv-default "Temp")))

View File

@ -42,32 +42,22 @@
(when (and (or before finally) (null pg-db-conn))
(error "When using a BEFORE LOAD DO or a FINALLY block, you must provide an archive level target database connection."))
`(lambda ()
(let* ((start-irt (get-internal-real-time))
(state-before (pgloader.utils:make-pgstate))
(*state* (pgloader.utils:make-pgstate))
,@(pgsql-connection-bindings pg-db-conn nil)
(state-finally ,(when finally `(pgloader.utils:make-pgstate)))
(let* (,@(pgsql-connection-bindings pg-db-conn nil)
(archive-file
,(destructuring-bind (kind url) source
(ecase kind
(:http `(with-stats-collection
("download" :state state-before)
(pgloader.archive:http-fetch-file ,url)))
(:filename url))))
(*fd-path-root*
(with-stats-collection ("extract" :state state-before)
(pgloader.archive:expand-archive archive-file))))
, (destructuring-bind (kind url) source
(ecase kind
(:http `(with-stats-collection
("download" :section :pre)
(pgloader.archive:http-fetch-file ,url)))
(:filename url))))
(*fd-path-root*
(with-stats-collection ("extract" :section :pre)
(pgloader.archive:expand-archive archive-file))))
(progn
,(sql-code-block pg-db-conn 'state-before before "before load")
,(sql-code-block pg-db-conn :pre before "before load")
;; import from files block
,@(loop for command in commands
collect `(funcall ,command))
,(sql-code-block pg-db-conn 'state-finally finally "finally")
;; reporting
(report-full-summary "Total import time" *state*
:start-time start-irt
:before state-before
:finally state-finally)))))))
,(sql-code-block pg-db-conn :post finally "finally")))))))

View File

@ -115,20 +115,13 @@
gucs before after
((:copy-options options)))
`(lambda ()
(let* ((state-before (pgloader.utils:make-pgstate))
(summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(state-idx ,(when (getf options :drop-indexes)
`(pgloader.utils:make-pgstate)))
(state-after ,(when (or after (getf options :drop-indexes))
`(pgloader.utils:make-pgstate)))
,@(pgsql-connection-bindings pg-db-conn gucs)
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
(source-db (with-stats-collection ("fetch" :state state-before)
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,copy-conn)))))
(progn
,(sql-code-block pg-db-conn 'state-before before "before load")
,(sql-code-block pg-db-conn :pre before "before load")
(let ((truncate ,(getf options :truncate))
(disable-triggers (getf ',options :disable-triggers))
@ -146,21 +139,11 @@
:drop-indexes
:disable-triggers)))))
(pgloader.sources:copy-from source
:state-before state-before
:state-after state-after
:state-indexes state-idx
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
,(sql-code-block pg-db-conn 'state-after after "after load")
;; reporting
(when summary
(report-full-summary "Total import time" *state*
:before state-before
:finally state-after
:parallel state-idx))))))
,(sql-code-block pg-db-conn :post after "after load")))))
(defrule load-copy-file load-copy-file-command
(:lambda (command)

View File

@ -455,20 +455,13 @@
gucs before after
((:csv-options options)))
`(lambda ()
(let* ((state-before (pgloader.utils:make-pgstate))
(summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(state-idx ,(when (getf options :drop-indexes)
`(pgloader.utils:make-pgstate)))
(state-after ,(when (or after (getf options :drop-indexes))
`(pgloader.utils:make-pgstate)))
,@(pgsql-connection-bindings pg-db-conn gucs)
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
(source-db (with-stats-collection ("fetch" :state state-before)
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,csv-conn)))))
(progn
,(sql-code-block pg-db-conn 'state-before before "before load")
,(sql-code-block pg-db-conn :pre before "before load")
(let ((truncate (getf ',options :truncate))
(disable-triggers (getf ',options :disable-triggers))
@ -486,21 +479,11 @@
:drop-indexes
:disable-triggers)))))
(pgloader.sources:copy-from source
:state-before state-before
:state-after state-after
:state-indexes state-idx
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
,(sql-code-block pg-db-conn 'state-after after "after load")
;; reporting
(when summary
(report-full-summary "Total import time" *state*
:before state-before
:finally state-after
:parallel state-idx))))))
,(sql-code-block pg-db-conn :post after "after load")))))
(defrule load-csv-file load-csv-file-command
(:lambda (command)

View File

@ -93,15 +93,11 @@
gucs before after
((:dbf-options options)))
`(lambda ()
(let* ((state-before (pgloader.utils:make-pgstate))
(summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(state-after ,(when after `(pgloader.utils:make-pgstate)))
,@(pgsql-connection-bindings pg-db-conn gucs)
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
,@(identifier-case-binding options)
(table-name ',(pgconn-table-name pg-db-conn))
(source-db (with-stats-collection ("fetch" :state state-before)
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,dbf-db-conn))))
(source
(make-instance 'pgloader.db3:copy-db3
@ -110,19 +106,12 @@
:source-db source-db
:target table-name)))
,(sql-code-block pg-db-conn 'state-before before "before load")
,(sql-code-block pg-db-conn :pre before "before load")
(pgloader.sources:copy-database source
:state-before state-before
,@(remove-batch-control-option options))
,(sql-code-block pg-db-conn 'state-after after "after load")
;; reporting
(when summary
(report-full-summary "Total import time" *state*
:before state-before
:finally state-after)))))
,(sql-code-block pg-db-conn :post after "after load"))))
(defrule load-dbf-file load-dbf-command
(:lambda (command)

View File

@ -123,20 +123,13 @@
gucs before after
((:fixed-options options)))
`(lambda ()
(let* ((state-before (pgloader.utils:make-pgstate))
(summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(state-idx ,(when (getf options :drop-indexes)
`(pgloader.utils:make-pgstate)))
(state-after ,(when (or after (getf options :drop-indexes))
`(pgloader.utils:make-pgstate)))
,@(pgsql-connection-bindings pg-db-conn gucs)
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
(source-db (with-stats-collection ("fetch" :state state-before)
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,fixed-conn)))))
(progn
,(sql-code-block pg-db-conn 'state-before before "before load")
,(sql-code-block pg-db-conn :pre before "before load")
(let ((truncate ,(getf options :truncate))
(disable-triggers ,(getf options :disable-triggers))
@ -152,21 +145,11 @@
:skip-lines ,(or (getf options :skip-line) 0))))
(pgloader.sources:copy-from source
:state-before state-before
:state-after state-after
:state-indexes state-idx
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
,(sql-code-block pg-db-conn 'state-after after "after load")
;; reporting
(when summary
(report-full-summary "Total import time" *state*
:before state-before
:finally state-after
:parallel state-idx))))))
,(sql-code-block pg-db-conn :post after "after load")))))
(defrule load-fixed-cols-file load-fixed-cols-file-command
(:lambda (command)

View File

@ -49,15 +49,11 @@
gucs before after
((:ixf-options options)))
`(lambda ()
(let* ((state-before (pgloader.utils:make-pgstate))
(summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(state-after ,(when after `(pgloader.utils:make-pgstate)))
,@(pgsql-connection-bindings pg-db-conn gucs)
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
,@(identifier-case-binding options)
(table-name ',(pgconn-table-name pg-db-conn))
(source-db (with-stats-collection ("fetch" :state state-before)
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,ixf-db-conn))))
(source
(make-instance 'pgloader.ixf:copy-ixf
@ -65,18 +61,12 @@
:source-db source-db
:target table-name)))
,(sql-code-block pg-db-conn 'state-before before "before load")
,(sql-code-block pg-db-conn :pre before "before load")
(pgloader.sources:copy-database source
:state-before state-before
,@(remove-batch-control-option options))
,(sql-code-block pg-db-conn 'state-after after "after load")
(when summary
(report-full-summary "Total import time" *state*
:before state-before
:finally state-after)))))
,(sql-code-block pg-db-conn :post after "after load"))))
(defrule load-ixf-file load-ixf-command
(:lambda (command)

View File

@ -161,11 +161,7 @@
(let (#+sbcl(sb-ext:*muffled-warnings* 'style-warning))
(cffi:load-foreign-library 'mssql::sybdb))
(let* ((state-before (pgloader.utils:make-pgstate))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(state-idx (pgloader.utils:make-pgstate))
(state-after (pgloader.utils:make-pgstate))
(*default-cast-rules* ',*mssql-default-cast-rules*)
(let* ((*default-cast-rules* ',*mssql-default-cast-rules*)
(*cast-rules* ',casts)
,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
@ -175,22 +171,14 @@
:target-db ,pg-db-conn
:source-db ,ms-db-conn)))
,(sql-code-block pg-db-conn 'state-before before "before load")
,(sql-code-block pg-db-conn :pre before "before load")
(pgloader.mssql:copy-database source
:state-before state-before
:state-after state-after
:state-indexes state-idx
:including ',including
:excluding ',excluding
,@(remove-batch-control-option options))
,(sql-code-block pg-db-conn 'state-after after "after load")
(report-full-summary "Total import time" *state*
:before state-before
:finally state-after
:parallel state-idx))))
,(sql-code-block pg-db-conn :post after "after load"))))
(defrule load-mssql-database load-mssql-command
(:lambda (source)

View File

@ -166,11 +166,7 @@
((:excluding excl))
((:decoding decoding-as)))
`(lambda ()
(let* ((state-before (pgloader.utils:make-pgstate))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(state-idx (pgloader.utils:make-pgstate))
(state-after (pgloader.utils:make-pgstate))
(*default-cast-rules* ',*mysql-default-cast-rules*)
(let* ((*default-cast-rules* ',*mysql-default-cast-rules*)
(*cast-rules* ',casts)
,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
@ -180,24 +176,16 @@
:target-db ,pg-db-conn
:source-db ,my-db-conn)))
,(sql-code-block pg-db-conn 'state-before before "before load")
,(sql-code-block pg-db-conn :pre before "before load")
(pgloader.mysql:copy-database source
:including ',incl
:excluding ',excl
:decoding-as ',decoding-as
:materialize-views ',views
:state-before state-before
:state-after state-after
:state-indexes state-idx
,@(remove-batch-control-option options))
,(sql-code-block pg-db-conn 'state-after after "after load")
(report-full-summary "Total import time" *state*
:before state-before
:finally state-after
:parallel state-idx))))
,(sql-code-block pg-db-conn :post after "after load"))))
(defrule load-mysql-database load-mysql-command
(:lambda (source)

View File

@ -58,12 +58,12 @@
(bind (((_ _ sql-list-of-list) after))
(cons :after (apply #'append sql-list-of-list)))))
(defun sql-code-block (pgconn state commands label)
(defun sql-code-block (pgconn section commands label)
"Return lisp code to run COMMANDS against DBNAME, updating STATE."
(when commands
`(with-stats-collection (,label
:dbname ,(db-name pgconn)
:state ,state
:section ,section
:use-result-as-read t
:use-result-as-rows t)
(with-pgsql-transaction (:pgconn ,pgconn)

View File

@ -101,20 +101,17 @@ load database
((:including incl))
((:excluding excl)))
`(lambda ()
(let* ((state-before (pgloader.utils:make-pgstate))
(*state* (pgloader.utils:make-pgstate))
(*default-cast-rules* ',*sqlite-default-cast-rules*)
(let* ((*default-cast-rules* ',*sqlite-default-cast-rules*)
(*cast-rules* ',casts)
,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
(source-db (with-stats-collection ("fetch" :state state-before)
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,sqlite-db-conn))))
(source
(make-instance 'pgloader.sqlite::copy-sqlite
:target-db ,pg-db-conn
:source-db source-db)))
(pgloader.sqlite:copy-database source
:state-before state-before
:including ',incl
:excluding ',excl
,@(remove-batch-control-option options)))))

View File

@ -45,11 +45,10 @@
&key
columns
(truncate t)
disable-triggers
((:state *state*) *state*))
disable-triggers)
"Fetch from the QUEUE messages containing how many rows are in the
*writer-batch* for us to send down to PostgreSQL, and when that's done
update *state*."
update stats."
(when truncate
(truncate-tables pgconn (list table-name)))
@ -68,44 +67,10 @@
#+sbcl (when oversized? (sb-ext:gc :full t))
(log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]"
unqualified-table-name rows oversized?)
(pgstate-incf *state* table-name :rows rows)))
(update-stats :data table-name :rows rows)))
(when disable-triggers (enable-triggers unqualified-table-name)))))
;;;
;;; When a batch has been refused by PostgreSQL with a data-exception, that
;;; means it contains non-conforming data. Log the error message in a log
;;; file and the erroneous data in a rejected data file for further
;;; processing.
;;;
(defun process-bad-row (table-name condition row)
"Add the row to the reject file, in PostgreSQL COPY TEXT format"
;; first, update the stats.
(pgstate-incf *state* table-name :errs 1)
;; now, the bad row processing
(let* ((table (pgstate-get-table *state* table-name))
(data (pgtable-reject-data table))
(logs (pgtable-reject-logs table)))
;; first log the rejected data
(with-open-file (reject-data-file data
:direction :output
:if-exists :append
:if-does-not-exist :create
:external-format :utf-8)
;; the row has already been processed when we get here
(write-string row reject-data-file))
;; now log the condition signaled to reject the data
(with-open-file (reject-logs-file logs
:direction :output
:if-exists :append
:if-does-not-exist :create
:external-format :utf-8)
;; the row has already been processed when we get here
(format reject-logs-file "~a~%" condition))))
;;;
;;; Compute how many rows we're going to try loading next, depending on
;;; where we are in the batch currently and where is the next-error to be

View File

@ -109,22 +109,23 @@
(log-message :debug set)
(pomo:execute set))))
(defun pgsql-connect-and-execute-with-timing (pgconn label sql state &key (count 1))
(defun pgsql-connect-and-execute-with-timing (pgconn section label sql &key (count 1))
"Run pgsql-execute-with-timing within a newly establised connection."
(with-pgsql-connection (pgconn)
(pomo:with-transaction ()
(pgsql-execute-with-timing label sql state :count count))))
(pgsql-execute-with-timing section label sql :count count))))
(defun pgsql-execute-with-timing (label sql state &key (count 1))
(defun pgsql-execute-with-timing (section label sql &key (count 1))
"Execute given SQL and resgister its timing into STATE."
(multiple-value-bind (res secs)
(timing
(handler-case (pgsql-execute sql)
(handler-case
(pgsql-execute sql)
(cl-postgres:database-error (e)
(log-message :error "~a" e)
(pgstate-incf state label :errs 1 :rows (- count)))))
(update-stats section label :errs 1 :rows (- count)))))
(declare (ignore res))
(pgstate-incf state label :read count :rows count :secs secs)))
(update-stats section label :read count :rows count :secs secs)))
(defun pgsql-execute (sql &key ((:client-min-messages level)))
"Execute given SQL in current transaction"

View File

@ -145,17 +145,17 @@
(defun create-pgsql-fkeys (pgconn all-fkeys
&key
state
(section :post)
(label "Foreign Keys"))
"Actually create the Foreign Key References that where declared in the
MySQL database"
(pgstate-add-table state (db-name pgconn) label)
(new-label section label)
(loop for (table-name . fkeys) in all-fkeys
do (loop for fkey in fkeys
for sql = (format-pgsql-create-fkey fkey)
do
(log-message :notice "~a;" sql)
(pgsql-execute-with-timing "Foreign Keys" sql state))))
(pgsql-execute-with-timing section label sql))))
;;;
@ -341,16 +341,11 @@
;;; Parallel index building.
;;;
(defun create-indexes-in-kernel (pgconn indexes kernel channel
&key
state
(label "Create Indexes"))
&key (label "Create Indexes"))
"Create indexes for given table in dbname, using given lparallel KERNEL
and CHANNEL so that the index build happen in concurrently with the data
copying."
(let* ((lp:*kernel* kernel))
;; ensure we have a stats entry
(pgstate-add-table state (db-name pgconn) label)
(loop
:for index :in indexes
:collect (multiple-value-bind (sql pkey)
@ -362,7 +357,7 @@
#'pgsql-connect-and-execute-with-timing
;; each thread must have its own connection
(new-pgsql-connection pgconn)
label sql state)
:post label sql)
;; return the pkey "upgrade" statement
pkey))))
@ -390,18 +385,18 @@
;;;
;;; Drop indexes before loading
;;;
(defun drop-indexes (state pgsql-index-list)
(defun drop-indexes (section pgsql-index-list)
"Drop indexes in PGSQL-INDEX-LIST. A PostgreSQL connection must already be
active when calling that function."
(loop :for index :in pgsql-index-list
:do (let ((sql (format-pgsql-drop-index index)))
(log-message :notice "~a" sql)
(pgsql-execute-with-timing "drop indexes" sql state))))
(pgsql-execute-with-timing section "drop indexes" sql))))
;;;
;;; Higher level API to care about indexes
;;;
(defun maybe-drop-indexes (target table-name state &key drop-indexes)
(defun maybe-drop-indexes (target table-name &key (section :pre) drop-indexes)
"Drop the indexes for TABLE-NAME on TARGET PostgreSQL connection, and
returns a list of indexes to create again."
(with-pgsql-connection (target)
@ -420,14 +415,13 @@
(indexes
;; drop the indexes now
(with-stats-collection ("drop indexes" :state state)
(drop-indexes state indexes))))
(with-stats-collection ("drop indexes" :section section)
(drop-indexes section indexes))))
;; and return the indexes list
indexes)))
(defun create-indexes-again (target indexes state state-parallel
&key drop-indexes)
(defun create-indexes-again (target indexes &key (section :post) drop-indexes)
"Create the indexes that we dropped previously."
(when (and indexes drop-indexes)
(let* ((*preserve-index-names* t)
@ -438,29 +432,27 @@
(idx-channel (let ((lp:*kernel* idx-kernel))
(lp:make-channel))))
(let ((pkeys
(create-indexes-in-kernel target indexes idx-kernel idx-channel
:state state-parallel)))
(create-indexes-in-kernel target indexes idx-kernel idx-channel)))
(with-stats-collection ("Index Build Completion" :state state)
(with-stats-collection ("Index Build Completion" :section section)
(loop :for idx :in indexes :do (lp:receive-result idx-channel)))
;; turn unique indexes into pkeys now
(with-pgsql-connection (target)
(with-stats-collection ("Constraints" :state state)
(with-stats-collection ("Constraints" :section section)
(loop :for sql :in pkeys
:when sql
:do (progn
(log-message :notice "~a" sql)
(pgsql-execute-with-timing "Constraints" sql state)))))))))
(pgsql-execute-with-timing section "Constraints" sql)))))))))
;;;
;;; Sequences
;;;
(defun reset-sequences (table-names &key pgconn state)
(defun reset-sequences (table-names &key pgconn (section :post))
"Reset all sequences created during this MySQL migration."
(log-message :notice "Reset sequences")
(with-stats-collection ("Reset Sequences"
:dbname (db-name pgconn)
:use-result-as-rows t
:state state)
:section section)
(reset-all-sequences pgconn :tables table-names)))

View File

@ -55,7 +55,7 @@
(condition (e)
(log-message :error "~a" e)
(pgstate-incf *state* (target copy) :errs 1))))
(update-stats :data (target copy) :errs 1))))
(defun map-push-queue (copy queue &optional pre-formatted)
"Apply MAP-ROWS on the COPY instance and a function of ROW that will push

View File

@ -126,7 +126,7 @@
lines we did read in the file."
(let ((projection (project-fields :fields fields :columns columns)))
(lambda (row)
(pgstate-incf *state* target :read 1)
(update-stats :data target :read 1)
;; cl-csv returns (nil) for an empty line
(if (or (null row)
(and (null (car row)) (null (cdr row))))
@ -137,11 +137,8 @@
(handler-case
(funcall projection row)
(condition (e)
(pgstate-incf *state* target :errs 1)
(log-message :error "Could not read line ~d: ~a"
(pgloader.utils::pgtable-read
(pgstate-get-table *state* target))
e)))))
(update-stats :data target :errs 1)
(log-message :error "Could not read input: ~a" e)))))
(when projected-vector
(funcall process-row-fn projected-vector)))))))

View File

@ -93,7 +93,7 @@
(condition (e)
(progn
(log-message :error "~a" e)
(pgstate-incf *state* (target copy) :errs 1))))))))))
(update-stats :data (target copy) :errs 1))))))))))
(defmethod copy-to-queue ((copy copy-copy) queue)
"Copy data from given COPY definition into lparallel.queue DATAQ"
@ -101,27 +101,18 @@
(defmethod copy-from ((copy copy-copy)
&key
state-before
state-after
state-indexes
truncate
disable-triggers
drop-indexes)
"Copy data from given COPY file definition into its PostgreSQL target table."
(let* ((summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(lp:*kernel* (make-kernel 2))
(let* ((lp:*kernel* (make-kernel 2))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(indexes (maybe-drop-indexes (target-db copy)
(target copy)
state-before
:drop-indexes drop-indexes)))
(with-stats-collection ((target copy)
:dbname (db-name (target-db copy))
:state *state*
:summary summary)
(with-stats-collection ((target copy) :dbname (db-name (target-db copy)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" (target copy))
(lp:submit-task channel #'copy-to-queue copy queue)
@ -144,6 +135,6 @@
finally (lp:end-kernel))))
;; re-create the indexes
(create-indexes-again (target-db copy) indexes state-after state-indexes
(create-indexes-again (target-db copy) indexes
:drop-indexes drop-indexes)))

View File

@ -126,7 +126,7 @@
(condition (e)
(progn
(log-message :error "~a" e)
(pgstate-incf *state* (target csv) :errs 1)))))))))
(update-stats :data (target csv) :errs 1)))))))))
(defmethod copy-to-queue ((csv copy-csv) queue)
"Copy data from given CSV definition into lparallel.queue DATAQ"
@ -134,26 +134,18 @@
(defmethod copy-from ((csv copy-csv)
&key
state-before
state-after
state-indexes
truncate
disable-triggers
drop-indexes)
"Copy data from given CSV file definition into its PostgreSQL target table."
(let* ((summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(lp:*kernel* (make-kernel 2))
(let* ((lp:*kernel* (make-kernel 2))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(indexes (maybe-drop-indexes (target-db csv)
(target csv)
state-before
:drop-indexes drop-indexes)))
(with-stats-collection ((target csv)
:dbname (db-name (target-db csv))
:state *state* :summary summary)
(with-stats-collection ((target csv) :dbname (db-name (target-db csv)))
(lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" (target csv))
(lp:submit-task channel #'copy-to-queue csv queue)
@ -176,5 +168,4 @@
finally (lp:end-kernel))))
;; re-create the indexes
(create-indexes-again (target-db csv) indexes state-after state-indexes
:drop-indexes drop-indexes)))
(create-indexes-again (target-db csv) indexes :drop-indexes drop-indexes)))

View File

@ -59,21 +59,15 @@
(defmethod copy-to-queue ((db3 copy-db3) queue)
"Copy data from DB3 file FILENAME into queue DATAQ"
(let ((read (pgloader.queue:map-push-queue db3 queue)))
(pgstate-incf *state* (target db3) :read read)))
(pgloader.queue:map-push-queue db3 queue))
(defmethod copy-from ((db3 copy-db3)
&key (kernel nil k-s-p) truncate disable-triggers)
(let* ((summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(lp:*kernel* (or kernel (make-kernel 2)))
(let* ((lp:*kernel* (or kernel (make-kernel 2)))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*)))
(with-stats-collection ((target db3)
:dbname (db-name (target-db db3))
:state *state*
:summary summary)
(with-stats-collection ((target db3) :dbname (db-name (target-db db3)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY \"~a\" from '~a'" (target db3) (source db3))
(lp:submit-task channel #'copy-to-queue db3 queue)
@ -94,7 +88,6 @@
(defmethod copy-database ((db3 copy-db3)
&key
table-name
state-before
data-only
schema-only
(truncate t)
@ -105,9 +98,7 @@
(reset-sequences t))
"Open the DB3 and stream its content to a PostgreSQL database."
(declare (ignore create-indexes reset-sequences))
(let* ((summary (null *state*))
(*state* (or *state* (make-pgstate)))
(table-name (or table-name
(let* ((table-name (or table-name
(target db3)
(source db3))))
@ -116,9 +107,7 @@
(handler-case
(when (and (or create-tables schema-only) (not data-only))
(with-stats-collection ("create, truncate"
:state state-before
:summary summary)
(with-stats-collection ("create, truncate" :section :pre)
(with-pgsql-transaction (:pgconn (target-db db3))
(when create-tables
(with-schema (tname table-name)
@ -133,9 +122,4 @@
(return-from copy-database)))
(unless schema-only
(copy-from db3 :truncate truncate :disable-triggers disable-triggers))
;; and report the total time spent on the operation
(when summary
(report-full-summary "Total streaming time" *state*
:before state-before))))
(copy-from db3 :truncate truncate :disable-triggers disable-triggers))))

View File

@ -79,7 +79,7 @@
(condition (e)
(progn
(log-message :error "~a" e)
(pgstate-incf *state* (target fixed) :errs 1))))))))))
(update-stats :data (target fixed) :errs 1))))))))))
(defmethod copy-to-queue ((fixed copy-fixed) queue)
"Copy data from given FIXED definition into lparallel.queue DATAQ"
@ -87,27 +87,18 @@
(defmethod copy-from ((fixed copy-fixed)
&key
state-before
state-after
state-indexes
truncate
disable-triggers
drop-indexes)
"Copy data from given FIXED file definition into its PostgreSQL target table."
(let* ((summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(lp:*kernel* (make-kernel 2))
(let* ((lp:*kernel* (make-kernel 2))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(indexes (maybe-drop-indexes (target-db fixed)
(target fixed)
state-before
:drop-indexes drop-indexes)))
(with-stats-collection ((target fixed)
:dbname (db-name (target-db fixed))
:state *state*
:summary summary)
(with-stats-collection ((target fixed) :dbname (db-name (target-db fixed)))
(lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" (target fixed))
(lp:submit-task channel #'copy-to-queue fixed queue)
@ -130,6 +121,5 @@
finally (lp:end-kernel))))
;; re-create the indexes
(create-indexes-again (target-db fixed) indexes state-after state-indexes
:drop-indexes drop-indexes)))
(create-indexes-again (target-db fixed) indexes :drop-indexes drop-indexes)))

View File

@ -61,28 +61,22 @@
(with-connection (conn (source-db copy-ixf))
(let ((ixf (ixf:make-ixf-file :stream (conn-handle conn)))
(row-fn (lambda (row)
(pgstate-incf *state* (target copy-ixf) :read 1)
(update-stats :data (target copy-ixf) :read 1)
(funcall process-row-fn row))))
(ixf:read-headers ixf)
(ixf:map-data ixf row-fn))))
(defmethod copy-to-queue ((ixf copy-ixf) queue)
"Copy data from IXF file FILENAME into queue DATAQ"
(let ((read (pgloader.queue:map-push-queue ixf queue)))
(pgstate-incf *state* (target ixf) :read read)))
(pgloader.queue:map-push-queue ixf queue))
(defmethod copy-from ((ixf copy-ixf)
&key (kernel nil k-s-p) truncate disable-triggers)
(let* ((summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(lp:*kernel* (or kernel (make-kernel 2)))
(let* ((lp:*kernel* (or kernel (make-kernel 2)))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*)))
(with-stats-collection ((target ixf)
:dbname (db-name (target-db ixf))
:state *state*
:summary summary)
(with-stats-collection ((target ixf) :dbname (db-name (target-db ixf)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY \"~a\" from '~a'" (target ixf) (source ixf))
(lp:submit-task channel #'copy-to-queue ixf queue)
@ -103,7 +97,6 @@
(defmethod copy-database ((ixf copy-ixf)
&key
table-name
state-before
data-only
schema-only
(truncate t)
@ -114,9 +107,7 @@
(reset-sequences t))
"Open the IXF and stream its content to a PostgreSQL database."
(declare (ignore create-indexes reset-sequences))
(let* ((summary (null *state*))
(*state* (or *state* (make-pgstate)))
(table-name (or table-name
(let* ((table-name (or table-name
(target ixf)
(source ixf))))
@ -125,9 +116,7 @@
(handler-case
(when (and (or create-tables schema-only) (not data-only))
(with-stats-collection ("create, truncate"
:state state-before
:summary summary)
(with-stats-collection ("create, truncate" :section :pre)
(with-pgsql-transaction (:pgconn (target-db ixf))
(when create-tables
(with-schema (tname table-name)
@ -142,10 +131,5 @@
(return-from copy-database)))
(unless schema-only
(copy-from ixf :truncate truncate :disable-triggers disable-triggers))
;; and report the total time spent on the operation
(when summary
(report-full-summary "Total streaming time" *state*
:before state-before))))
(copy-from ixf :truncate truncate :disable-triggers disable-triggers))))

View File

@ -37,7 +37,7 @@
table-name)))
(row-fn
(lambda (row)
(pgstate-incf *state* (target mssql) :read 1)
(update-stats :data (target mssql) :read 1)
(funcall process-row-fn row))))
(log-message :debug "~a" sql)
(handler-case
@ -45,7 +45,7 @@
((condition
#'(lambda (c)
(log-message :error "~a" c)
(pgstate-incf *state* (target mssql) :errs 1)
(update-stats :data (target mssql) :errs 1)
(invoke-restart 'mssql::use-nil))))
(mssql::map-query-results sql
:row-fn row-fn
@ -53,7 +53,7 @@
(condition (e)
(progn
(log-message :error "~a" e)
(pgstate-incf *state* (target mssql) :errs 1)))))))
(update-stats :data (target mssql) :errs 1)))))))
(defmethod copy-to-queue ((mssql copy-mssql) queue)
"Copy data from MSSQL table DBNAME.TABLE-NAME into queue DATAQ"
@ -62,19 +62,14 @@
(defmethod copy-from ((mssql copy-mssql)
&key (kernel nil k-s-p) truncate disable-triggers)
"Connect in parallel to MSSQL and PostgreSQL and stream the data."
(let* ((summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(lp:*kernel* (or kernel (make-kernel 2)))
(let* ((lp:*kernel* (or kernel (make-kernel 2)))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(table-name (target mssql)))
;; we account stats against the target table-name, because that's all we
;; know on the PostgreSQL thread
(with-stats-collection (table-name
:dbname (db-name (target-db mssql))
:state *state*
:summary summary)
(with-stats-collection (table-name :dbname (db-name (target-db mssql)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" table-name)
;; read data from Mssql
@ -97,7 +92,6 @@
(defun complete-pgsql-database (pgconn all-columns all-fkeys pkeys
&key
state
data-only
foreign-keys
reset-sequences)
@ -111,18 +105,17 @@
;;
(when reset-sequences
(let ((table-names (mapcar #'car (qualified-table-name-list all-columns))))
(reset-sequences table-names :pgconn pgconn :state state)))
(reset-sequences table-names :pgconn pgconn)))
;;
;; Turn UNIQUE indexes into PRIMARY KEYS now
;;
(with-pgsql-connection (pgconn)
(pgstate-add-table state (db-name pgconn) "Primary Keys")
(loop :for sql :in pkeys
:when sql
:do (progn
(log-message :notice "~a" sql)
(pgsql-execute-with-timing "Primary Keys" sql state)))
(pgsql-execute-with-timing :post "Primary Keys" sql)))
;;
;; Foreign Key Constraints
@ -132,22 +125,21 @@
;; and indexes are imported before doing that.
;;
(when (and foreign-keys (not data-only))
(pgstate-add-table state (db-name pgconn) "Foreign Keys")
(loop :for (schema . tables) :in all-fkeys
:do (loop :for (table-name . fkeys) :in tables
:do (loop :for (fk-name . fkey) :in fkeys
:for sql := (format-pgsql-create-fkey fkey)
:do (progn
(log-message :notice "~a;" sql)
(pgsql-execute-with-timing "Foreign Keys" sql state))))))))
(pgsql-execute-with-timing :post "Foreign Keys" sql))))))))
(defun fetch-mssql-metadata (mssql &key state including excluding)
(defun fetch-mssql-metadata (mssql &key including excluding)
"MS SQL introspection to prepare the migration."
(let (all-columns all-indexes all-fkeys)
(with-stats-collection ("fetch meta data"
:use-result-as-rows t
:use-result-as-read t
:state state)
:section :pre)
(with-connection (*mssql-db* (source-db mssql))
(setf all-columns (list-all-columns :including including
:excluding excluding))
@ -172,9 +164,6 @@
(defmethod copy-database ((mssql copy-mssql)
&key
state-before
state-after
state-indexes
(truncate nil)
(disable-triggers nil)
(data-only nil)
@ -189,19 +178,13 @@
including
excluding)
"Stream the given MS SQL database down to PostgreSQL."
(let* ((summary (null *state*))
(*state* (or *state* (make-pgstate)))
(idx-state (or state-indexes (make-pgstate)))
(state-before (or state-before (make-pgstate)))
(state-after (or state-after (make-pgstate)))
(cffi:*default-foreign-encoding* encoding)
(let* ((cffi:*default-foreign-encoding* encoding)
(copy-kernel (make-kernel 2))
idx-kernel idx-channel)
(destructuring-bind (&key all-columns all-indexes all-fkeys pkeys)
;; to prepare the run we need to fetch MS SQL meta-data
(fetch-mssql-metadata mssql
:state state-before
:including including
:excluding excluding)
@ -220,9 +203,7 @@
(handler-case
(cond ((and (or create-tables schema-only) (not data-only))
(log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop)
(with-stats-collection ("create, truncate"
:state state-before
:summary summary)
(with-stats-collection ("create, truncate" :section :pre)
(with-pgsql-transaction (:pgconn (target-db mssql))
(loop :for (schema . tables) :in all-columns
:do (let ((schema (apply-identifier-case schema)))
@ -305,8 +286,7 @@
(create-indexes-in-kernel (target-db mssql)
(mapcar #'cdr indexes-with-names)
idx-kernel
idx-channel
:state idx-state)))))))
idx-channel)))))))
;; now end the kernels
(let ((lp:*kernel* copy-kernel)) (lp:end-kernel))
@ -314,7 +294,7 @@
;; wait until the indexes are done being built...
;; don't forget accounting for that waiting time.
(when (and create-indexes (not data-only))
(with-stats-collection ("Index Build Completion" :state *state*)
(with-stats-collection ("Index Build Completion" :section :post)
(loop for idx in all-indexes do (lp:receive-result idx-channel))))
(lp:end-kernel))
@ -323,14 +303,6 @@
;;
(complete-pgsql-database (new-pgsql-connection (target-db mssql))
all-columns all-fkeys pkeys
:state state-after
:data-only data-only
:foreign-keys foreign-keys
:reset-sequences reset-sequences)
;; and report the total time spent on the operation
(when summary
(report-full-summary "Total streaming time" *state*
:before state-before
:finally state-after
:parallel idx-state)))))
:reset-sequences reset-sequences))))

View File

@ -52,18 +52,18 @@
(sql (format nil "SELECT ~{~a~^, ~} FROM `~a`;" cols table-name))
(row-fn
(lambda (row)
(pgstate-incf *state* (target mysql) :read 1)
(update-stats :data (target mysql) :read 1)
(funcall process-row-fn row))))
(handler-bind
;; avoid trying to fetch the character at end-of-input position...
((babel-encodings:end-of-input-in-character
#'(lambda (c)
(pgstate-incf *state* (target mysql) :errs 1)
(update-stats :data (target mysql) :errs 1)
(log-message :error "~a" c)
(invoke-restart 'qmynd-impl::use-nil)))
(babel-encodings:character-decoding-error
#'(lambda (c)
(pgstate-incf *state* (target mysql) :errs 1)
(update-stats :data (target mysql) :errs 1)
(let ((encoding (babel-encodings:character-coding-error-encoding c))
(position (babel-encodings:character-coding-error-position c))
(character
@ -106,19 +106,14 @@
(defmethod copy-from ((mysql copy-mysql)
&key (kernel nil k-s-p) truncate disable-triggers)
"Connect in parallel to MySQL and PostgreSQL and stream the data."
(let* ((summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(lp:*kernel* (or kernel (make-kernel 2)))
(let* ((lp:*kernel* (or kernel (make-kernel 2)))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
(table-name (target mysql)))
;; we account stats against the target table-name, because that's all we
;; know on the PostgreSQL thread
(with-stats-collection (table-name
:dbname (db-name (target-db mysql))
:state *state*
:summary summary)
(with-stats-collection (table-name :dbname (db-name (target-db mysql)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" table-name)
;; read data from MySQL
@ -150,7 +145,6 @@
all-columns all-indexes all-fkeys
materialize-views view-columns
&key
state
foreign-keys
include-drop)
"Prepare the target PostgreSQL database: create tables casting datatypes
@ -165,7 +159,7 @@
(length all-columns)
(loop for (name . idxs) in all-indexes sum (length idxs)))
(with-stats-collection ("create, drop" :use-result-as-rows t :state state)
(with-stats-collection ("create, drop" :use-result-as-rows t :section :pre)
(with-pgsql-transaction (:pgconn pgconn)
;; we need to first drop the Foreign Key Constraints, so that we
;; can DROP TABLE when asked
@ -188,7 +182,6 @@
(defun complete-pgsql-database (pgconn all-columns all-fkeys pkeys
table-comments column-comments
&key
state
data-only
foreign-keys
reset-sequences)
@ -201,18 +194,17 @@
;; while CREATE INDEX statements are in flight (avoid locking).
;;
(when reset-sequences
(reset-sequences (mapcar #'car all-columns) :pgconn pgconn :state state))
(reset-sequences (mapcar #'car all-columns) :pgconn pgconn))
(with-pgsql-connection (pgconn)
;;
;; Turn UNIQUE indexes into PRIMARY KEYS now
;;
(pgstate-add-table state (db-name pgconn) "Primary Keys")
(loop :for sql :in pkeys
:when sql
:do (progn
(log-message :notice "~a" sql)
(pgsql-execute-with-timing "Primary Keys" sql state)))
(pgsql-execute-with-timing :post "Primary Keys" sql)))
;;
;; Foreign Key Constraints
@ -222,19 +214,17 @@
;; and indexes are imported before doing that.
;;
(when (and foreign-keys (not data-only))
(pgstate-add-table state (db-name pgconn) "Foreign Keys")
(loop :for (table-name . fkeys) :in all-fkeys
:do (loop :for fkey :in fkeys
:for sql := (format-pgsql-create-fkey fkey)
:do (progn
(log-message :notice "~a;" sql)
(pgsql-execute-with-timing "Foreign Keys" sql state)))))
(pgsql-execute-with-timing :post "Foreign Keys" sql)))))
;;
;; And now, comments on tables and columns.
;;
(log-message :notice "Comments")
(pgstate-add-table state (db-name pgconn) "Comments")
(let* ((quote
;; just something improbably found in a table comment, to use as
;; dollar quoting, and generated at random at that.
@ -256,7 +246,7 @@
quote comment quote)
:do (progn
(log-message :log "~a" sql)
(pgsql-execute-with-timing "Comments" sql state)))
(pgsql-execute-with-timing :post "Comments" sql)))
(loop :for (table-name column-name comment) :in column-comments
:for sql := (format nil "comment on column ~a.~a is $~a$~a$~a$"
@ -265,11 +255,10 @@
quote comment quote)
:do (progn
(log-message :notice "~a;" sql)
(pgsql-execute-with-timing "Comments" sql state))))))
(pgsql-execute-with-timing :post "Comments" sql))))))
(defun fetch-mysql-metadata (mysql
&key
state
materialize-views
only-tables
including
@ -282,7 +271,7 @@
(with-stats-collection ("fetch meta data"
:use-result-as-rows t
:use-result-as-read t
:state state)
:section :pre)
(with-connection (*connection* (source-db mysql))
;; If asked to MATERIALIZE VIEWS, now is the time to create them in
;; MySQL, when given definitions rather than existing view names.
@ -350,9 +339,6 @@
;;;
(defmethod copy-database ((mysql copy-mysql)
&key
state-before
state-after
state-indexes
(truncate nil)
(disable-triggers nil)
(data-only nil)
@ -369,12 +355,7 @@
decoding-as
materialize-views)
"Export MySQL data and Import it into PostgreSQL"
(let* ((summary (null *state*))
(*state* (or *state* (make-pgstate)))
(idx-state (or state-indexes (make-pgstate)))
(state-before (or state-before (make-pgstate)))
(state-after (or state-after (make-pgstate)))
(copy-kernel (make-kernel 2))
(let* ((copy-kernel (make-kernel 2))
idx-kernel idx-channel)
(destructuring-bind (&key view-columns all-columns
@ -382,7 +363,6 @@
all-fkeys all-indexes pkeys)
;; to prepare the run, we need to fetch MySQL meta-data
(fetch-mysql-metadata mysql
:state state-before
:materialize-views materialize-views
:only-tables only-tables
:including including
@ -409,7 +389,6 @@
all-fkeys
materialize-views
view-columns
:state state-before
:foreign-keys foreign-keys
:include-drop include-drop))
(t
@ -479,8 +458,7 @@
(alexandria:appendf
pkeys
(create-indexes-in-kernel (target-db mysql)
indexes idx-kernel idx-channel
:state idx-state))))))
indexes idx-kernel idx-channel))))))
;; now end the kernels
(let ((lp:*kernel* copy-kernel)) (lp:end-kernel))
@ -488,7 +466,7 @@
;; wait until the indexes are done being built...
;; don't forget accounting for that waiting time.
(when (and create-indexes (not data-only))
(with-stats-collection ("Index Build Completion" :state *state*)
(with-stats-collection ("Index Build Completion" :section :post)
(loop for idx in all-indexes do (lp:receive-result idx-channel))))
(lp:end-kernel))
@ -505,14 +483,6 @@
(complete-pgsql-database (new-pgsql-connection (target-db mysql))
all-columns all-fkeys pkeys
table-comments column-comments
:state state-after
:data-only data-only
:foreign-keys foreign-keys
:reset-sequences reset-sequences)
;; and report the total time spent on the operation
(when summary
(report-full-summary "Total streaming time" *state*
:before state-before
:finally state-after
:parallel idx-state)))))
:reset-sequences reset-sequences))))

View File

@ -106,14 +106,14 @@
v)
counting t into rows
do (progn
(pgstate-incf *state* (target sqlite) :read 1)
(update-stats :data (target sqlite) :read 1)
(funcall process-row-fn row))
finally
(sqlite:finalize-statement statement)
(return rows))
(condition (e)
(log-message :error "~a" e)
(pgstate-incf *state* (target sqlite) :errs 1)))))))
(update-stats :data (target sqlite) :errs 1)))))))
(defmethod copy-to-queue ((sqlite copy-sqlite) queue)
@ -123,16 +123,11 @@
(defmethod copy-from ((sqlite copy-sqlite)
&key (kernel nil k-s-p) truncate disable-triggers)
"Stream the contents from a SQLite database table down to PostgreSQL."
(let* ((summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(lp:*kernel* (or kernel (make-kernel 2)))
(let* ((lp:*kernel* (or kernel (make-kernel 2)))
(channel (lp:make-channel))
(queue (lq:make-queue :fixed-capacity *concurrent-batches*)))
(with-stats-collection ((target sqlite)
:dbname (db-name (target-db sqlite))
:state *state*
:summary summary)
(with-stats-collection ((target sqlite) :dbname (db-name (target-db sqlite)))
(lp:task-handler-bind ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" (target sqlite))
;; read data from SQLite
@ -151,17 +146,13 @@
(log-message :info "COPY ~a done." (target sqlite))
(unless k-s-p (lp:end-kernel)))))))
(defun fetch-sqlite-metadata (sqlite
&key
state
including
excluding)
(defun fetch-sqlite-metadata (sqlite &key including excluding)
"SQLite introspection to prepare the migration."
(let (all-columns all-indexes)
(with-stats-collection ("fetch meta data"
:use-result-as-rows t
:use-result-as-read t
:state state)
:section :pre)
(with-connection (conn (source-db sqlite))
(let ((*sqlite-db* (conn-handle conn)))
(setf all-columns (filter-column-list (list-all-columns *sqlite-db*)
@ -182,7 +173,6 @@
(defmethod copy-database ((sqlite copy-sqlite)
&key
state-before
data-only
schema-only
(truncate nil)
@ -197,20 +187,12 @@
(encoding :utf-8))
"Stream the given SQLite database down to PostgreSQL."
(declare (ignore only-tables))
(let* ((summary (null *state*))
(*state* (or *state* (make-pgstate)))
(state-before (or state-before (make-pgstate)))
(idx-state (make-pgstate))
(seq-state (make-pgstate))
(cffi:*default-foreign-encoding* encoding)
(let* ((cffi:*default-foreign-encoding* encoding)
(copy-kernel (make-kernel 2))
idx-kernel idx-channel)
(destructuring-bind (&key all-columns all-indexes pkeys)
(fetch-sqlite-metadata sqlite
:state state-before
:including including
:excluding excluding)
(fetch-sqlite-metadata sqlite :including including :excluding excluding)
(let ((max-indexes
(loop for (table . indexes) in all-indexes
@ -227,9 +209,7 @@
(handler-case
(cond ((and (or create-tables schema-only) (not data-only))
(log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop)
(with-stats-collection ("create, truncate"
:state state-before
:summary summary)
(with-stats-collection ("create, truncate" :section :pre)
(with-pgsql-transaction (:pgconn (target-db sqlite))
(create-tables all-columns :include-drop include-drop))))
@ -271,8 +251,7 @@
(alexandria:appendf
pkeys
(create-indexes-in-kernel (target-db sqlite) indexes
idx-kernel idx-channel
:state idx-state))))))
idx-kernel idx-channel))))))
;; now end the kernels
(let ((lp:*kernel* copy-kernel)) (lp:end-kernel))
@ -280,7 +259,7 @@
;; wait until the indexes are done being built...
;; don't forget accounting for that waiting time.
(when (and create-indexes (not data-only))
(with-stats-collection ("index build completion" :state *state*)
(with-stats-collection ("index build completion" :section :post)
(loop for idx in all-indexes do (lp:receive-result idx-channel))))
(lp:end-kernel))
@ -288,12 +267,5 @@
;; the data.
(when reset-sequences
(reset-sequences (mapcar #'car all-columns)
:pgconn (target-db sqlite)
:state seq-state))
;; and report the total time spent on the operation
(report-full-summary "Total streaming time" *state*
:before state-before
:finally seq-state
:parallel idx-state))))
:pgconn (target-db sqlite))))))

View File

@ -18,8 +18,8 @@
(defvar *monitoring-channel* nil
"Internal lparallel channel.")
(defvar *sections* nil
"List of currently monitored activities (per category or concurrency.")
(defvar *sections* '(:pre nil :data nil :post nil)
"plist of load sections: :pre, :data and :post.")
;;;
@ -29,8 +29,9 @@
(defstruct stop stop-logger)
(defstruct noop)
(defstruct log-message category description arguments)
(defstruct new-label dbname section label)
(defstruct new-label section label dbname)
(defstruct update-stats section label read rows errs secs rs ws)
(defstruct bad-row section label condition data)
(defun log-message (category description &rest arguments)
"Send given message into our monitoring queue for processing."
@ -38,6 +39,59 @@
:description description
:arguments arguments)))
(defun new-label (section label &optional dbname)
  "Post a NEW-LABEL event to the monitor thread so that a stats entry for
   LABEL gets registered under the given SECTION (and DBNAME, when given)."
  (let ((event (make-new-label :section section
                               :label   label
                               :dbname  dbname)))
    (send-event event)))
(defun update-stats (section label &key read rows errs secs rs ws)
  "Post an UPDATE-STATS event to the monitor thread for the stats entry
   LABEL in SECTION, carrying any of the counters READ, ROWS, ERRS, SECS,
   RS and WS (NIL counters are left untouched)."
  (send-event
   (make-update-stats :section section :label label
                      :read read :rows rows :errs errs
                      :secs secs :rs rs :ws ws)))
(defun process-bad-row (table-name condition data)
  "Post a BAD-ROW event to the monitor thread so that the rejected row DATA
   for TABLE-NAME (a label in the :data section) gets logged in the reject
   and log files, along with the reason found in CONDITION."
  (let ((event (make-bad-row :section   :data
                             :label     table-name
                             :condition condition
                             :data      data)))
    (send-event event)))
;;;
;;; Easier API to manage statistics collection and state updates
;;;
(defmacro with-stats-collection ((table-name
                                  &key
                                    (section :data)
                                    dbname
                                    use-result-as-read
                                    use-result-as-rows)
                                 &body forms)
  "Run FORMS, measuring the time spent, and account the result to the stats
   entry TABLE-NAME in SECTION (and DBNAME, when given) by sending events to
   the monitor thread.  When USE-RESULT-AS-READ and/or USE-RESULT-AS-ROWS
   are true, the primary value of FORMS is also accounted as the :read
   and/or :rows counters.  Returns the primary value of FORMS.

   TABLE-NAME and SECTION are evaluated exactly once (the previous version
   spliced them several times into the expansion)."
  (let ((label  (gensym "label"))
        (sec    (gensym "section"))
        (result (gensym "result"))
        (secs   (gensym "secs")))
    `(let ((,label ,table-name)
           (,sec   ,section))
       (new-label ,sec ,label ,dbname)
       (multiple-value-bind (,result ,secs)
           (timing ,@forms)
         (cond ((and ,use-result-as-read ,use-result-as-rows)
                (update-stats ,sec ,label
                              :read ,result :rows ,result :secs ,secs))
               (,use-result-as-read
                (update-stats ,sec ,label :read ,result :secs ,secs))
               (,use-result-as-rows
                (update-stats ,sec ,label :rows ,result :secs ,secs))
               (t
                (update-stats ,sec ,label :secs ,secs)))
         ,result))))
;;;
;;; Now, the monitor thread management
@ -65,7 +119,9 @@
(*client-min-messages* . ,*client-min-messages*)
(*monitoring-queue* . ,*monitoring-queue*)
(*error-output* . ,*error-output*)
(*standard-output* . ,*standard-output*)))
(*standard-output* . ,*standard-output*)
(*summary-pathname* . ,*summary-pathname*)
(*sections* . ',*sections*)))
(lparallel:*kernel* (lp:make-kernel 1 :bindings bindings))
(*monitoring-channel* (lp:make-channel)))
@ -84,24 +140,29 @@
(defmacro with-monitor ((&key (start-logger t)) &body body)
"Start and stop the monitor around BODY code. The monitor is responsible
for processing logs into a central logfile"
`(if ,start-logger
(let* ((*monitoring-queue* (lq:make-queue))
(*monitoring-channel* (start-monitor :start-logger ,start-logger)))
(unwind-protect
,@body
(stop-monitor :channel *monitoring-channel*
:stop-logger ,start-logger)))
`(let ((*sections* (list :pre (make-pgstate)
:data (make-pgstate)
:post (make-pgstate))))
(if ,start-logger
(let* ((*monitoring-queue* (lq:make-queue))
(*monitoring-channel* (start-monitor :start-logger ,start-logger)))
(unwind-protect
,@body
(stop-monitor :channel *monitoring-channel*
:stop-logger ,start-logger)))
;; logger has already been started
(progn ,@body)))
;; logger has already been started
(progn ,@body))))
(defun monitor (queue)
"Receives and process messages from *monitoring-queue*."
;; process messages from the queue
(loop :for event := (multiple-value-bind (event available)
(lq:try-pop-queue queue)
(if available event (make-noop)))
(loop :with start-time := (get-internal-real-time)
:for event := (multiple-value-bind (event available)
(lq:try-pop-queue queue)
(if available event (make-noop)))
:do (typecase event
(start
(when (start-start-logger event)
@ -110,7 +171,22 @@
(stop
(cl-log:log-message :info "Stopping monitor")
(when (stop-stop-logger event) (pgloader.logs:stop-logger)))
;; report the summary now
(let* ((summary-stream (when *summary-pathname*
(open *summary-pathname*
:direction :output
:if-exists :rename
:if-does-not-exist :create)))
(*report-stream* (or summary-stream *standard-output*)))
(report-full-summary "Total import time"
*sections*
(elapsed-time-since start-time))
(when summary-stream (close summary-stream)))
;; time to shut down the logger?
(when (stop-stop-logger event)
(pgloader.logs:stop-logger)))
(noop
(sleep 0.2)) ; avoid busy looping
@ -123,6 +199,54 @@
(log-message-description event)
(log-message-arguments event))
(log-message-description event))))
(cl-log:log-message (log-message-category event) "~a" mesg))))
(cl-log:log-message (log-message-category event) "~a" mesg)))
(new-label
(let ((label
(pgstate-new-label (getf *sections* (new-label-section event))
(new-label-label event))))
(when (eq :data (new-label-section event))
(pgtable-initialize-reject-files label
(new-label-dbname event)))))
(update-stats
;; it only costs an extra hash table lookup...
(pgstate-new-label (getf *sections* (update-stats-section event))
(update-stats-label event))
(pgstate-incf (getf *sections* (update-stats-section event))
(update-stats-label event)
:read (update-stats-read event)
:rows (update-stats-rows event)
:secs (update-stats-secs event)))
(bad-row
(%process-bad-row (bad-row-label event)
(bad-row-condition event)
(bad-row-data event))))
:until (typep event 'stop)))
;;;
;;; Internal utils
;;;
(defun elapsed-time-since (start)
  "Number of wall-clock seconds elapsed between START (an internal real
   time, as returned by GET-INTERNAL-REAL-TIME) and now, as a DOUBLE-FLOAT."
  (let ((ticks (- (get-internal-real-time) start)))
    (coerce (/ ticks internal-time-units-per-second) 'double-float)))
;;;
;;; Timing Macro
;;;
(defmacro timing (&body forms)
  "Run FORMS and return two values: the primary value of FORMS, and the
   wall-clock time spent running them, in seconds (a rational)."
  (let ((start-var (gensym "start"))
        (value-var (gensym "value")))
    ;; the end time is sampled after FORMS, via the LET* ordering below
    `(let* ((,start-var (get-internal-real-time))
            (,value-var (progn ,@forms)))
       (values ,value-var
               (/ (- (get-internal-real-time) ,start-var)
                  internal-time-units-per-second)))))

View File

@ -2,7 +2,7 @@
;;; Pretty print a report while doing bulk operations
;;;
(in-package :pgloader.utils)
(in-package :pgloader.state)
(defvar *header-line*
"~&~v@{~A~:*~} --------- --------- --------- --------------")
@ -63,6 +63,31 @@
"Return the format string to use for a given TYPE of output and KEY."
(getf (cadr (assoc type *header-format-strings*)) key))
;;;
;;; Timing Formating
;;;
(defun format-interval (seconds &optional (stream t))
  "Output the given number of SECONDS in a human friendly way, omitting the
   larger units (years, months, days, hours, minutes) when they are zero,
   e.g. \"1h25m3.500s\".  STREAM is passed to FORMAT, so NIL returns the
   string."
  (multiple-value-bind (years months days hours mins secs millisecs)
      (date:decode-interval (date:encode-interval :second seconds))
    ;; SECS and MILLISECS are superseded by the remainder computed below,
    ;; which keeps the fractional part of SECONDS within the minute.
    (declare (ignore secs millisecs))
    (format
     stream
     "~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~5,3fs"
     (< 0 years)  years
     (< 0 months) months
     (< 0 days)   days
     (< 0 hours)  hours
     (< 0 mins)   mins
     ;; the previous (+ secs (- <remainder> secs)) was an identity for the
     ;; TRUNCATE remainder: just use SECONDS modulo one minute directly
     (nth-value 1 (truncate seconds 60)))))
;;;
;;; Pretty printing reports in several formats
;;;
(defun report-header ()
;; (apply #'format *report-stream* *header-cols-format* *header-cols-names*)
(format *report-stream*
@ -106,7 +131,7 @@
;;;
;;; Pretty print the whole summary from a state
;;;
(defun report-summary (&key ((:state pgstate) *state*) (header t) footer)
(defun report-summary (pgstate &key (header t) footer)
"Report a whole summary."
(when header (report-header))
(loop
@ -122,34 +147,6 @@
finally (when footer
(report-pgstate-stats pgstate footer))))
(defmacro with-stats-collection ((table-name
&key
dbname
summary
use-result-as-read
use-result-as-rows
((:state pgstate) *state*))
&body forms)
"Measure time spent in running BODY into STATE, accounting the seconds to
given DBNAME and TABLE-NAME"
(let ((result (gensym "result"))
(secs (gensym "secs")))
`(prog2
(pgstate-add-table ,pgstate ,dbname ,table-name)
(multiple-value-bind (,result ,secs)
(timing ,@forms)
(cond ((and ,use-result-as-read ,use-result-as-rows)
(pgstate-incf ,pgstate ,table-name
:read ,result :rows ,result :secs ,secs))
(,use-result-as-read
(pgstate-incf ,pgstate ,table-name :read ,result :secs ,secs))
(,use-result-as-rows
(pgstate-incf ,pgstate ,table-name :rows ,result :secs ,secs))
(t
(pgstate-incf ,pgstate ,table-name :secs ,secs)))
,result)
(when ,summary (report-summary)))))
(defun parse-summary-type (&optional (pathname *summary-pathname*))
"Return the summary type we want: human-readable, csv, json."
(when pathname
@ -158,25 +155,30 @@
((string= "copy" (pathname-type pathname)) :copy)
(t :human-readable))))
(defun report-full-summary (legend state
&key before finally parallel start-time)
(defun max-length-table-name (legend data pre post)
  "Compute the length of the longest table name to be reported, considering
   LEGEND itself and the labels registered in the DATA, PRE and POST
   sections.  Any of the section pgstates may be NIL (the module-level
   default for *sections* has NIL entries), in which case it contributes no
   names."
  (let ((names (append (when data (pgstate-tabnames data))
                       (when pre  (pgstate-tabnames pre))
                       (when post (pgstate-tabnames post))
                       ;; LEGEND guarantees the list is never empty
                       (list legend))))
    (reduce #'max
            (mapcar (lambda (name) (length (format-table-name name)))
                    names))))
(defun report-full-summary (legend sections total-secs)
"Report the full story when given three different sections of reporting."
(let* ((stype (or (parse-summary-type *summary-pathname*)
(let* ((data (getf sections :data))
(pre (getf sections :pre))
(post (getf sections :post))
(stype (or (parse-summary-type *summary-pathname*)
:human-readable))
(*header* (get-format-for stype :header))
(*footer* (get-format-for stype :footer))
(*end-of-line-format* (get-format-for stype :end-of-line-format))
(*header-line* (get-format-for stype :header-line))
(*max-length-table-name*
(reduce #'max
(mapcar #'length
(mapcar #'format-table-name
(append (pgstate-tabnames state)
(when before (pgstate-tabnames before))
(when finally (pgstate-tabnames finally))
(when parallel (pgstate-tabnames parallel))
(list legend))))))
(*max-length-table-name* (max-length-table-name legend data pre post))
(*header-tname-format* (get-format-for stype :header-tname-format))
(*header-stats-format* (get-format-for stype :header-stats-format))
(*header-cols-format* (get-format-for stype :header-cols-format))
@ -185,39 +187,19 @@
(when *header*
(format *report-stream* *header*))
;; BEFORE
(if before
(progn
(report-summary :state before :footer nil)
(format *report-stream* *header-line* *max-length-table-name* "-")
(report-summary :state state :header nil :footer nil))
;; no state before
(report-summary :state state :footer nil))
(when (and pre (pgstate-tabnames pre))
(report-summary pre :footer nil)
(format *report-stream* *header-line* *max-length-table-name* "-"))
(when (or finally parallel)
(report-summary data :header (null pre) :footer nil)
(when (and post (pgstate-tabnames post))
(format *report-stream* *header-line* *max-length-table-name* "-")
(when parallel
(report-summary :state parallel :header nil :footer nil))
(when finally
(report-summary :state finally :header nil :footer nil)))
(report-summary post :header nil :footer nil))
;; add to the grand total the other sections, except for the parallel one
(incf (pgloader.utils::pgstate-secs state)
(+ (if before (pgloader.utils::pgstate-secs before) 0)
(if finally (pgloader.utils::pgstate-secs finally) 0)))
;; if the parallel tasks took longer than the rest cumulated, the total
;; waiting time actually was parallel - before
(if start-time
(setf (pgloader.utils::pgstate-secs state)
(pgloader.utils::elapsed-time-since start-time))
(when (and parallel
(< (pgloader.utils::pgstate-secs state)
(pgloader.utils::pgstate-secs parallel)))
(setf (pgloader.utils::pgstate-secs state)
(- (pgloader.utils::pgstate-secs parallel)
(if before (pgloader.utils::pgstate-secs before) 0)))))
;; replace the grand total now
(setf (pgstate-secs data) total-secs)
;; and report the Grand Total
(report-pgstate-stats state legend)))
(report-pgstate-stats data legend)))

View File

@ -3,7 +3,7 @@
;;; the load: number of lines read, imported and number of errors found
;;; along the way.
;;;
(in-package :pgloader.utils)
(in-package :pgloader.state)
;;;
;;; Data Structures to maintain information about loading state
@ -26,51 +26,71 @@
(defun format-table-name (table-name)
  "TABLE-NAME might be a CONS of a schema name and a table name.
   Return a single \"schema.table\" string when given a cons, or
   TABLE-NAME itself when it already is a string.  Using ETYPECASE (rather
   than TYPECASE) signals an error for any other type instead of silently
   returning NIL."
  (etypecase table-name
    (cons   (format nil "~a.~a" (car table-name) (cdr table-name)))
    (string table-name)))
(defun pgstate-get-table (pgstate name)
(defun relative-pathname (filename type &optional dbname)
  "Return the pathname of a file of type TYPE (dat or log) under *ROOT-DIR*"
  (let ((base *root-dir*))
    (when dbname
      ;; nest the file under a per-database sub-directory of *ROOT-DIR*
      (setf base (uiop:merge-pathnames*
                  (uiop:make-pathname* :directory `(:relative ,dbname))
                  *root-dir*)))
    (make-pathname :name filename :type type :defaults base)))
(defun reject-data-file (table-name dbname)
  "Return the pathname of the \".dat\" file where rejected rows for
   TABLE-NAME are stored, under the per-database directory for DBNAME."
  (relative-pathname table-name "dat" dbname))
(defun reject-log-file (table-name dbname)
  "Return the pathname of the \".log\" file where the reasons for rejecting
   rows of TABLE-NAME are logged, under the per-database directory for
   DBNAME."
  (relative-pathname table-name "log" dbname))
(defmethod pgtable-initialize-reject-files ((table pgtable) dbname)
  "Prepare TABLE for being able to deal with rejected rows (log them).

   Compute the reject data (.dat) and reject log (.log) pathnames for
   TABLE under DBNAME, create the target directory, rotate any existing
   files out of the way, and store both pathnames in the pgtable slots
   PGTABLE-REJECT-DATA and PGTABLE-REJECT-LOGS.  When DBNAME is NIL,
   nothing is done and the slots are left untouched."
  (let* ((table-name (format-table-name (pgtable-name table)))
         (data-pathname (reject-data-file table-name dbname))
         (logs-pathname (reject-log-file table-name dbname)))
    ;; we also use that facility for things that are not tables
    ;; such as "fetch" or "before load" or "Create Indexes"
    (when dbname
      ;; create the per-database directory if it does not exists yet
      (ensure-directories-exist (uiop:pathname-directory-pathname data-pathname))
      ;; rename the existing files if there are some
      ;; (:if-exists :rename makes OPEN rename the old file rather than
      ;; clobber it; the streams themselves are opened then closed empty)
      (when (probe-file data-pathname)
        (with-open-file (data data-pathname
                              :direction :output
                              :if-exists :rename
                              :if-does-not-exist nil)))
      (when (probe-file logs-pathname)
        (with-open-file (logs logs-pathname
                              :direction :output
                              :if-exists :rename
                              :if-does-not-exist nil)))
      ;; set the properties to the right pathnames
      (setf (pgtable-reject-data table) data-pathname
            (pgtable-reject-logs table) logs-pathname))))
(defun pgstate-get-label (pgstate name)
(gethash name (pgstate-tables pgstate)))
(defun pgstate-add-table (pgstate dbname table-name)
(defun pgstate-new-label (pgstate label)
"Instanciate a new pgtable structure to hold our stats, and return it."
(or (pgstate-get-table pgstate table-name)
(let* ((table (setf (gethash table-name (pgstate-tables pgstate))
(make-pgtable :name table-name)))
(reject-dir (merge-pathnames (format nil "~a/" dbname) *root-dir*))
(filename (format-table-name table-name))
(data-pathname (make-pathname :defaults reject-dir
:name filename :type "dat"))
(logs-pathname (make-pathname :defaults reject-dir
:name filename :type "log")))
(or (pgstate-get-label pgstate label)
(let* ((pgtable (setf (gethash label (pgstate-tables pgstate))
(make-pgtable :name label))))
;; maintain the ordering
(push table-name (pgstate-tabnames pgstate))
(push label (pgstate-tabnames pgstate))
;; create the per-database directory if it does not exists yet
(ensure-directories-exist reject-dir)
;; rename the existing files if there are some
(when (probe-file data-pathname)
(with-open-file (data data-pathname
:direction :output
:if-exists :rename
:if-does-not-exist nil)))
(when (probe-file logs-pathname)
(with-open-file (logs logs-pathname
:direction :output
:if-exists :rename
:if-does-not-exist nil)))
;; set the properties to the right pathnames
(setf (pgtable-reject-data table) data-pathname
(pgtable-reject-logs table) logs-pathname)
table)))
pgtable)))
(defun pgstate-setf (pgstate name &key read rows errs secs)
(let ((pgtable (pgstate-get-table pgstate name)))
(let ((pgtable (pgstate-get-label pgstate name)))
(when read
(setf (pgtable-read pgtable) read)
(incf (pgstate-read pgstate) read))
@ -86,7 +106,7 @@
pgtable))
(defun pgstate-incf (pgstate name &key read rows errs secs)
(let ((pgtable (pgstate-get-table pgstate name)))
(let ((pgtable (pgstate-get-label pgstate name)))
(when read
(incf (pgtable-read pgtable) read)
(incf (pgstate-read pgstate) read))
@ -102,7 +122,7 @@
pgtable))
(defun pgstate-decf (pgstate name &key read rows errs secs)
(let ((pgtable (pgstate-get-table pgstate name)))
(let ((pgtable (pgstate-get-label pgstate name)))
(when read
(decf (pgtable-read pgtable) read)
(decf (pgstate-read pgstate) read))

View File

@ -12,7 +12,6 @@
(*copy-batch-size* . ,*copy-batch-size*)
(*concurrent-batches* . ,*concurrent-batches*)
(*pg-settings* . ',*pg-settings*)
(*state* . ,*state*)
(*fd-path-root* . ,*fd-path-root*)
(*client-min-messages* . ,*client-min-messages*)
(*log-min-messages* . ,*log-min-messages*)

View File

@ -3,46 +3,6 @@
;;;
(in-package :pgloader.utils)
;;;
;;; Timing Macro
;;;
(defun elapsed-time-since (start)
  "Return how many seconds ticked between START and now"
  ;; internal real time is an integer tick count; divide by the tick
  ;; frequency and coerce the resulting rational to a double-float
  (coerce (/ (- (get-internal-real-time) start)
             internal-time-units-per-second)
          'double-float))
(defmacro timing (&body forms)
  "return both how much real time was spend in body and its result"
  (let ((t0  (gensym "START"))
        (t1  (gensym "END"))
        (res (gensym "RESULT")))
    ;; evaluation order in LET* guarantees t0 is sampled before FORMS run
    ;; and t1 right after they return
    `(let* ((,t0  (get-internal-real-time))
            (,res (progn ,@forms))
            (,t1  (get-internal-real-time)))
       (values ,res
               (/ (- ,t1 ,t0) internal-time-units-per-second)))))
;;;
;;; Timing Formating
;;;
(defun format-interval (seconds &optional (stream t))
  "Output the number of seconds in a human friendly way

   SECONDS is decoded via the DATE package into calendar components; each
   non-zero unit is printed with its suffix (years/months/days/h/m) and the
   seconds part is always printed with millisecond precision.  STREAM is
   passed straight to FORMAT, so T means *standard-output* and NIL means
   return the string."
  (multiple-value-bind (years months days hours mins secs millisecs)
      (date:decode-interval (date:encode-interval :second seconds))
    (declare (ignore millisecs))
    (format
     stream
     ;; each ~:[~*~;...~] conditional prints a unit only when the matching
     ;; (< 0 unit) argument is true, otherwise ~* skips the unit's value
     "~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~5,3fs"
     (< 0 years) years
     (< 0 months) months
     (< 0 days) days
     (< 0 hours) hours
     (< 0 mins) mins
     ;; secs + ((rem seconds 60) - secs): the second value of
     ;; (truncate seconds 60) is the sub-minute remainder including any
     ;; fractional part, so the whole expression reduces to (rem seconds 60)
     (+ secs (- (multiple-value-bind (r q)
                    (truncate seconds 60)
                  (declare (ignore r))
                  q)
                secs)))))
;;;
;;; Camel Case converter
;;;

View File

@ -59,5 +59,5 @@ LOAD ARCHIVE
fields escaped by double-quote,
fields terminated by ','
FINALLY DO
$$ create index blocks_ip4r_idx on geolite.blocks using gist(iprange); $$;
AFTER LOAD DO
$$ create index blocks_ip4r_idx on geolite.blocks using gist(iprange); $$;