diff --git a/pgloader.asd b/pgloader.asd index f689b30..7f03405 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -1,254 +1,256 @@ ;;;; pgloader.asd (asdf:defsystem #:pgloader - :serial t - :description "Load data into PostgreSQL" - :author "Dimitri Fontaine " - :license "The PostgreSQL Licence" - :depends-on (#:uiop ; host system integration - #:cl-log ; logging - #:postmodern ; PostgreSQL protocol implementation - #:cl-postgres ; low level bits for COPY streaming - #:simple-date ; FIXME: recheck dependency - #:qmynd ; MySQL protocol implemenation - #:split-sequence ; some parsing is made easy - #:cl-csv ; full CSV reader - #:cl-fad ; file and directories - #:lparallel ; threads, workers, queues - #:esrap ; parser generator - #:alexandria ; utils - #:drakma ; http client, download archives - #:flexi-streams ; streams - #:usocket ; UDP / syslog - #:local-time ; UDP date parsing - #:command-line-arguments ; for the main function - #:abnf ; ABNF parser generator (for syslog) - #:db3 ; DBF version 3 file reader - #:ixf ; IBM IXF file format reader - #:py-configparser ; Read old-style INI config files - #:sqlite ; Query a SQLite file - #:cl-base64 ; Decode base64 data - #:trivial-backtrace ; For --debug cli usage - #:cl-markdown ; To produce the website - #:metabang-bind ; the bind macro - #:mssql ; M$ SQL connectivity - #:uuid ; Transforming MS SQL unique identifiers - #:quri ; decode URI parameters - #:cl-ppcre ; Perl Compatible Regular Expressions - #:cl-mustache ; Logic-less templates - #:yason ; JSON routines - #:closer-mop ; introspection - ) - :components - ((:module "src" - :components - ((:file "params") - (:file "package" :depends-on ("params")) + :serial t + :description "Load data into PostgreSQL" + :author "Dimitri Fontaine " + :license "The PostgreSQL Licence" + :depends-on (#:uiop ; host system integration + #:cl-log ; logging + #:postmodern ; PostgreSQL protocol implementation + #:cl-postgres ; low level bits for COPY streaming + #:simple-date ; FIXME: recheck dependency + #:qmynd ; MySQL protocol implemenation + #:split-sequence ; some parsing is made easy + #:cl-csv ; full CSV reader + #:cl-fad ; file and directories + #:lparallel ; threads, workers, queues + #:esrap ; parser generator + #:alexandria ; utils + #:drakma ; http client, download archives + #:flexi-streams ; streams + #:usocket ; UDP / syslog + #:local-time ; UDP date parsing + #:command-line-arguments ; for the main function + #:abnf ; ABNF parser generator (for syslog) + #:db3 ; DBF version 3 file reader + #:ixf ; IBM IXF file format reader + #:py-configparser ; Read old-style INI config files + #:sqlite ; Query a SQLite file + #:cl-base64 ; Decode base64 data + #:trivial-backtrace ; For --debug cli usage + #:cl-markdown ; To produce the website + #:metabang-bind ; the bind macro + #:mssql ; M$ SQL connectivity + #:uuid ; Transforming MS SQL unique identifiers + #:quri ; decode URI parameters + #:cl-ppcre ; Perl Compatible Regular Expressions + #:cl-mustache ; Logic-less templates + #:yason ; JSON routines + #:closer-mop ; introspection + ) + :components + ((:module "src" + :components + ((:file "params") + (:file "package" :depends-on ("params")) - (:module "monkey" - :components - ((:file "bind") - (:file "mssql"))) + (:module "monkey" + :components + ((:file "bind") + (:file "mssql"))) - (:module "utils" - :depends-on ("package" "params") - :components - ((:file "charsets") - (:file "batch") - (:file "logs") - (:file "utils") - (:file "state") + (:module "utils" + :depends-on ("package" "params") + :components + ((:file "charsets") + (:file "batch") + (:file "logs") + (:file "utils") + (:file "state") - ;; user defined transforms package and pgloader - ;; provided ones - (:file "transforms") + ;; user defined transforms package and pgloader + ;; provided ones + (:file "transforms") - ;; PostgreSQL related utils - (:file "read-sql-files") - (:file "queries") - (:file "quoting") - (:file "catalog" :depends-on ("quoting")) - (:file "alter-table" :depends-on ("catalog")) + ;; PostgreSQL related utils + (:file "read-sql-files") + (:file "queries") + (:file "quoting") + (:file "catalog" :depends-on ("quoting")) + (:file "alter-table" :depends-on ("catalog")) - ;; State, monitoring, reporting - (:file "reject" :depends-on ("state")) - (:file "pretty-print-state" :depends-on ("state")) - (:file "report" :depends-on ("state" - "pretty-print-state" - "utils" - "catalog")) - (:file "monitor" :depends-on ("logs" - "state" - "reject" - "report")) - (:file "threads" :depends-on ("monitor")) - (:file "archive" :depends-on ("logs")) + ;; State, monitoring, reporting + (:file "reject" :depends-on ("state")) + (:file "pretty-print-state" :depends-on ("state")) + (:file "report" :depends-on ("state" + "pretty-print-state" + "utils" + "catalog")) + (:file "monitor" :depends-on ("logs" + "state" + "reject" + "report")) + (:file "threads" :depends-on ("monitor")) + (:file "archive" :depends-on ("monitor")) - ;; generic connection api - (:file "connection" :depends-on ("archive")))) + ;; generic connection api + (:file "connection" :depends-on ("monitor" + "archive")))) - ;; package pgloader.pgsql - (:module pgsql - :depends-on ("package" "params" "utils") - :serial t - :components - ((:file "copy-format") - (:file "connection") - (:file "pgsql-ddl") - (:file "pgsql-schema") - (:file "merge-catalogs" :depends-on ("pgsql-schema")) - (:file "pgsql-trigger") - (:file "pgsql-index-filter") - (:file "pgsql-create-schema" :depends-on ("pgsql-trigger")) - (:file "retry-batch") - (:file "copy-from-queue" - :depends-on ("copy-format" - "connection" - "retry-batch" - "pgsql-create-schema" - "pgsql-schema")))) + ;; package pgloader.pgsql + (:module pgsql + :depends-on ("package" "params" "utils") + :serial t + :components + ((:file "copy-format") + (:file "connection") + (:file "pgsql-ddl") + (:file "pgsql-schema") + (:file "merge-catalogs" :depends-on ("pgsql-schema")) + (:file "pgsql-trigger") + (:file "pgsql-index-filter") + (:file "pgsql-create-schema" :depends-on ("pgsql-trigger")) + (:file "retry-batch") + (:file "copy-from-queue" + :depends-on ("copy-format" + "connection" + "retry-batch" + "pgsql-create-schema" + "pgsql-schema")))) - ;; Source format specific implementations - (:module sources - :depends-on ("monkey" ; mssql driver patches - "params" - "package" - "pgsql" - "utils") - :components - ((:module "common" - :components - ((:file "api") - (:file "methods" :depends-on ("api")) - (:file "md-methods" :depends-on ("api")) - (:file "db-methods" :depends-on ("api")) - (:file "casting-rules") - (:file "files-and-pathnames") - (:file "project-fields"))) + ;; Source format specific implementations + (:module sources + :depends-on ("monkey" ; mssql driver patches + "params" + "package" + "pgsql" + "utils") + :components + ((:module "common" + :components + ((:file "api") + (:file "methods" :depends-on ("api")) + (:file "md-methods" :depends-on ("api")) + (:file "db-methods" :depends-on ("api")) + (:file "casting-rules") + (:file "files-and-pathnames") + (:file "project-fields"))) - (:module "csv" - :depends-on ("common") - :components - ((:file "csv-guess") - ;; (:file "csv-database") - (:file "csv"))) + (:module "csv" + :depends-on ("common") + :components + ((:file "csv-guess") + ;; (:file "csv-database") + (:file "csv"))) - (:file "fixed" - :depends-on ("common" "csv")) + (:file "fixed" + :depends-on ("common" "csv")) - (:file "copy" - :depends-on ("common" "csv")) + (:file "copy" + :depends-on ("common" "csv")) - (:module "db3" - :depends-on ("common" "csv") - :components - ((:file "db3-schema") - (:file "db3" :depends-on ("db3-schema")))) + (:module "db3" + :depends-on ("common" "csv") + :components + ((:file "db3-schema") + (:file "db3" :depends-on ("db3-schema")))) - (:module "ixf" - :depends-on ("common") - :components - ((:file "ixf-schema") - (:file "ixf" :depends-on ("ixf-schema")))) + (:module "ixf" + :depends-on ("common") + :components + ((:file "ixf-schema") + (:file "ixf" :depends-on ("ixf-schema")))) - ;(:file "syslog") ; experimental... + ;(:file "syslog") ; experimental... - (:module "sqlite" - :depends-on ("common") - :components - ((:file "sqlite-cast-rules") - (:file "sqlite-schema" - :depends-on ("sqlite-cast-rules")) - (:file "sqlite" - :depends-on ("sqlite-cast-rules" - "sqlite-schema")))) + (:module "sqlite" + :depends-on ("common") + :components + ((:file "sqlite-cast-rules") + (:file "sqlite-schema" + :depends-on ("sqlite-cast-rules")) + (:file "sqlite" + :depends-on ("sqlite-cast-rules" + "sqlite-schema")))) - (:module "mssql" - :depends-on ("common") - :components - ((:file "mssql-cast-rules") - (:file "mssql-schema" - :depends-on ("mssql-cast-rules")) - (:file "mssql" - :depends-on ("mssql-cast-rules" - "mssql-schema")) - (:file "mssql-index-filters" - :depends-on ("mssql")))) + (:module "mssql" + :depends-on ("common") + :components + ((:file "mssql-cast-rules") + (:file "mssql-schema" + :depends-on ("mssql-cast-rules")) + (:file "mssql" + :depends-on ("mssql-cast-rules" + "mssql-schema")) + (:file "mssql-index-filters" + :depends-on ("mssql")))) - (:module "mysql" - :depends-on ("common") - :components - ((:file "mysql-cast-rules") - (:file "mysql-connection") - (:file "mysql-schema" - :depends-on ("mysql-connection" - "mysql-cast-rules")) - ;; (:file "mysql-csv" - ;; :depends-on ("mysql-schema")) - (:file "mysql" - :depends-on ("mysql-cast-rules" - "mysql-schema")))))) + (:module "mysql" + :depends-on ("common") + :components + ((:file "mysql-cast-rules") + (:file "mysql-connection") + (:file "mysql-schema" + :depends-on ("mysql-connection" + "mysql-cast-rules")) + ;; (:file "mysql-csv" + ;; :depends-on ("mysql-schema")) + (:file "mysql" + :depends-on ("mysql-cast-rules" + "mysql-schema")))))) - (:module "parsers" - :depends-on ("params" - "package" - "utils" - "pgsql" - "sources" - "monkey") - :serial t - :components - ((:file "parse-ini") - (:file "template") - (:file "command-utils") - (:file "command-keywords") - (:file "command-regexp") - (:file "parse-pgpass") - (:file "command-db-uri") - (:file "command-source") - (:file "command-options") - (:file "command-sql-block") - (:file "command-sexp") - (:file "command-csv") - (:file "command-ixf") - (:file "command-fixed") - (:file "command-copy") - (:file "command-dbf") - (:file "command-cast-rules") - (:file "command-materialize-views") - (:file "command-alter-table") - (:file "command-mysql") - (:file "command-including-like") - (:file "command-mssql") - (:file "command-sqlite") - (:file "command-archive") - (:file "command-parser") - (:file "date-format"))) + (:module "parsers" + :depends-on ("params" + "package" + "utils" + "pgsql" + "sources" + "monkey") + :serial t + :components + ((:file "parse-ini") + (:file "template") + (:file "command-utils") + (:file "command-keywords") + (:file "command-regexp") + (:file "parse-pgpass") + (:file "command-db-uri") + (:file "command-source") + (:file "command-options") + (:file "command-sql-block") + (:file "command-sexp") + (:file "command-csv") + (:file "command-ixf") + (:file "command-fixed") + (:file "command-copy") + (:file "command-dbf") + (:file "command-cast-rules") + (:file "command-materialize-views") + (:file "command-alter-table") + (:file "command-mysql") + (:file "command-including-like") + (:file "command-mssql") + (:file "command-sqlite") + (:file "command-archive") + (:file "command-parser") + (:file "date-format"))) - (:module "regress" - :depends-on ("params" "package" "utils" "pgsql") - :components ((:file "regress"))) + ;; the main entry file, used when building a stand-alone + ;; executable image + (:file "api" :depends-on ("params" + "package" + "utils" + "parsers" + "sources")) - ;; the main entry file, used when building a stand-alone - ;; executable image - (:file "api" :depends-on ("params" - "package" - "utils" - "parsers" - "sources")) + (:module "regress" + :depends-on ("params" "package" "utils" "pgsql" "api") + :components ((:file "regress"))) - (:file "main" :depends-on ("params" - "package" - "utils" - "parsers" - "sources" - "api" - "regress")))) - ;; to produce the website - (:module "docs" - :components - ((:module src - :components - ((:file "docs"))))))) + (:file "main" :depends-on ("params" + "package" + "utils" + "parsers" + "sources" + "api" + "regress")))) + + ;; to produce the website + (:module "docs" + :components + ((:module src + :components + ((:file "docs"))))))) diff --git a/src/main.lisp b/src/main.lisp index 3cc90c2..83fd3d7 100644 --- a/src/main.lisp +++ b/src/main.lisp @@ -301,7 +301,8 @@ ;; conditions being signaled. (handler-bind (((and condition (not (or cli-parsing-error - source-definition-error))) + source-definition-error + regression-test-error))) #'(lambda (condition) (format *error-output* "KABOOM!~%") (format *error-output* "FATAL error: ~a~%~a~%~%" @@ -316,7 +317,6 @@ (cond ((and regress (= 1 (length arguments))) - ;; run a regression test (process-regression-test (first arguments))) (regress @@ -356,18 +356,15 @@ ((or cli-parsing-error source-definition-error) (c) (format *error-output* "~%~a~%~%" c) - (let ((lp:*kernel* *monitoring-kernel*)) - (lp:end-kernel :wait t)) (uiop:quit +os-code-error-bad-source+)) + (regression-test-error (c) + (format *error-output* "~%~a~%~%" c) + (uiop:quit +os-code-error-regress+)) + (condition (c) (format *error-output* "~%What I am doing here?~%~%") (format *error-output* "~a~%~%" c) - ;; wait until monitor stops... - (format *error-output* - "~%Waiting for the monitor thread to complete.~%~%") - (let ((lp:*kernel* *monitoring-kernel*)) - (lp:end-kernel :wait t)) (uiop:quit +os-code-error+))))) ;; done. diff --git a/src/regress/regress.lisp b/src/regress/regress.lisp index 06dc696..57771e6 100644 --- a/src/regress/regress.lisp +++ b/src/regress/regress.lisp @@ -7,6 +7,13 @@ (in-package #:pgloader) +(define-condition regression-test-error (error) + ((filename :initarg :filename :reader regression-test-filename)) + (:report (lambda (err stream) + (format stream + "Regression test failed: ~s" + (regression-test-filename err))))) + (defun process-regression-test (load-file &key start-logger) "Run a regression test for given LOAD-FILE." (unless (probe-file load-file) @@ -99,15 +106,13 @@ (diff-count (pomo:query sql :single))) (log-message :notice "~a" sql) (log-message :notice "Got a diff of ~a rows" diff-count) - (if (= 0 diff-count) - (progn - (log-message :log "Regress pass.") - #-pgloader-image (values diff-count +os-code-success+) - #+pgloader-image (uiop:quit +os-code-success+)) - (progn - (log-message :log "Regress fail.") - #-pgloader-image (values diff-count +os-code-error-regress+) - #+pgloader-image (uiop:quit +os-code-error-regress+))))))))) + + ;; signal a regression test error when diff isn't 0 + (unless (zerop diff-count) + (error 'regression-test-error :filename load-file)) + + (log-message :log "Regress pass.") + (values diff-count +os-code-success+))))))) ;;; diff --git a/src/utils/logs.lisp b/src/utils/logs.lisp index 489f20c..77ac6eb 100644 --- a/src/utils/logs.lisp +++ b/src/utils/logs.lisp @@ -41,7 +41,7 @@ (push (cl-log:start-messenger 'text-stream-messenger :name "stdout" :filter *client-min-messages* - :stream *standard-output*) + :stream (make-broadcast-stream *standard-output*)) *log-messengers*) (cl-log:log-message :notice "Starting pgloader, log system is ready.")) diff --git a/src/utils/monitor.lisp b/src/utils/monitor.lisp index e90f0db..31248e0 100644 --- a/src/utils/monitor.lisp +++ b/src/utils/monitor.lisp @@ -139,7 +139,8 @@ ;; make our kernel and channel visible from the outside (setf *monitoring-kernel* kernel - *monitoring-channel* (lp:make-channel)) + *monitoring-channel* (lp:make-channel) + *monitoring-queue* (lq:make-queue)) ;; warm up the channel to ensure we don't loose any event (lp:submit-task *monitoring-channel* '+ 1 2 3) @@ -149,31 +150,37 @@ (lp:submit-task *monitoring-channel* #'monitor *monitoring-queue*) (send-event (make-start :start-logger start-logger)) - (sleep 0.2) - - *monitoring-channel*)) + (values *monitoring-kernel* *monitoring-queue* *monitoring-channel*))) (defun stop-monitor (&key + (kernel *monitoring-kernel*) (channel *monitoring-channel*) (stop-logger t)) "Stop the current monitor task." (send-event (make-stop :stop-logger stop-logger)) - (lp:receive-result channel)) + (lp:receive-result channel) + + (let ((lp:*kernel* kernel)) + (lp:end-kernel :wait t))) + +(defun call-with-monitor (thunk) + "Call THUNK in a context where a monitor thread is active." + (multiple-value-bind (*monitoring-kernel* + *monitoring-queue* + *monitoring-channel*) + (start-monitor) + (unwind-protect + (funcall thunk) + (stop-monitor)))) (defmacro with-monitor ((&key (start-logger t)) &body body) "Start and stop the monitor around BODY code. The monitor is responsible for processing logs into a central logfile" - `(let ((*sections* (create-state))) - (if ,start-logger - (let* ((*monitoring-queue* (lq:make-queue)) - (*monitoring-channel* (start-monitor :start-logger ,start-logger))) - (unwind-protect - (progn ,@body) - (stop-monitor :channel *monitoring-channel* - :stop-logger ,start-logger))) - - ;; logger has already been started - (progn ,@body)))) + `(if ,start-logger + (let ((*sections* (create-state))) + (call-with-monitor #'(lambda () ,@body))) + (let ((*sections* (create-state))) + ,@body))) (defun monitor (queue) "Receives and process messages from *monitoring-queue*." @@ -241,49 +248,13 @@ :ws (update-stats-ws event) :bytes (update-stats-bytes event)) - ;; log some kind of a “keep alive” message to the user, for - ;; the sake of showing progress. - ;; - ;; something like one message every 10 batches should only - ;; target big tables where we have to wait for a pretty long - ;; time. - (when (and (update-stats-rows event) - (typep label 'pgloader.catalog:table) - (< (* 9 *copy-batch-rows*) - (mod (pgtable-rows table) - (* 10 *copy-batch-rows*)))) - (log-message :notice "copy ~a: ~d rows done, ~7<~a~>, ~9<~a~>" - (pgloader.catalog:format-table-name label) - (pgtable-rows table) - (pgloader.utils:pretty-print-bytes - (pgtable-bytes table)) - (pgloader.utils:pretty-print-bytes - (truncate (pgtable-bytes table) - (elapsed-time-since - (pgtable-start table))) - :unit "Bps"))) + (maybe-log-progress-message event label table) (when (update-stats-start event) - (log-message :debug "start ~a ~30t ~a" - (pgloader.catalog:format-table-name label) - (update-stats-start event)) - (setf (pgtable-start table) (update-stats-start event))) + (process-update-stats-start-event event label table)) - ;; each PostgreSQL writer thread will send a stop even, here - ;; we only keep the latest one. - (when (and (update-stats-stop event) - (or (null (pgtable-stop table)) - (< (pgtable-stop table) (update-stats-stop event)))) - (setf (pgtable-stop table) (update-stats-stop event)) - (let ((secs (elapsed-time-since (pgtable-start table) - (pgtable-stop table)))) - (setf (pgtable-secs table) secs) - - (log-message :debug " stop ~a ~30t | ~a .. ~a = ~a" - (pgloader.catalog:format-table-name label) - (pgtable-start table) - (pgtable-stop table) - secs))))) + (when (update-stats-stop event) + (process-update-stats-stop-event event label table)))) (bad-row (let* ((pgstate (get-state-section *sections* :data)) @@ -296,6 +267,52 @@ :until (typep event 'stop))) +(defun process-update-stats-start-event (event label table) + (declare (type update-stats event)) + (cl-log:log-message :debug "start ~a ~30t ~a" + (pgloader.catalog:format-table-name label) + (update-stats-start event)) + (setf (pgtable-start table) (update-stats-start event))) + +(defun process-update-stats-stop-event (event label table) + (declare (type update-stats event)) + ;; each PostgreSQL writer thread will send a stop even, here + ;; we only keep the latest one. + (when (or (null (pgtable-stop table)) + (< (pgtable-stop table) (update-stats-stop event))) + (setf (pgtable-stop table) (update-stats-stop event)) + (let ((secs (elapsed-time-since (pgtable-start table) + (pgtable-stop table)))) + (setf (pgtable-secs table) secs) + + (cl-log:log-message :debug " stop ~a ~30t | ~a .. ~a = ~a" + (pgloader.catalog:format-table-name label) + (pgtable-start table) + (pgtable-stop table) + secs)))) + +(defun maybe-log-progress-message (event label table) + "Log some kind of a “keep alive” message to the user, for the sake of + showing progress. + + Something like one message every 10 batches should only target big tables + where we have to wait for a pretty long time." + (when (and (update-stats-rows event) + (typep label 'pgloader.catalog:table) + (< (* 9 *copy-batch-rows*) + (mod (pgtable-rows table) + (* 10 *copy-batch-rows*)))) + (cl-log:log-message :notice "copy ~a: ~d rows done, ~7<~a~>, ~9<~a~>" + (pgloader.catalog:format-table-name label) + (pgtable-rows table) + (pgloader.utils:pretty-print-bytes + (pgtable-bytes table)) + (pgloader.utils:pretty-print-bytes + (truncate (pgtable-bytes table) + (elapsed-time-since + (pgtable-start table))) + :unit "Bps")))) + (defun report-current-summary (start-time) "Print out the current summary." (let* ((summary-stream (when *summary-pathname*