Review and clean up the logging monitor thread.

Due to errors in regression testing when using CCL, review this part of
pgloader. It turns out that cl-log:stop-messenger on a text-stream-messenger
closes the stream, which isn't a good idea when given *standard-output*.

At least it makes CCL choke when it then wants to output something of its
own, such as when running in --batch mode (which is nice because it outputs
more diagnostic information).

To solve that problem, initialize the text-stream-messenger with a broadcast
stream made from *standard-output*, which we now may close at will.
This commit is contained in:
Dimitri Fontaine 2017-09-08 23:03:41 +02:00
parent e7f6505d7d
commit ebf9f7a6a9
5 changed files with 324 additions and 303 deletions

View File

@ -1,254 +1,256 @@
;;;; pgloader.asd
(asdf:defsystem #:pgloader
:serial t
:description "Load data into PostgreSQL"
:author "Dimitri Fontaine <dimitri@2ndQuadrant.fr>"
:license "The PostgreSQL Licence"
:depends-on (#:uiop ; host system integration
#:cl-log ; logging
#:postmodern ; PostgreSQL protocol implementation
#:cl-postgres ; low level bits for COPY streaming
#:simple-date ; FIXME: recheck dependency
#:qmynd ; MySQL protocol implemenation
#:split-sequence ; some parsing is made easy
#:cl-csv ; full CSV reader
#:cl-fad ; file and directories
#:lparallel ; threads, workers, queues
#:esrap ; parser generator
#:alexandria ; utils
#:drakma ; http client, download archives
#:flexi-streams ; streams
#:usocket ; UDP / syslog
#:local-time ; UDP date parsing
#:command-line-arguments ; for the main function
#:abnf ; ABNF parser generator (for syslog)
#:db3 ; DBF version 3 file reader
#:ixf ; IBM IXF file format reader
#:py-configparser ; Read old-style INI config files
#:sqlite ; Query a SQLite file
#:cl-base64 ; Decode base64 data
#:trivial-backtrace ; For --debug cli usage
#:cl-markdown ; To produce the website
#:metabang-bind ; the bind macro
#:mssql ; M$ SQL connectivity
#:uuid ; Transforming MS SQL unique identifiers
#:quri ; decode URI parameters
#:cl-ppcre ; Perl Compatible Regular Expressions
#:cl-mustache ; Logic-less templates
#:yason ; JSON routines
#:closer-mop ; introspection
)
:components
((:module "src"
:components
((:file "params")
(:file "package" :depends-on ("params"))
:serial t
:description "Load data into PostgreSQL"
:author "Dimitri Fontaine <dimitri@2ndQuadrant.fr>"
:license "The PostgreSQL Licence"
:depends-on (#:uiop ; host system integration
#:cl-log ; logging
#:postmodern ; PostgreSQL protocol implementation
#:cl-postgres ; low level bits for COPY streaming
#:simple-date ; FIXME: recheck dependency
#:qmynd ; MySQL protocol implemenation
#:split-sequence ; some parsing is made easy
#:cl-csv ; full CSV reader
#:cl-fad ; file and directories
#:lparallel ; threads, workers, queues
#:esrap ; parser generator
#:alexandria ; utils
#:drakma ; http client, download archives
#:flexi-streams ; streams
#:usocket ; UDP / syslog
#:local-time ; UDP date parsing
#:command-line-arguments ; for the main function
#:abnf ; ABNF parser generator (for syslog)
#:db3 ; DBF version 3 file reader
#:ixf ; IBM IXF file format reader
#:py-configparser ; Read old-style INI config files
#:sqlite ; Query a SQLite file
#:cl-base64 ; Decode base64 data
#:trivial-backtrace ; For --debug cli usage
#:cl-markdown ; To produce the website
#:metabang-bind ; the bind macro
#:mssql ; M$ SQL connectivity
#:uuid ; Transforming MS SQL unique identifiers
#:quri ; decode URI parameters
#:cl-ppcre ; Perl Compatible Regular Expressions
#:cl-mustache ; Logic-less templates
#:yason ; JSON routines
#:closer-mop ; introspection
)
:components
((:module "src"
:components
((:file "params")
(:file "package" :depends-on ("params"))
(:module "monkey"
:components
((:file "bind")
(:file "mssql")))
(:module "monkey"
:components
((:file "bind")
(:file "mssql")))
(:module "utils"
:depends-on ("package" "params")
:components
((:file "charsets")
(:file "batch")
(:file "logs")
(:file "utils")
(:file "state")
(:module "utils"
:depends-on ("package" "params")
:components
((:file "charsets")
(:file "batch")
(:file "logs")
(:file "utils")
(:file "state")
;; user defined transforms package and pgloader
;; provided ones
(:file "transforms")
;; user defined transforms package and pgloader
;; provided ones
(:file "transforms")
;; PostgreSQL related utils
(:file "read-sql-files")
(:file "queries")
(:file "quoting")
(:file "catalog" :depends-on ("quoting"))
(:file "alter-table" :depends-on ("catalog"))
;; PostgreSQL related utils
(:file "read-sql-files")
(:file "queries")
(:file "quoting")
(:file "catalog" :depends-on ("quoting"))
(:file "alter-table" :depends-on ("catalog"))
;; State, monitoring, reporting
(:file "reject" :depends-on ("state"))
(:file "pretty-print-state" :depends-on ("state"))
(:file "report" :depends-on ("state"
"pretty-print-state"
"utils"
"catalog"))
(:file "monitor" :depends-on ("logs"
"state"
"reject"
"report"))
(:file "threads" :depends-on ("monitor"))
(:file "archive" :depends-on ("logs"))
;; State, monitoring, reporting
(:file "reject" :depends-on ("state"))
(:file "pretty-print-state" :depends-on ("state"))
(:file "report" :depends-on ("state"
"pretty-print-state"
"utils"
"catalog"))
(:file "monitor" :depends-on ("logs"
"state"
"reject"
"report"))
(:file "threads" :depends-on ("monitor"))
(:file "archive" :depends-on ("monitor"))
;; generic connection api
(:file "connection" :depends-on ("archive"))))
;; generic connection api
(:file "connection" :depends-on ("monitor"
"archive"))))
;; package pgloader.pgsql
(:module pgsql
:depends-on ("package" "params" "utils")
:serial t
:components
((:file "copy-format")
(:file "connection")
(:file "pgsql-ddl")
(:file "pgsql-schema")
(:file "merge-catalogs" :depends-on ("pgsql-schema"))
(:file "pgsql-trigger")
(:file "pgsql-index-filter")
(:file "pgsql-create-schema" :depends-on ("pgsql-trigger"))
(:file "retry-batch")
(:file "copy-from-queue"
:depends-on ("copy-format"
"connection"
"retry-batch"
"pgsql-create-schema"
"pgsql-schema"))))
;; package pgloader.pgsql
(:module pgsql
:depends-on ("package" "params" "utils")
:serial t
:components
((:file "copy-format")
(:file "connection")
(:file "pgsql-ddl")
(:file "pgsql-schema")
(:file "merge-catalogs" :depends-on ("pgsql-schema"))
(:file "pgsql-trigger")
(:file "pgsql-index-filter")
(:file "pgsql-create-schema" :depends-on ("pgsql-trigger"))
(:file "retry-batch")
(:file "copy-from-queue"
:depends-on ("copy-format"
"connection"
"retry-batch"
"pgsql-create-schema"
"pgsql-schema"))))
;; Source format specific implementations
(:module sources
:depends-on ("monkey" ; mssql driver patches
"params"
"package"
"pgsql"
"utils")
:components
((:module "common"
:components
((:file "api")
(:file "methods" :depends-on ("api"))
(:file "md-methods" :depends-on ("api"))
(:file "db-methods" :depends-on ("api"))
(:file "casting-rules")
(:file "files-and-pathnames")
(:file "project-fields")))
;; Source format specific implementations
(:module sources
:depends-on ("monkey" ; mssql driver patches
"params"
"package"
"pgsql"
"utils")
:components
((:module "common"
:components
((:file "api")
(:file "methods" :depends-on ("api"))
(:file "md-methods" :depends-on ("api"))
(:file "db-methods" :depends-on ("api"))
(:file "casting-rules")
(:file "files-and-pathnames")
(:file "project-fields")))
(:module "csv"
:depends-on ("common")
:components
((:file "csv-guess")
;; (:file "csv-database")
(:file "csv")))
(:module "csv"
:depends-on ("common")
:components
((:file "csv-guess")
;; (:file "csv-database")
(:file "csv")))
(:file "fixed"
:depends-on ("common" "csv"))
(:file "fixed"
:depends-on ("common" "csv"))
(:file "copy"
:depends-on ("common" "csv"))
(:file "copy"
:depends-on ("common" "csv"))
(:module "db3"
:depends-on ("common" "csv")
:components
((:file "db3-schema")
(:file "db3" :depends-on ("db3-schema"))))
(:module "db3"
:depends-on ("common" "csv")
:components
((:file "db3-schema")
(:file "db3" :depends-on ("db3-schema"))))
(:module "ixf"
:depends-on ("common")
:components
((:file "ixf-schema")
(:file "ixf" :depends-on ("ixf-schema"))))
(:module "ixf"
:depends-on ("common")
:components
((:file "ixf-schema")
(:file "ixf" :depends-on ("ixf-schema"))))
;(:file "syslog") ; experimental...
;(:file "syslog") ; experimental...
(:module "sqlite"
:depends-on ("common")
:components
((:file "sqlite-cast-rules")
(:file "sqlite-schema"
:depends-on ("sqlite-cast-rules"))
(:file "sqlite"
:depends-on ("sqlite-cast-rules"
"sqlite-schema"))))
(:module "sqlite"
:depends-on ("common")
:components
((:file "sqlite-cast-rules")
(:file "sqlite-schema"
:depends-on ("sqlite-cast-rules"))
(:file "sqlite"
:depends-on ("sqlite-cast-rules"
"sqlite-schema"))))
(:module "mssql"
:depends-on ("common")
:components
((:file "mssql-cast-rules")
(:file "mssql-schema"
:depends-on ("mssql-cast-rules"))
(:file "mssql"
:depends-on ("mssql-cast-rules"
"mssql-schema"))
(:file "mssql-index-filters"
:depends-on ("mssql"))))
(:module "mssql"
:depends-on ("common")
:components
((:file "mssql-cast-rules")
(:file "mssql-schema"
:depends-on ("mssql-cast-rules"))
(:file "mssql"
:depends-on ("mssql-cast-rules"
"mssql-schema"))
(:file "mssql-index-filters"
:depends-on ("mssql"))))
(:module "mysql"
:depends-on ("common")
:components
((:file "mysql-cast-rules")
(:file "mysql-connection")
(:file "mysql-schema"
:depends-on ("mysql-connection"
"mysql-cast-rules"))
;; (:file "mysql-csv"
;; :depends-on ("mysql-schema"))
(:file "mysql"
:depends-on ("mysql-cast-rules"
"mysql-schema"))))))
(:module "mysql"
:depends-on ("common")
:components
((:file "mysql-cast-rules")
(:file "mysql-connection")
(:file "mysql-schema"
:depends-on ("mysql-connection"
"mysql-cast-rules"))
;; (:file "mysql-csv"
;; :depends-on ("mysql-schema"))
(:file "mysql"
:depends-on ("mysql-cast-rules"
"mysql-schema"))))))
(:module "parsers"
:depends-on ("params"
"package"
"utils"
"pgsql"
"sources"
"monkey")
:serial t
:components
((:file "parse-ini")
(:file "template")
(:file "command-utils")
(:file "command-keywords")
(:file "command-regexp")
(:file "parse-pgpass")
(:file "command-db-uri")
(:file "command-source")
(:file "command-options")
(:file "command-sql-block")
(:file "command-sexp")
(:file "command-csv")
(:file "command-ixf")
(:file "command-fixed")
(:file "command-copy")
(:file "command-dbf")
(:file "command-cast-rules")
(:file "command-materialize-views")
(:file "command-alter-table")
(:file "command-mysql")
(:file "command-including-like")
(:file "command-mssql")
(:file "command-sqlite")
(:file "command-archive")
(:file "command-parser")
(:file "date-format")))
(:module "parsers"
:depends-on ("params"
"package"
"utils"
"pgsql"
"sources"
"monkey")
:serial t
:components
((:file "parse-ini")
(:file "template")
(:file "command-utils")
(:file "command-keywords")
(:file "command-regexp")
(:file "parse-pgpass")
(:file "command-db-uri")
(:file "command-source")
(:file "command-options")
(:file "command-sql-block")
(:file "command-sexp")
(:file "command-csv")
(:file "command-ixf")
(:file "command-fixed")
(:file "command-copy")
(:file "command-dbf")
(:file "command-cast-rules")
(:file "command-materialize-views")
(:file "command-alter-table")
(:file "command-mysql")
(:file "command-including-like")
(:file "command-mssql")
(:file "command-sqlite")
(:file "command-archive")
(:file "command-parser")
(:file "date-format")))
(:module "regress"
:depends-on ("params" "package" "utils" "pgsql")
:components ((:file "regress")))
;; the main entry file, used when building a stand-alone
;; executable image
(:file "api" :depends-on ("params"
"package"
"utils"
"parsers"
"sources"))
;; the main entry file, used when building a stand-alone
;; executable image
(:file "api" :depends-on ("params"
"package"
"utils"
"parsers"
"sources"))
(:module "regress"
:depends-on ("params" "package" "utils" "pgsql" "api")
:components ((:file "regress")))
(:file "main" :depends-on ("params"
"package"
"utils"
"parsers"
"sources"
"api"
"regress"))))
;; to produce the website
(:module "docs"
:components
((:module src
:components
((:file "docs")))))))
(:file "main" :depends-on ("params"
"package"
"utils"
"parsers"
"sources"
"api"
"regress"))))
;; to produce the website
(:module "docs"
:components
((:module src
:components
((:file "docs")))))))

View File

@ -301,7 +301,8 @@
;; conditions being signaled.
(handler-bind
(((and condition (not (or cli-parsing-error
source-definition-error)))
source-definition-error
regression-test-error)))
#'(lambda (condition)
(format *error-output* "KABOOM!~%")
(format *error-output* "FATAL error: ~a~%~a~%~%"
@ -316,7 +317,6 @@
(cond
((and regress (= 1 (length arguments)))
;; run a regression test
(process-regression-test (first arguments)))
(regress
@ -356,18 +356,15 @@
((or cli-parsing-error source-definition-error) (c)
(format *error-output* "~%~a~%~%" c)
(let ((lp:*kernel* *monitoring-kernel*))
(lp:end-kernel :wait t))
(uiop:quit +os-code-error-bad-source+))
(regression-test-error (c)
(format *error-output* "~%~a~%~%" c)
(uiop:quit +os-code-error-regress+))
(condition (c)
(format *error-output* "~%What I am doing here?~%~%")
(format *error-output* "~a~%~%" c)
;; wait until monitor stops...
(format *error-output*
"~%Waiting for the monitor thread to complete.~%~%")
(let ((lp:*kernel* *monitoring-kernel*))
(lp:end-kernel :wait t))
(uiop:quit +os-code-error+)))))
;; done.

View File

@ -7,6 +7,13 @@
(in-package #:pgloader)
;;;
;;; Condition signaled when a regression test produces a non-empty diff.
;;;
(define-condition regression-test-error (error)
  ((filename :initarg :filename
             :reader regression-test-filename))
  (:report
   (lambda (c s)
     ;; name the offending load file in the error report
     (format s "Regression test failed: ~s" (regression-test-filename c)))))
(defun process-regression-test (load-file &key start-logger)
"Run a regression test for given LOAD-FILE."
(unless (probe-file load-file)
@ -99,15 +106,13 @@
(diff-count (pomo:query sql :single)))
(log-message :notice "~a" sql)
(log-message :notice "Got a diff of ~a rows" diff-count)
(if (= 0 diff-count)
(progn
(log-message :log "Regress pass.")
#-pgloader-image (values diff-count +os-code-success+)
#+pgloader-image (uiop:quit +os-code-success+))
(progn
(log-message :log "Regress fail.")
#-pgloader-image (values diff-count +os-code-error-regress+)
#+pgloader-image (uiop:quit +os-code-error-regress+)))))))))
;; signal a regression test error when diff isn't 0
(unless (zerop diff-count)
(error 'regression-test-error :filename load-file))
(log-message :log "Regress pass.")
(values diff-count +os-code-success+)))))))
;;;

View File

@ -41,7 +41,7 @@
(push (cl-log:start-messenger 'text-stream-messenger
:name "stdout"
:filter *client-min-messages*
:stream *standard-output*)
:stream (make-broadcast-stream *standard-output*))
*log-messengers*)
(cl-log:log-message :notice "Starting pgloader, log system is ready."))

View File

@ -139,7 +139,8 @@
;; make our kernel and channel visible from the outside
(setf *monitoring-kernel* kernel
*monitoring-channel* (lp:make-channel))
*monitoring-channel* (lp:make-channel)
*monitoring-queue* (lq:make-queue))
;; warm up the channel to ensure we don't loose any event
(lp:submit-task *monitoring-channel* '+ 1 2 3)
@ -149,31 +150,37 @@
(lp:submit-task *monitoring-channel* #'monitor *monitoring-queue*)
(send-event (make-start :start-logger start-logger))
(sleep 0.2)
*monitoring-channel*))
(values *monitoring-kernel* *monitoring-queue* *monitoring-channel*)))
(defun stop-monitor (&key
(kernel *monitoring-kernel*)
(channel *monitoring-channel*)
(stop-logger t))
"Stop the current monitor task."
(send-event (make-stop :stop-logger stop-logger))
(lp:receive-result channel))
(lp:receive-result channel)
(let ((lp:*kernel* kernel))
(lp:end-kernel :wait t)))
(defun call-with-monitor (thunk)
"Call THUNK in a context where a monitor thread is active.

   START-MONITOR returns (values kernel queue channel); binding them here
   as the *monitoring-...* special variables scopes the fresh monitor to
   the dynamic extent of THUNK."
;; rebinding the specials via multiple-value-bind is deliberate: the new
;; kernel/queue/channel are visible to everything THUNK calls, and the
;; previous values are restored on exit.
(multiple-value-bind (*monitoring-kernel*
*monitoring-queue*
*monitoring-channel*)
(start-monitor)
(unwind-protect
(funcall thunk)
;; always stop the monitor, even on a non-local exit from THUNK
(stop-monitor))))
(defmacro with-monitor ((&key (start-logger t)) &body body)
"Start and stop the monitor around BODY code. The monitor is responsible
for processing logs into a central logfile"
`(let ((*sections* (create-state)))
(if ,start-logger
(let* ((*monitoring-queue* (lq:make-queue))
(*monitoring-channel* (start-monitor :start-logger ,start-logger)))
(unwind-protect
(progn ,@body)
(stop-monitor :channel *monitoring-channel*
:stop-logger ,start-logger)))
;; logger has already been started
(progn ,@body))))
`(if ,start-logger
(let ((*sections* (create-state)))
(call-with-monitor #'(lambda () ,@body)))
(let ((*sections* (create-state)))
,@body)))
(defun monitor (queue)
"Receives and process messages from *monitoring-queue*."
@ -241,49 +248,13 @@
:ws (update-stats-ws event)
:bytes (update-stats-bytes event))
;; log some kind of a “keep alive” message to the user, for
;; the sake of showing progress.
;;
;; something like one message every 10 batches should only
;; target big tables where we have to wait for a pretty long
;; time.
(when (and (update-stats-rows event)
(typep label 'pgloader.catalog:table)
(< (* 9 *copy-batch-rows*)
(mod (pgtable-rows table)
(* 10 *copy-batch-rows*))))
(log-message :notice "copy ~a: ~d rows done, ~7<~a~>, ~9<~a~>"
(pgloader.catalog:format-table-name label)
(pgtable-rows table)
(pgloader.utils:pretty-print-bytes
(pgtable-bytes table))
(pgloader.utils:pretty-print-bytes
(truncate (pgtable-bytes table)
(elapsed-time-since
(pgtable-start table)))
:unit "Bps")))
(maybe-log-progress-message event label table)
(when (update-stats-start event)
(log-message :debug "start ~a ~30t ~a"
(pgloader.catalog:format-table-name label)
(update-stats-start event))
(setf (pgtable-start table) (update-stats-start event)))
(process-update-stats-start-event event label table))
;; each PostgreSQL writer thread will send a stop even, here
;; we only keep the latest one.
(when (and (update-stats-stop event)
(or (null (pgtable-stop table))
(< (pgtable-stop table) (update-stats-stop event))))
(setf (pgtable-stop table) (update-stats-stop event))
(let ((secs (elapsed-time-since (pgtable-start table)
(pgtable-stop table))))
(setf (pgtable-secs table) secs)
(log-message :debug " stop ~a ~30t | ~a .. ~a = ~a"
(pgloader.catalog:format-table-name label)
(pgtable-start table)
(pgtable-stop table)
secs)))))
(when (update-stats-stop event)
(process-update-stats-stop-event event label table))))
(bad-row
(let* ((pgstate (get-state-section *sections* :data))
@ -296,6 +267,52 @@
:until (typep event 'stop)))
(defun process-update-stats-start-event (event label table)
  "Record EVENT's start timestamp into TABLE, tracing it at :debug level."
  (declare (type update-stats event))
  (let ((started (update-stats-start event)))
    ;; trace first, then remember the timestamp on the table stats
    (cl-log:log-message :debug "start ~a ~30t ~a"
                        (pgloader.catalog:format-table-name label)
                        started)
    (setf (pgtable-start table) started)))
(defun process-update-stats-stop-event (event label table)
  "Keep the latest stop timestamp seen for TABLE and record its duration.

   Each PostgreSQL writer thread sends its own stop event; only the most
   recent timestamp is retained."
  (declare (type update-stats event))
  (let ((stopped  (update-stats-stop event))
        (previous (pgtable-stop table)))
    (when (or (null previous) (< previous stopped))
      (setf (pgtable-stop table) stopped)
      (let ((duration (elapsed-time-since (pgtable-start table) stopped)))
        (setf (pgtable-secs table) duration)
        (cl-log:log-message :debug " stop ~a ~30t | ~a .. ~a = ~a"
                            (pgloader.catalog:format-table-name label)
                            (pgtable-start table)
                            stopped
                            duration)))))
(defun maybe-log-progress-message (event label table)
  "Emit a :notice keep-alive line roughly once every 10 batches.

   Only fires for row-count updates against a pgloader.catalog:table
   label, and only when the running row total lands in the last batch of
   each group of ten — so that only long-running big tables show progress."
  ;; guard order matters: TABLE stats are only consulted once we know the
  ;; event carries rows and the label really is a catalog table
  (when (and (update-stats-rows event)
             (typep label 'pgloader.catalog:table)
             (< (* 9 *copy-batch-rows*)
                (mod (pgtable-rows table) (* 10 *copy-batch-rows*))))
    (let* ((rows  (pgtable-rows table))
           (bytes (pgtable-bytes table))
           ;; bytes per second since the table's copy started
           (speed (truncate bytes (elapsed-time-since (pgtable-start table)))))
      (cl-log:log-message :notice "copy ~a: ~d rows done, ~7<~a~>, ~9<~a~>"
                          (pgloader.catalog:format-table-name label)
                          rows
                          (pgloader.utils:pretty-print-bytes bytes)
                          (pgloader.utils:pretty-print-bytes speed
                                                             :unit "Bps")))))
(defun report-current-summary (start-time)
"Print out the current summary."
(let* ((summary-stream (when *summary-pathname*