From 2369a142a7c2a3c34955b06091eb32b3296b2196 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Wed, 1 Oct 2014 23:20:24 +0200 Subject: [PATCH] Refactor source code organisation. In passing, fix a bug in the previous commit where left-over code would cancel the whole new parsing code for advanced source fields options. --- pgloader.asd | 105 +++--- src/package.lisp | 11 +- src/parsers/README.md | 14 + src/{ => parsers}/parse-ini.lisp | 0 src/{ => parsers}/parser.lisp | 4 - src/{sources => }/sources.lisp | 0 src/sources/{ => mysql}/mysql-cast-rules.lisp | 0 src/sources/{ => mysql}/mysql-csv.lisp | 0 src/sources/{ => mysql}/mysql-schema.lisp | 0 src/utils.lisp | 345 ------------------ src/{ => utils}/archive.lisp | 0 src/{ => utils}/charsets.lisp | 0 src/{ => utils}/logs.lisp | 0 src/{ => utils}/monitor.lisp | 0 src/{ => utils}/read-sql-files.lisp | 0 src/utils/report.lisp | 133 +++++++ src/utils/state.lisp | 111 ++++++ src/utils/threads.lisp | 32 ++ src/{ => utils}/transforms.lisp | 0 src/utils/utils.lisp | 79 ++++ 20 files changed, 418 insertions(+), 416 deletions(-) create mode 100644 src/parsers/README.md rename src/{ => parsers}/parse-ini.lisp (100%) rename src/{ => parsers}/parser.lisp (99%) rename src/{sources => }/sources.lisp (100%) rename src/sources/{ => mysql}/mysql-cast-rules.lisp (100%) rename src/sources/{ => mysql}/mysql-csv.lisp (100%) rename src/sources/{ => mysql}/mysql-schema.lisp (100%) delete mode 100644 src/utils.lisp rename src/{ => utils}/archive.lisp (100%) rename src/{ => utils}/charsets.lisp (100%) rename src/{ => utils}/logs.lisp (100%) rename src/{ => utils}/monitor.lisp (100%) rename src/{ => utils}/read-sql-files.lisp (100%) create mode 100644 src/utils/report.lisp create mode 100644 src/utils/state.lisp create mode 100644 src/utils/threads.lisp rename src/{ => utils}/transforms.lisp (100%) create mode 100644 src/utils/utils.lisp diff --git a/pgloader.asd b/pgloader.asd index 3026337..e58be78 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -35,53 +35,28 @@ ((:module "src" :components ((:file "params") - (:file "package" :depends-on ("params")) + (:file "queue" :depends-on ("params" "package")) - (:file "logs" :depends-on ("package" "params")) + (:module "utils" + :depends-on ("package" "params") + :components + ((:file "charsets") + (:file "archive") + (:file "threads") + (:file "logs") + (:file "monitor" :depends-on ("logs")) + (:file "state") + (:file "report" :depends-on ("state")) + (:file "utils" :depends-on ("charsets" "monitor")) - (:file "monitor" :depends-on ("params" - "package" - "logs")) - - (:file "charsets":depends-on ("package")) - (:file "utils" :depends-on ("params" - "package" - "charsets" - "monitor")) - - ;; those are one-package-per-file - (:file "transforms") - (:file "queue" :depends-on ("params" "package")) - - (:file "read-sql-files" :depends-on ("package")) - - (:file "parser" :depends-on ("package" - "params" - "transforms" - "utils" - "monitor" - "read-sql-files" - "pgsql")) - - (:file "parse-ini" :depends-on ("package" - "params" - "utils")) - - (:file "archive" :depends-on ("params" - "package" - "utils" - "sources" - "pgsql")) + ;; those are one-package-per-file + (:file "transforms") + (:file "read-sql-files"))) ;; package pgloader.pgsql (:module pgsql - :depends-on ("package" - "params" - "queue" - "utils" - "logs" - "monitor") + :depends-on ("package" "params" "utils") :components ((:file "copy-format") (:file "queries") @@ -91,37 +66,49 @@ "queries" "schema")))) + (:module "parsers" + :depends-on ("params" "package" "utils" "pgsql") + :components + ((:file "parse-ini") + (:file "parser"))) + + ;; generic API for Sources + (:file "sources-api" + :pathname "sources" + :depends-on ("params" "package" "utils")) + ;; Source format specific implementations (:module sources :depends-on ("params" "package" + "sources-api" "pgsql" "utils" - "logs" - "monitor" - "queue" - "transforms") + "queue") :components - ((:file "sources") - (:file "csv" :depends-on ("sources")) - (:file "fixed" :depends-on ("sources")) - (:file "db3" :depends-on ("sources")) - (:file "ixf" :depends-on ("sources")) - (:file "sqlite" :depends-on ("sources")) - (:file "syslog" :depends-on ("sources")) - (:file "mysql-cast-rules") - (:file "mysql-schema") - (:file "mysql-csv" :depends-on ("mysql-schema")) - (:file "mysql" :depends-on ("mysql-cast-rules" - "mysql-schema")))) + ((:file "csv") + (:file "fixed") + (:file "db3") + (:file "ixf") + (:file "sqlite") + (:file "syslog") + + (:module "mysql-utils" + :pathname "mysql" + :components + ((:file "mysql-cast-rules") + (:file "mysql-schema") + (:file "mysql-csv" + :depends-on ("mysql-schema")))) + + (:file "mysql" :depends-on ("mysql-utils")))) ;; the main entry file, used when building a stand-alone ;; executable image (:file "main" :depends-on ("params" "package" - "monitor" "utils" - "parser" + "parsers" "sources")))) ;; to produce the website diff --git a/src/package.lisp b/src/package.lisp index 3f14be1..3d0a542 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -289,15 +289,10 @@ ;; Not really a source, more a util package to deal with http and zip ;; (defpackage #:pgloader.archive - (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.csv) - (:import-from #:pgloader.pgsql - #:with-pgsql-transaction - #:pgsql-execute) - (:export #:*default-tmpdir* - #:http-fetch-file + (:use #:cl #:pgloader.params) + (:export #:http-fetch-file #:expand-archive - #:get-matching-filenames - #:import-csv-from-zip)) + #:get-matching-filenames)) diff --git a/src/parsers/README.md b/src/parsers/README.md new file mode 100644 index 0000000..8c9e66e --- /dev/null +++ b/src/parsers/README.md @@ -0,0 +1,14 @@ +The pgloader parser reads the command language (a pgloader specific DSL) and +produces lisp code as its output. The lisp code is then compiled at run-time +and executed. + +The generated lisp-code uses the generic API defined in src/sources.lisp and +creates objects specifics to the kind of source that is being loaded. + +It is possible to see the generated code for study or debug: + + PARSER> (pprint + (with-monitor () + (parse-commands-from-file + "/Users/dim/dev/pgloader/test/fixed.load"))) + diff --git a/src/parse-ini.lisp b/src/parsers/parse-ini.lisp similarity index 100% rename from src/parse-ini.lisp rename to src/parsers/parse-ini.lisp diff --git a/src/parser.lisp b/src/parsers/parser.lisp similarity index 99% rename from src/parser.lisp rename to src/parsers/parser.lisp index 19beaa3..9de7ae3 100644 --- a/src/parser.lisp +++ b/src/parsers/parser.lisp @@ -1745,10 +1745,6 @@ load database (defrule csv-field-options (? (or csv-field-option csv-field-option-list))) -(defrule csv-field-options (* csv-field-option) - (:lambda (options) - (alexandria:alist-plist options))) - (defrule csv-raw-field-name (and (or #\_ (alpha-char-p character)) (* (or (alpha-char-p character) (digit-char-p character) diff --git a/src/sources/sources.lisp b/src/sources.lisp similarity index 100% rename from src/sources/sources.lisp rename to src/sources.lisp diff --git a/src/sources/mysql-cast-rules.lisp b/src/sources/mysql/mysql-cast-rules.lisp similarity index 100% rename from src/sources/mysql-cast-rules.lisp rename to src/sources/mysql/mysql-cast-rules.lisp diff --git a/src/sources/mysql-csv.lisp b/src/sources/mysql/mysql-csv.lisp similarity index 100% rename from src/sources/mysql-csv.lisp rename to src/sources/mysql/mysql-csv.lisp diff --git a/src/sources/mysql-schema.lisp b/src/sources/mysql/mysql-schema.lisp similarity index 100% rename from src/sources/mysql-schema.lisp rename to src/sources/mysql/mysql-schema.lisp diff --git a/src/utils.lisp b/src/utils.lisp deleted file mode 100644 index 3750465..0000000 --- a/src/utils.lisp +++ /dev/null @@ -1,345 +0,0 @@ -;;; -;;; Random utilities -;;; -(in-package :pgloader.utils) - -;;; -;;; Timing Macro -;;; -(defun elapsed-time-since (start) - "Return how many seconds ticked between START and now" - (/ (- (get-internal-real-time) start) - internal-time-units-per-second)) - -(defmacro timing (&body forms) - "return both how much real time was spend in body and its result" - (let ((start (gensym)) - (end (gensym)) - (result (gensym))) - `(let* ((,start (get-internal-real-time)) - (,result (progn ,@forms)) - (,end (get-internal-real-time))) - (values ,result (/ (- ,end ,start) internal-time-units-per-second))))) - -;;; -;;; Timing Formating -;;; -(defun format-interval (seconds &optional (stream t)) - "Output the number of seconds in a human friendly way" - (multiple-value-bind (years months days hours mins secs millisecs) - (date:decode-interval (date:encode-interval :second seconds)) - (declare (ignore millisecs)) - (format - stream - "~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~5,3fs" - (< 0 years) years - (< 0 months) months - (< 0 days) days - (< 0 hours) hours - (< 0 mins) mins - (+ secs (- (multiple-value-bind (r q) - (truncate seconds 60) - (declare (ignore r)) - q) - secs))))) - -;;; -;;; Data Structures to maintain information about loading state -;;; -(defstruct pgtable - name - (read 0 :type fixnum) ; how many rows did we read - (rows 0 :type fixnum) ; how many rows did we write - (errs 0 :type fixnum) ; how many errors did we see - (secs 0.0 :type float) ; how many seconds did it take - reject-data reject-logs) ; files where to find reject data - -(defstruct pgstate - (tables (make-hash-table :test 'equal)) - (tabnames nil) ; we want to keep the ordering - (read 0 :type fixnum) - (rows 0 :type fixnum) - (errs 0 :type fixnum) - (secs 0.0 :type float)) - -(defun pgstate-get-table (pgstate name) - (gethash name (pgstate-tables pgstate))) - -(defun pgstate-add-table (pgstate dbname table-name) - "Instanciate a new pgtable structure to hold our stats, and return it." - (or (pgstate-get-table pgstate table-name) - (let* ((table (setf (gethash table-name (pgstate-tables pgstate)) - (make-pgtable :name table-name))) - (reject-dir (pathname-directory - (merge-pathnames - (format nil "~a/" dbname) *root-dir*))) - (data-pathname - (make-pathname :directory reject-dir :name table-name :type "dat")) - (logs-pathname - (make-pathname :directory reject-dir :name table-name :type "log"))) - - ;; maintain the ordering - (push table-name (pgstate-tabnames pgstate)) - - ;; create the per-database directory if it does not exists yet - (ensure-directories-exist (directory-namestring data-pathname)) - - ;; rename the existing files if there are some - (with-open-file (data data-pathname - :direction :output - :if-exists :rename - :if-does-not-exist nil)) - - (with-open-file (logs logs-pathname - :direction :output - :if-exists :rename - :if-does-not-exist nil)) - - ;; set the properties to the right pathnames - (setf (pgtable-reject-data table) data-pathname - (pgtable-reject-logs table) logs-pathname) - table))) - -(defun pgstate-setf (pgstate name &key read rows errs secs) - (let ((pgtable (pgstate-get-table pgstate name))) - (when read - (setf (pgtable-read pgtable) read) - (incf (pgstate-read pgstate) read)) - (when rows - (setf (pgtable-rows pgtable) rows) - (incf (pgstate-rows pgstate) rows)) - (when errs - (setf (pgtable-errs pgtable) errs) - (incf (pgstate-errs pgstate) errs)) - (when secs - (setf (pgtable-secs pgtable) secs) - (incf (pgstate-secs pgstate) secs)) - pgtable)) - -(defun pgstate-incf (pgstate name &key read rows errs secs) - (let ((pgtable (pgstate-get-table pgstate name))) - (when read - (incf (pgtable-read pgtable) read) - (incf (pgstate-read pgstate) read)) - (when rows - (incf (pgtable-rows pgtable) rows) - (incf (pgstate-rows pgstate) rows)) - (when errs - (incf (pgtable-errs pgtable) errs) - (incf (pgstate-errs pgstate) errs)) - (when secs - (incf (pgtable-secs pgtable) secs) - (incf (pgstate-secs pgstate) secs)) - pgtable)) - -(defun pgstate-decf (pgstate name &key read rows errs secs) - (let ((pgtable (pgstate-get-table pgstate name))) - (when read - (decf (pgtable-read pgtable) read) - (decf (pgstate-read pgstate) read)) - (when rows - (decf (pgtable-rows pgtable) rows) - (decf (pgstate-rows pgstate) rows)) - (when errs - (decf (pgtable-errs pgtable) errs) - (decf (pgstate-errs pgstate) errs)) - (when secs - (decf (pgtable-secs pgtable) secs) - (decf (pgstate-secs pgstate) secs)) - pgtable)) - -;;; -;;; Pretty print a report while doing bulk operations -;;; -(defvar *header-line* - "~&------------------------------ --------- --------- --------- --------------") - -(defvar *header-tname-format* "~&~30@a") -(defvar *header-stats-format* " ~9@a ~9@a ~9@a ~14@a") -(defvar *header-cols-format* (concatenate 'string *header-tname-format* - *header-stats-format*)) -(defvar *header-cols-names* '("table name" "read" "imported" "errors" "time")) - -(defun report-header () - ;; (apply #'format *report-stream* *header-cols-format* *header-cols-names*) - (format *report-stream* "~{~}" *header-cols-format* *header-cols-names*) - (terpri) - (format *report-stream* *header-line*) - (terpri)) - -(defun report-table-name (table-name) - (format *report-stream* *header-tname-format* table-name)) - -(defun report-results (read rows errors seconds) - (format *report-stream* *header-stats-format* - read rows errors (format-interval seconds nil)) - (terpri)) - -(defun report-footer (legend read rows errors seconds) - (terpri) - (format *report-stream* *header-line*) - (format *report-stream* "~{~}" *header-cols-format* - (list legend read rows errors (format-interval seconds nil))) - (format *report-stream* "~&") - (terpri)) - -;;; -;;; Pretty print a report from a pgtable and pgstats counters -;;; -(defun report-pgtable-stats (pgstate name) - (with-slots (read rows errs secs) (pgstate-get-table pgstate name) - (report-results read rows errs secs))) - -(defun report-pgstate-stats (pgstate legend) - (with-slots (read rows errs secs) pgstate - (report-footer legend read rows errs secs))) - -;;; -;;; Pretty print the whole summary from a state -;;; -(defun report-summary (&key ((:state pgstate) *state*) (header t) footer) - "Report a whole summary." - (when header (report-header)) - (loop - for table-name in (reverse (pgstate-tabnames pgstate)) - for pgtable = (gethash table-name (pgstate-tables pgstate)) - do - (with-slots (read rows errs secs) pgtable - (format *report-stream* *header-cols-format* - table-name read rows errs (format-interval secs nil))) - finally (when footer - (report-pgstate-stats pgstate footer)))) - -(defmacro with-stats-collection ((table-name - &key - (dbname *pg-dbname*) - summary - use-result-as-read - use-result-as-rows - ((:state pgstate) *state*)) - &body forms) - "Measure time spent in running BODY into STATE, accounting the seconds to - given DBNAME and TABLE-NAME" - (let ((result (gensym "result")) - (secs (gensym "secs"))) - `(let ((*pg-dbname* (or ,dbname *pg-dbname*))) - (prog2 - (pgstate-add-table ,pgstate *pg-dbname* ,table-name) - (multiple-value-bind (,result ,secs) - (timing ,@forms) - (cond ((and ,use-result-as-read ,use-result-as-rows) - (pgstate-incf ,pgstate ,table-name - :read ,result :rows ,result :secs ,secs)) - (,use-result-as-read - (pgstate-incf ,pgstate ,table-name :read ,result :secs ,secs)) - (,use-result-as-rows - (pgstate-incf ,pgstate ,table-name :rows ,result :secs ,secs)) - (t - (pgstate-incf ,pgstate ,table-name :secs ,secs))) - ,result) - (when ,summary (report-summary)))))) - -(defun report-full-summary (legend state - &key before finally parallel) - "Report the full story when given three different sections of reporting." - - (terpri) - - ;; BEFORE - (if before - (progn - (report-summary :state before :footer nil) - (format *report-stream* pgloader.utils::*header-line*) - (report-summary :state state :header nil :footer nil)) - ;; no state before - (report-summary :state state :footer nil)) - - (when (or finally parallel) - (format *report-stream* pgloader.utils::*header-line*) - (when parallel - (report-summary :state parallel :header nil :footer nil)) - (when finally - (report-summary :state finally :header nil :footer nil))) - - ;; add to the grand total the other sections, except for the parallel one - (incf (pgloader.utils::pgstate-secs state) - (+ (if before (pgloader.utils::pgstate-secs before) 0) - (if finally (pgloader.utils::pgstate-secs finally) 0))) - - ;; if the parallel tasks took longer than the rest cumulated, the total - ;; waiting time actually was parallel - before - (when (and parallel - (< (pgloader.utils::pgstate-secs state) - (pgloader.utils::pgstate-secs parallel))) - (setf (pgloader.utils::pgstate-secs state) - (- (pgloader.utils::pgstate-secs parallel) - (if before (pgloader.utils::pgstate-secs before) 0)))) - - ;; and report the Grand Total - (report-pgstate-stats state legend)) - -;;; -;;; File utils -;;; -(defun slurp-file-into-string (filename) - "Return given filename's whole content as a string." - (with-open-file (stream filename - :direction :input - :external-format :utf-8) - (let ((seq (make-array (file-length stream) - :element-type 'character - :fill-pointer t))) - ;; apparently the fastest way at that is read-sequence - ;; http://www.ymeme.com/slurping-a-file-common-lisp-83.html - (setf (fill-pointer seq) (read-sequence seq stream)) - seq))) - -;;; -;;; Camel Case converter -;;; -(defun camelCase-to-colname (string) - "Transform input STRING into a suitable column name. - lahmanID lahman_id - playerID player_id - birthYear birth_year" - (coerce - (loop - for first = t then nil - for char across string - for previous-upper-p = nil then char-upper-p - for char-upper-p = (eq char (char-upcase char)) - for new-word = (and (not first) char-upper-p (not previous-upper-p)) - when (and new-word (not (char= char #\_))) collect #\_ - collect (char-downcase char)) - 'string)) - - -;;; -;;; For communication purposes, the lparallel kernel must be created with -;;; access to some common bindings. -;;; -(defun make-kernel (worker-count - &key (bindings - `((*monitoring-queue* . ,*monitoring-queue*) - (*copy-batch-rows* . ,*copy-batch-rows*) - (*copy-batch-size* . ,*copy-batch-size*) - (*concurrent-batches* . ,*concurrent-batches*) - (*pgconn-host* . ',*pgconn-host*) - (*pgconn-port* . ,*pgconn-port*) - (*pgconn-user* . ,*pgconn-user*) - (*pgconn-pass* . ,*pgconn-pass*) - (*pg-settings* . ',*pg-settings*) - (*myconn-host* . ',*myconn-host*) - (*myconn-port* . ,*myconn-port*) - (*myconn-user* . ,*myconn-user*) - (*myconn-pass* . ,*myconn-pass*) - (*state* . ,*state*) - (*client-min-messages* . ,*client-min-messages*) - (*log-min-messages* . ,*log-min-messages*) - - ;; bindings updates for libs - ;; CFFI is used by the SQLite lib - (cffi:*default-foreign-encoding* - . ,cffi:*default-foreign-encoding*)))) - "Wrapper around lparallel:make-kernel that sets our usual bindings." - (lp:make-kernel worker-count :bindings bindings)) diff --git a/src/archive.lisp b/src/utils/archive.lisp similarity index 100% rename from src/archive.lisp rename to src/utils/archive.lisp diff --git a/src/charsets.lisp b/src/utils/charsets.lisp similarity index 100% rename from src/charsets.lisp rename to src/utils/charsets.lisp diff --git a/src/logs.lisp b/src/utils/logs.lisp similarity index 100% rename from src/logs.lisp rename to src/utils/logs.lisp diff --git a/src/monitor.lisp b/src/utils/monitor.lisp similarity index 100% rename from src/monitor.lisp rename to src/utils/monitor.lisp diff --git a/src/read-sql-files.lisp b/src/utils/read-sql-files.lisp similarity index 100% rename from src/read-sql-files.lisp rename to src/utils/read-sql-files.lisp diff --git a/src/utils/report.lisp b/src/utils/report.lisp new file mode 100644 index 0000000..74357bb --- /dev/null +++ b/src/utils/report.lisp @@ -0,0 +1,133 @@ +;;; +;;; Pretty print a report while doing bulk operations +;;; + +(in-package :pgloader.utils) + +(defvar *header-line* + "~&------------------------------ --------- --------- --------- --------------") + +(defvar *header-tname-format* "~&~30@a") +(defvar *header-stats-format* " ~9@a ~9@a ~9@a ~14@a") +(defvar *header-cols-format* (concatenate 'string *header-tname-format* + *header-stats-format*)) +(defvar *header-cols-names* '("table name" "read" "imported" "errors" "time")) + +(defun report-header () + ;; (apply #'format *report-stream* *header-cols-format* *header-cols-names*) + (format *report-stream* "~{~}" *header-cols-format* *header-cols-names*) + (terpri) + (format *report-stream* *header-line*) + (terpri)) + +(defun report-table-name (table-name) + (format *report-stream* *header-tname-format* table-name)) + +(defun report-results (read rows errors seconds) + (format *report-stream* *header-stats-format* + read rows errors (format-interval seconds nil)) + (terpri)) + +(defun report-footer (legend read rows errors seconds) + (terpri) + (format *report-stream* *header-line*) + (format *report-stream* "~{~}" *header-cols-format* + (list legend read rows errors (format-interval seconds nil))) + (format *report-stream* "~&") + (terpri)) + +;;; +;;; Pretty print a report from a pgtable and pgstats counters +;;; +(defun report-pgtable-stats (pgstate name) + (with-slots (read rows errs secs) (pgstate-get-table pgstate name) + (report-results read rows errs secs))) + +(defun report-pgstate-stats (pgstate legend) + (with-slots (read rows errs secs) pgstate + (report-footer legend read rows errs secs))) + +;;; +;;; Pretty print the whole summary from a state +;;; +(defun report-summary (&key ((:state pgstate) *state*) (header t) footer) + "Report a whole summary." + (when header (report-header)) + (loop + for table-name in (reverse (pgstate-tabnames pgstate)) + for pgtable = (gethash table-name (pgstate-tables pgstate)) + do + (with-slots (read rows errs secs) pgtable + (format *report-stream* *header-cols-format* + table-name read rows errs (format-interval secs nil))) + finally (when footer + (report-pgstate-stats pgstate footer)))) + +(defmacro with-stats-collection ((table-name + &key + (dbname *pg-dbname*) + summary + use-result-as-read + use-result-as-rows + ((:state pgstate) *state*)) + &body forms) + "Measure time spent in running BODY into STATE, accounting the seconds to + given DBNAME and TABLE-NAME" + (let ((result (gensym "result")) + (secs (gensym "secs"))) + `(let ((*pg-dbname* (or ,dbname *pg-dbname*))) + (prog2 + (pgstate-add-table ,pgstate *pg-dbname* ,table-name) + (multiple-value-bind (,result ,secs) + (timing ,@forms) + (cond ((and ,use-result-as-read ,use-result-as-rows) + (pgstate-incf ,pgstate ,table-name + :read ,result :rows ,result :secs ,secs)) + (,use-result-as-read + (pgstate-incf ,pgstate ,table-name :read ,result :secs ,secs)) + (,use-result-as-rows + (pgstate-incf ,pgstate ,table-name :rows ,result :secs ,secs)) + (t + (pgstate-incf ,pgstate ,table-name :secs ,secs))) + ,result) + (when ,summary (report-summary)))))) + +(defun report-full-summary (legend state + &key before finally parallel) + "Report the full story when given three different sections of reporting." + + (terpri) + + ;; BEFORE + (if before + (progn + (report-summary :state before :footer nil) + (format *report-stream* pgloader.utils::*header-line*) + (report-summary :state state :header nil :footer nil)) + ;; no state before + (report-summary :state state :footer nil)) + + (when (or finally parallel) + (format *report-stream* pgloader.utils::*header-line*) + (when parallel + (report-summary :state parallel :header nil :footer nil)) + (when finally + (report-summary :state finally :header nil :footer nil))) + + ;; add to the grand total the other sections, except for the parallel one + (incf (pgloader.utils::pgstate-secs state) + (+ (if before (pgloader.utils::pgstate-secs before) 0) + (if finally (pgloader.utils::pgstate-secs finally) 0))) + + ;; if the parallel tasks took longer than the rest cumulated, the total + ;; waiting time actually was parallel - before + (when (and parallel + (< (pgloader.utils::pgstate-secs state) + (pgloader.utils::pgstate-secs parallel))) + (setf (pgloader.utils::pgstate-secs state) + (- (pgloader.utils::pgstate-secs parallel) + (if before (pgloader.utils::pgstate-secs before) 0)))) + + ;; and report the Grand Total + (report-pgstate-stats state legend)) + diff --git a/src/utils/state.lisp b/src/utils/state.lisp new file mode 100644 index 0000000..3c37478 --- /dev/null +++ b/src/utils/state.lisp @@ -0,0 +1,111 @@ +;;; +;;; Global state maintenance, which includes statistics about each target of +;;; the load: number of lines read, imported and number of errors found +;;; along the way. +;;; +(in-package :pgloader.utils) + +;;; +;;; Data Structures to maintain information about loading state +;;; +(defstruct pgtable + name + (read 0 :type fixnum) ; how many rows did we read + (rows 0 :type fixnum) ; how many rows did we write + (errs 0 :type fixnum) ; how many errors did we see + (secs 0.0 :type float) ; how many seconds did it take + reject-data reject-logs) ; files where to find reject data + +(defstruct pgstate + (tables (make-hash-table :test 'equal)) + (tabnames nil) ; we want to keep the ordering + (read 0 :type fixnum) + (rows 0 :type fixnum) + (errs 0 :type fixnum) + (secs 0.0 :type float)) + +(defun pgstate-get-table (pgstate name) + (gethash name (pgstate-tables pgstate))) + +(defun pgstate-add-table (pgstate dbname table-name) + "Instanciate a new pgtable structure to hold our stats, and return it." + (or (pgstate-get-table pgstate table-name) + (let* ((table (setf (gethash table-name (pgstate-tables pgstate)) + (make-pgtable :name table-name))) + (reject-dir (pathname-directory + (merge-pathnames + (format nil "~a/" dbname) *root-dir*))) + (data-pathname + (make-pathname :directory reject-dir :name table-name :type "dat")) + (logs-pathname + (make-pathname :directory reject-dir :name table-name :type "log"))) + + ;; maintain the ordering + (push table-name (pgstate-tabnames pgstate)) + + ;; create the per-database directory if it does not exists yet + (ensure-directories-exist (directory-namestring data-pathname)) + + ;; rename the existing files if there are some + (with-open-file (data data-pathname + :direction :output + :if-exists :rename + :if-does-not-exist nil)) + + (with-open-file (logs logs-pathname + :direction :output + :if-exists :rename + :if-does-not-exist nil)) + + ;; set the properties to the right pathnames + (setf (pgtable-reject-data table) data-pathname + (pgtable-reject-logs table) logs-pathname) + table))) + +(defun pgstate-setf (pgstate name &key read rows errs secs) + (let ((pgtable (pgstate-get-table pgstate name))) + (when read + (setf (pgtable-read pgtable) read) + (incf (pgstate-read pgstate) read)) + (when rows + (setf (pgtable-rows pgtable) rows) + (incf (pgstate-rows pgstate) rows)) + (when errs + (setf (pgtable-errs pgtable) errs) + (incf (pgstate-errs pgstate) errs)) + (when secs + (setf (pgtable-secs pgtable) secs) + (incf (pgstate-secs pgstate) secs)) + pgtable)) + +(defun pgstate-incf (pgstate name &key read rows errs secs) + (let ((pgtable (pgstate-get-table pgstate name))) + (when read + (incf (pgtable-read pgtable) read) + (incf (pgstate-read pgstate) read)) + (when rows + (incf (pgtable-rows pgtable) rows) + (incf (pgstate-rows pgstate) rows)) + (when errs + (incf (pgtable-errs pgtable) errs) + (incf (pgstate-errs pgstate) errs)) + (when secs + (incf (pgtable-secs pgtable) secs) + (incf (pgstate-secs pgstate) secs)) + pgtable)) + +(defun pgstate-decf (pgstate name &key read rows errs secs) + (let ((pgtable (pgstate-get-table pgstate name))) + (when read + (decf (pgtable-read pgtable) read) + (decf (pgstate-read pgstate) read)) + (when rows + (decf (pgtable-rows pgtable) rows) + (decf (pgstate-rows pgstate) rows)) + (when errs + (decf (pgtable-errs pgtable) errs) + (decf (pgstate-errs pgstate) errs)) + (when secs + (decf (pgtable-secs pgtable) secs) + (decf (pgstate-secs pgstate) secs)) + pgtable)) diff --git a/src/utils/threads.lisp b/src/utils/threads.lisp new file mode 100644 index 0000000..85f8b40 --- /dev/null +++ b/src/utils/threads.lisp @@ -0,0 +1,32 @@ +;;; +;;; For communication purposes, the lparallel kernel must be created with +;;; access to some common bindings. +;;; + +(in-package :pgloader.utils) + +(defun make-kernel (worker-count + &key (bindings + `((*monitoring-queue* . ,*monitoring-queue*) + (*copy-batch-rows* . ,*copy-batch-rows*) + (*copy-batch-size* . ,*copy-batch-size*) + (*concurrent-batches* . ,*concurrent-batches*) + (*pgconn-host* . ',*pgconn-host*) + (*pgconn-port* . ,*pgconn-port*) + (*pgconn-user* . ,*pgconn-user*) + (*pgconn-pass* . ,*pgconn-pass*) + (*pg-settings* . ',*pg-settings*) + (*myconn-host* . ',*myconn-host*) + (*myconn-port* . ,*myconn-port*) + (*myconn-user* . ,*myconn-user*) + (*myconn-pass* . ,*myconn-pass*) + (*state* . ,*state*) + (*client-min-messages* . ,*client-min-messages*) + (*log-min-messages* . ,*log-min-messages*) + + ;; bindings updates for libs + ;; CFFI is used by the SQLite lib + (cffi:*default-foreign-encoding* + . ,cffi:*default-foreign-encoding*)))) + "Wrapper around lparallel:make-kernel that sets our usual bindings." + (lp:make-kernel worker-count :bindings bindings)) diff --git a/src/transforms.lisp b/src/utils/transforms.lisp similarity index 100% rename from src/transforms.lisp rename to src/utils/transforms.lisp diff --git a/src/utils/utils.lisp b/src/utils/utils.lisp new file mode 100644 index 0000000..16ccf2f --- /dev/null +++ b/src/utils/utils.lisp @@ -0,0 +1,79 @@ +;;; +;;; Random utilities +;;; +(in-package :pgloader.utils) + +;;; +;;; Timing Macro +;;; +(defun elapsed-time-since (start) + "Return how many seconds ticked between START and now" + (/ (- (get-internal-real-time) start) + internal-time-units-per-second)) + +(defmacro timing (&body forms) + "return both how much real time was spend in body and its result" + (let ((start (gensym)) + (end (gensym)) + (result (gensym))) + `(let* ((,start (get-internal-real-time)) + (,result (progn ,@forms)) + (,end (get-internal-real-time))) + (values ,result (/ (- ,end ,start) internal-time-units-per-second))))) + +;;; +;;; Timing Formating +;;; +(defun format-interval (seconds &optional (stream t)) + "Output the number of seconds in a human friendly way" + (multiple-value-bind (years months days hours mins secs millisecs) + (date:decode-interval (date:encode-interval :second seconds)) + (declare (ignore millisecs)) + (format + stream + "~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~5,3fs" + (< 0 years) years + (< 0 months) months + (< 0 days) days + (< 0 hours) hours + (< 0 mins) mins + (+ secs (- (multiple-value-bind (r q) + (truncate seconds 60) + (declare (ignore r)) + q) + secs))))) + +;;; +;;; File utils +;;; +(defun slurp-file-into-string (filename) + "Return given filename's whole content as a string." + (with-open-file (stream filename + :direction :input + :external-format :utf-8) + (let ((seq (make-array (file-length stream) + :element-type 'character + :fill-pointer t))) + ;; apparently the fastest way at that is read-sequence + ;; http://www.ymeme.com/slurping-a-file-common-lisp-83.html + (setf (fill-pointer seq) (read-sequence seq stream)) + seq))) + +;;; +;;; Camel Case converter +;;; +(defun camelCase-to-colname (string) + "Transform input STRING into a suitable column name. + lahmanID lahman_id + playerID player_id + birthYear birth_year" + (coerce + (loop + for first = t then nil + for char across string + for previous-upper-p = nil then char-upper-p + for char-upper-p = (eq char (char-upcase char)) + for new-word = (and (not first) char-upper-p (not previous-upper-p)) + when (and new-word (not (char= char #\_))) collect #\_ + collect (char-downcase char)) + 'string))