mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-04 10:31:02 +02:00
Refactor source code organisation.
In passing, fix a bug in the previous commit where left-over code would cancel the whole new parsing code for advanced source fields options.
This commit is contained in:
parent
ac55d71401
commit
2369a142a7
105
pgloader.asd
105
pgloader.asd
@ -35,53 +35,28 @@
|
||||
((:module "src"
|
||||
:components
|
||||
((:file "params")
|
||||
|
||||
(:file "package" :depends-on ("params"))
|
||||
(:file "queue" :depends-on ("params" "package"))
|
||||
|
||||
(:file "logs" :depends-on ("package" "params"))
|
||||
(:module "utils"
|
||||
:depends-on ("package" "params")
|
||||
:components
|
||||
((:file "charsets")
|
||||
(:file "archive")
|
||||
(:file "threads")
|
||||
(:file "logs")
|
||||
(:file "monitor" :depends-on ("logs"))
|
||||
(:file "state")
|
||||
(:file "report" :depends-on ("state"))
|
||||
(:file "utils" :depends-on ("charsets" "monitor"))
|
||||
|
||||
(:file "monitor" :depends-on ("params"
|
||||
"package"
|
||||
"logs"))
|
||||
|
||||
(:file "charsets":depends-on ("package"))
|
||||
(:file "utils" :depends-on ("params"
|
||||
"package"
|
||||
"charsets"
|
||||
"monitor"))
|
||||
|
||||
;; those are one-package-per-file
|
||||
(:file "transforms")
|
||||
(:file "queue" :depends-on ("params" "package"))
|
||||
|
||||
(:file "read-sql-files" :depends-on ("package"))
|
||||
|
||||
(:file "parser" :depends-on ("package"
|
||||
"params"
|
||||
"transforms"
|
||||
"utils"
|
||||
"monitor"
|
||||
"read-sql-files"
|
||||
"pgsql"))
|
||||
|
||||
(:file "parse-ini" :depends-on ("package"
|
||||
"params"
|
||||
"utils"))
|
||||
|
||||
(:file "archive" :depends-on ("params"
|
||||
"package"
|
||||
"utils"
|
||||
"sources"
|
||||
"pgsql"))
|
||||
;; those are one-package-per-file
|
||||
(:file "transforms")
|
||||
(:file "read-sql-files")))
|
||||
|
||||
;; package pgloader.pgsql
|
||||
(:module pgsql
|
||||
:depends-on ("package"
|
||||
"params"
|
||||
"queue"
|
||||
"utils"
|
||||
"logs"
|
||||
"monitor")
|
||||
:depends-on ("package" "params" "utils")
|
||||
:components
|
||||
((:file "copy-format")
|
||||
(:file "queries")
|
||||
@ -91,37 +66,49 @@
|
||||
"queries"
|
||||
"schema"))))
|
||||
|
||||
(:module "parsers"
|
||||
:depends-on ("params" "package" "utils" "pgsql")
|
||||
:components
|
||||
((:file "parse-ini")
|
||||
(:file "parser")))
|
||||
|
||||
;; generic API for Sources
|
||||
(:file "sources-api"
|
||||
:pathname "sources"
|
||||
:depends-on ("params" "package" "utils"))
|
||||
|
||||
;; Source format specific implementations
|
||||
(:module sources
|
||||
:depends-on ("params"
|
||||
"package"
|
||||
"sources-api"
|
||||
"pgsql"
|
||||
"utils"
|
||||
"logs"
|
||||
"monitor"
|
||||
"queue"
|
||||
"transforms")
|
||||
"queue")
|
||||
:components
|
||||
((:file "sources")
|
||||
(:file "csv" :depends-on ("sources"))
|
||||
(:file "fixed" :depends-on ("sources"))
|
||||
(:file "db3" :depends-on ("sources"))
|
||||
(:file "ixf" :depends-on ("sources"))
|
||||
(:file "sqlite" :depends-on ("sources"))
|
||||
(:file "syslog" :depends-on ("sources"))
|
||||
(:file "mysql-cast-rules")
|
||||
(:file "mysql-schema")
|
||||
(:file "mysql-csv" :depends-on ("mysql-schema"))
|
||||
(:file "mysql" :depends-on ("mysql-cast-rules"
|
||||
"mysql-schema"))))
|
||||
((:file "csv")
|
||||
(:file "fixed")
|
||||
(:file "db3")
|
||||
(:file "ixf")
|
||||
(:file "sqlite")
|
||||
(:file "syslog")
|
||||
|
||||
(:module "mysql-utils"
|
||||
:pathname "mysql"
|
||||
:components
|
||||
((:file "mysql-cast-rules")
|
||||
(:file "mysql-schema")
|
||||
(:file "mysql-csv"
|
||||
:depends-on ("mysql-schema"))))
|
||||
|
||||
(:file "mysql" :depends-on ("mysql-utils"))))
|
||||
|
||||
;; the main entry file, used when building a stand-alone
|
||||
;; executable image
|
||||
(:file "main" :depends-on ("params"
|
||||
"package"
|
||||
"monitor"
|
||||
"utils"
|
||||
"parser"
|
||||
"parsers"
|
||||
"sources"))))
|
||||
|
||||
;; to produce the website
|
||||
|
||||
@ -289,15 +289,10 @@
|
||||
;; Not really a source, more a util package to deal with http and zip
|
||||
;;
|
||||
(defpackage #:pgloader.archive
|
||||
(:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.csv)
|
||||
(:import-from #:pgloader.pgsql
|
||||
#:with-pgsql-transaction
|
||||
#:pgsql-execute)
|
||||
(:export #:*default-tmpdir*
|
||||
#:http-fetch-file
|
||||
(:use #:cl #:pgloader.params)
|
||||
(:export #:http-fetch-file
|
||||
#:expand-archive
|
||||
#:get-matching-filenames
|
||||
#:import-csv-from-zip))
|
||||
#:get-matching-filenames))
|
||||
|
||||
|
||||
|
||||
|
||||
14
src/parsers/README.md
Normal file
14
src/parsers/README.md
Normal file
@ -0,0 +1,14 @@
|
||||
The pgloader parser reads the command language (a pgloader specific DSL) and
|
||||
produces lisp code as its output. The lisp code is then compiled at run-time
|
||||
and executed.
|
||||
|
||||
The generated lisp-code uses the generic API defined in src/sources.lisp and
|
||||
creates objects specific to the kind of source that is being loaded.
|
||||
|
||||
It is possible to see the generated code for study or debug:
|
||||
|
||||
PARSER> (pprint
|
||||
(with-monitor ()
|
||||
(parse-commands-from-file
|
||||
"/Users/dim/dev/pgloader/test/fixed.load")))
|
||||
|
||||
@ -1745,10 +1745,6 @@ load database
|
||||
|
||||
(defrule csv-field-options (? (or csv-field-option csv-field-option-list)))
|
||||
|
||||
(defrule csv-field-options (* csv-field-option)
|
||||
(:lambda (options)
|
||||
(alexandria:alist-plist options)))
|
||||
|
||||
(defrule csv-raw-field-name (and (or #\_ (alpha-char-p character))
|
||||
(* (or (alpha-char-p character)
|
||||
(digit-char-p character)
|
||||
345
src/utils.lisp
345
src/utils.lisp
@ -1,345 +0,0 @@
|
||||
;;;
|
||||
;;; Random utilities
|
||||
;;;
|
||||
(in-package :pgloader.utils)
|
||||
|
||||
;;;
|
||||
;;; Timing Macro
|
||||
;;;
|
||||
(defun elapsed-time-since (start)
|
||||
"Return how many seconds ticked between START and now"
|
||||
(/ (- (get-internal-real-time) start)
|
||||
internal-time-units-per-second))
|
||||
|
||||
(defmacro timing (&body forms)
|
||||
"return both how much real time was spend in body and its result"
|
||||
(let ((start (gensym))
|
||||
(end (gensym))
|
||||
(result (gensym)))
|
||||
`(let* ((,start (get-internal-real-time))
|
||||
(,result (progn ,@forms))
|
||||
(,end (get-internal-real-time)))
|
||||
(values ,result (/ (- ,end ,start) internal-time-units-per-second)))))
|
||||
|
||||
;;;
|
||||
;;; Timing Formating
|
||||
;;;
|
||||
(defun format-interval (seconds &optional (stream t))
|
||||
"Output the number of seconds in a human friendly way"
|
||||
(multiple-value-bind (years months days hours mins secs millisecs)
|
||||
(date:decode-interval (date:encode-interval :second seconds))
|
||||
(declare (ignore millisecs))
|
||||
(format
|
||||
stream
|
||||
"~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~5,3fs"
|
||||
(< 0 years) years
|
||||
(< 0 months) months
|
||||
(< 0 days) days
|
||||
(< 0 hours) hours
|
||||
(< 0 mins) mins
|
||||
(+ secs (- (multiple-value-bind (r q)
|
||||
(truncate seconds 60)
|
||||
(declare (ignore r))
|
||||
q)
|
||||
secs)))))
|
||||
|
||||
;;;
|
||||
;;; Data Structures to maintain information about loading state
|
||||
;;;
|
||||
(defstruct pgtable
|
||||
name
|
||||
(read 0 :type fixnum) ; how many rows did we read
|
||||
(rows 0 :type fixnum) ; how many rows did we write
|
||||
(errs 0 :type fixnum) ; how many errors did we see
|
||||
(secs 0.0 :type float) ; how many seconds did it take
|
||||
reject-data reject-logs) ; files where to find reject data
|
||||
|
||||
(defstruct pgstate
|
||||
(tables (make-hash-table :test 'equal))
|
||||
(tabnames nil) ; we want to keep the ordering
|
||||
(read 0 :type fixnum)
|
||||
(rows 0 :type fixnum)
|
||||
(errs 0 :type fixnum)
|
||||
(secs 0.0 :type float))
|
||||
|
||||
(defun pgstate-get-table (pgstate name)
|
||||
(gethash name (pgstate-tables pgstate)))
|
||||
|
||||
(defun pgstate-add-table (pgstate dbname table-name)
|
||||
"Instanciate a new pgtable structure to hold our stats, and return it."
|
||||
(or (pgstate-get-table pgstate table-name)
|
||||
(let* ((table (setf (gethash table-name (pgstate-tables pgstate))
|
||||
(make-pgtable :name table-name)))
|
||||
(reject-dir (pathname-directory
|
||||
(merge-pathnames
|
||||
(format nil "~a/" dbname) *root-dir*)))
|
||||
(data-pathname
|
||||
(make-pathname :directory reject-dir :name table-name :type "dat"))
|
||||
(logs-pathname
|
||||
(make-pathname :directory reject-dir :name table-name :type "log")))
|
||||
|
||||
;; maintain the ordering
|
||||
(push table-name (pgstate-tabnames pgstate))
|
||||
|
||||
;; create the per-database directory if it does not exists yet
|
||||
(ensure-directories-exist (directory-namestring data-pathname))
|
||||
|
||||
;; rename the existing files if there are some
|
||||
(with-open-file (data data-pathname
|
||||
:direction :output
|
||||
:if-exists :rename
|
||||
:if-does-not-exist nil))
|
||||
|
||||
(with-open-file (logs logs-pathname
|
||||
:direction :output
|
||||
:if-exists :rename
|
||||
:if-does-not-exist nil))
|
||||
|
||||
;; set the properties to the right pathnames
|
||||
(setf (pgtable-reject-data table) data-pathname
|
||||
(pgtable-reject-logs table) logs-pathname)
|
||||
table)))
|
||||
|
||||
(defun pgstate-setf (pgstate name &key read rows errs secs)
|
||||
(let ((pgtable (pgstate-get-table pgstate name)))
|
||||
(when read
|
||||
(setf (pgtable-read pgtable) read)
|
||||
(incf (pgstate-read pgstate) read))
|
||||
(when rows
|
||||
(setf (pgtable-rows pgtable) rows)
|
||||
(incf (pgstate-rows pgstate) rows))
|
||||
(when errs
|
||||
(setf (pgtable-errs pgtable) errs)
|
||||
(incf (pgstate-errs pgstate) errs))
|
||||
(when secs
|
||||
(setf (pgtable-secs pgtable) secs)
|
||||
(incf (pgstate-secs pgstate) secs))
|
||||
pgtable))
|
||||
|
||||
(defun pgstate-incf (pgstate name &key read rows errs secs)
|
||||
(let ((pgtable (pgstate-get-table pgstate name)))
|
||||
(when read
|
||||
(incf (pgtable-read pgtable) read)
|
||||
(incf (pgstate-read pgstate) read))
|
||||
(when rows
|
||||
(incf (pgtable-rows pgtable) rows)
|
||||
(incf (pgstate-rows pgstate) rows))
|
||||
(when errs
|
||||
(incf (pgtable-errs pgtable) errs)
|
||||
(incf (pgstate-errs pgstate) errs))
|
||||
(when secs
|
||||
(incf (pgtable-secs pgtable) secs)
|
||||
(incf (pgstate-secs pgstate) secs))
|
||||
pgtable))
|
||||
|
||||
(defun pgstate-decf (pgstate name &key read rows errs secs)
|
||||
(let ((pgtable (pgstate-get-table pgstate name)))
|
||||
(when read
|
||||
(decf (pgtable-read pgtable) read)
|
||||
(decf (pgstate-read pgstate) read))
|
||||
(when rows
|
||||
(decf (pgtable-rows pgtable) rows)
|
||||
(decf (pgstate-rows pgstate) rows))
|
||||
(when errs
|
||||
(decf (pgtable-errs pgtable) errs)
|
||||
(decf (pgstate-errs pgstate) errs))
|
||||
(when secs
|
||||
(decf (pgtable-secs pgtable) secs)
|
||||
(decf (pgstate-secs pgstate) secs))
|
||||
pgtable))
|
||||
|
||||
;;;
|
||||
;;; Pretty print a report while doing bulk operations
|
||||
;;;
|
||||
(defvar *header-line*
|
||||
"~&------------------------------ --------- --------- --------- --------------")
|
||||
|
||||
(defvar *header-tname-format* "~&~30@a")
|
||||
(defvar *header-stats-format* " ~9@a ~9@a ~9@a ~14@a")
|
||||
(defvar *header-cols-format* (concatenate 'string *header-tname-format*
|
||||
*header-stats-format*))
|
||||
(defvar *header-cols-names* '("table name" "read" "imported" "errors" "time"))
|
||||
|
||||
(defun report-header ()
|
||||
;; (apply #'format *report-stream* *header-cols-format* *header-cols-names*)
|
||||
(format *report-stream* "~{~}" *header-cols-format* *header-cols-names*)
|
||||
(terpri)
|
||||
(format *report-stream* *header-line*)
|
||||
(terpri))
|
||||
|
||||
(defun report-table-name (table-name)
|
||||
(format *report-stream* *header-tname-format* table-name))
|
||||
|
||||
(defun report-results (read rows errors seconds)
|
||||
(format *report-stream* *header-stats-format*
|
||||
read rows errors (format-interval seconds nil))
|
||||
(terpri))
|
||||
|
||||
(defun report-footer (legend read rows errors seconds)
|
||||
(terpri)
|
||||
(format *report-stream* *header-line*)
|
||||
(format *report-stream* "~{~}" *header-cols-format*
|
||||
(list legend read rows errors (format-interval seconds nil)))
|
||||
(format *report-stream* "~&")
|
||||
(terpri))
|
||||
|
||||
;;;
|
||||
;;; Pretty print a report from a pgtable and pgstats counters
|
||||
;;;
|
||||
(defun report-pgtable-stats (pgstate name)
|
||||
(with-slots (read rows errs secs) (pgstate-get-table pgstate name)
|
||||
(report-results read rows errs secs)))
|
||||
|
||||
(defun report-pgstate-stats (pgstate legend)
|
||||
(with-slots (read rows errs secs) pgstate
|
||||
(report-footer legend read rows errs secs)))
|
||||
|
||||
;;;
|
||||
;;; Pretty print the whole summary from a state
|
||||
;;;
|
||||
(defun report-summary (&key ((:state pgstate) *state*) (header t) footer)
|
||||
"Report a whole summary."
|
||||
(when header (report-header))
|
||||
(loop
|
||||
for table-name in (reverse (pgstate-tabnames pgstate))
|
||||
for pgtable = (gethash table-name (pgstate-tables pgstate))
|
||||
do
|
||||
(with-slots (read rows errs secs) pgtable
|
||||
(format *report-stream* *header-cols-format*
|
||||
table-name read rows errs (format-interval secs nil)))
|
||||
finally (when footer
|
||||
(report-pgstate-stats pgstate footer))))
|
||||
|
||||
(defmacro with-stats-collection ((table-name
|
||||
&key
|
||||
(dbname *pg-dbname*)
|
||||
summary
|
||||
use-result-as-read
|
||||
use-result-as-rows
|
||||
((:state pgstate) *state*))
|
||||
&body forms)
|
||||
"Measure time spent in running BODY into STATE, accounting the seconds to
|
||||
given DBNAME and TABLE-NAME"
|
||||
(let ((result (gensym "result"))
|
||||
(secs (gensym "secs")))
|
||||
`(let ((*pg-dbname* (or ,dbname *pg-dbname*)))
|
||||
(prog2
|
||||
(pgstate-add-table ,pgstate *pg-dbname* ,table-name)
|
||||
(multiple-value-bind (,result ,secs)
|
||||
(timing ,@forms)
|
||||
(cond ((and ,use-result-as-read ,use-result-as-rows)
|
||||
(pgstate-incf ,pgstate ,table-name
|
||||
:read ,result :rows ,result :secs ,secs))
|
||||
(,use-result-as-read
|
||||
(pgstate-incf ,pgstate ,table-name :read ,result :secs ,secs))
|
||||
(,use-result-as-rows
|
||||
(pgstate-incf ,pgstate ,table-name :rows ,result :secs ,secs))
|
||||
(t
|
||||
(pgstate-incf ,pgstate ,table-name :secs ,secs)))
|
||||
,result)
|
||||
(when ,summary (report-summary))))))
|
||||
|
||||
(defun report-full-summary (legend state
|
||||
&key before finally parallel)
|
||||
"Report the full story when given three different sections of reporting."
|
||||
|
||||
(terpri)
|
||||
|
||||
;; BEFORE
|
||||
(if before
|
||||
(progn
|
||||
(report-summary :state before :footer nil)
|
||||
(format *report-stream* pgloader.utils::*header-line*)
|
||||
(report-summary :state state :header nil :footer nil))
|
||||
;; no state before
|
||||
(report-summary :state state :footer nil))
|
||||
|
||||
(when (or finally parallel)
|
||||
(format *report-stream* pgloader.utils::*header-line*)
|
||||
(when parallel
|
||||
(report-summary :state parallel :header nil :footer nil))
|
||||
(when finally
|
||||
(report-summary :state finally :header nil :footer nil)))
|
||||
|
||||
;; add to the grand total the other sections, except for the parallel one
|
||||
(incf (pgloader.utils::pgstate-secs state)
|
||||
(+ (if before (pgloader.utils::pgstate-secs before) 0)
|
||||
(if finally (pgloader.utils::pgstate-secs finally) 0)))
|
||||
|
||||
;; if the parallel tasks took longer than the rest cumulated, the total
|
||||
;; waiting time actually was parallel - before
|
||||
(when (and parallel
|
||||
(< (pgloader.utils::pgstate-secs state)
|
||||
(pgloader.utils::pgstate-secs parallel)))
|
||||
(setf (pgloader.utils::pgstate-secs state)
|
||||
(- (pgloader.utils::pgstate-secs parallel)
|
||||
(if before (pgloader.utils::pgstate-secs before) 0))))
|
||||
|
||||
;; and report the Grand Total
|
||||
(report-pgstate-stats state legend))
|
||||
|
||||
;;;
|
||||
;;; File utils
|
||||
;;;
|
||||
(defun slurp-file-into-string (filename)
|
||||
"Return given filename's whole content as a string."
|
||||
(with-open-file (stream filename
|
||||
:direction :input
|
||||
:external-format :utf-8)
|
||||
(let ((seq (make-array (file-length stream)
|
||||
:element-type 'character
|
||||
:fill-pointer t)))
|
||||
;; apparently the fastest way at that is read-sequence
|
||||
;; http://www.ymeme.com/slurping-a-file-common-lisp-83.html
|
||||
(setf (fill-pointer seq) (read-sequence seq stream))
|
||||
seq)))
|
||||
|
||||
;;;
|
||||
;;; Camel Case converter
|
||||
;;;
|
||||
(defun camelCase-to-colname (string)
|
||||
"Transform input STRING into a suitable column name.
|
||||
lahmanID lahman_id
|
||||
playerID player_id
|
||||
birthYear birth_year"
|
||||
(coerce
|
||||
(loop
|
||||
for first = t then nil
|
||||
for char across string
|
||||
for previous-upper-p = nil then char-upper-p
|
||||
for char-upper-p = (eq char (char-upcase char))
|
||||
for new-word = (and (not first) char-upper-p (not previous-upper-p))
|
||||
when (and new-word (not (char= char #\_))) collect #\_
|
||||
collect (char-downcase char))
|
||||
'string))
|
||||
|
||||
|
||||
;;;
|
||||
;;; For communication purposes, the lparallel kernel must be created with
|
||||
;;; access to some common bindings.
|
||||
;;;
|
||||
(defun make-kernel (worker-count
|
||||
&key (bindings
|
||||
`((*monitoring-queue* . ,*monitoring-queue*)
|
||||
(*copy-batch-rows* . ,*copy-batch-rows*)
|
||||
(*copy-batch-size* . ,*copy-batch-size*)
|
||||
(*concurrent-batches* . ,*concurrent-batches*)
|
||||
(*pgconn-host* . ',*pgconn-host*)
|
||||
(*pgconn-port* . ,*pgconn-port*)
|
||||
(*pgconn-user* . ,*pgconn-user*)
|
||||
(*pgconn-pass* . ,*pgconn-pass*)
|
||||
(*pg-settings* . ',*pg-settings*)
|
||||
(*myconn-host* . ',*myconn-host*)
|
||||
(*myconn-port* . ,*myconn-port*)
|
||||
(*myconn-user* . ,*myconn-user*)
|
||||
(*myconn-pass* . ,*myconn-pass*)
|
||||
(*state* . ,*state*)
|
||||
(*client-min-messages* . ,*client-min-messages*)
|
||||
(*log-min-messages* . ,*log-min-messages*)
|
||||
|
||||
;; bindings updates for libs
|
||||
;; CFFI is used by the SQLite lib
|
||||
(cffi:*default-foreign-encoding*
|
||||
. ,cffi:*default-foreign-encoding*))))
|
||||
"Wrapper around lparallel:make-kernel that sets our usual bindings."
|
||||
(lp:make-kernel worker-count :bindings bindings))
|
||||
133
src/utils/report.lisp
Normal file
133
src/utils/report.lisp
Normal file
@ -0,0 +1,133 @@
|
||||
;;;
|
||||
;;; Pretty print a report while doing bulk operations
|
||||
;;;
|
||||
|
||||
(in-package :pgloader.utils)
|
||||
|
||||
;;; Format-control strings for the summary report.  A report line is one
;;; table-name column followed by four statistics columns.

(defvar *header-tname-format* "~&~30@a"
  "Format control for the table-name column: starts on a fresh line and
right-aligns its argument in 30 columns.")

(defvar *header-stats-format* " ~9@a ~9@a ~9@a ~14@a"
  "Format control for the four statistics columns that follow the table name.")

(defvar *header-cols-format*
  (concatenate 'string *header-tname-format* *header-stats-format*)
  "Format control for one complete report line: table name then statistics.")

(defvar *header-cols-names*
  '("table name" "read" "imported" "errors" "time")
  "Column titles, in the order expected by *HEADER-COLS-FORMAT*.")

(defvar *header-line*
  "~&------------------------------ --------- --------- --------- --------------"
  "Dashed separator line, used as a format control (it starts with ~&).")
|
||||
|
||||
(defun report-header ()
  "Print the column titles followed by the dashed separator line.

All output, including the line terminators, goes to *REPORT-STREAM*.  The
newlines used to be written with a bare (terpri), which targets
*STANDARD-OUTPUT* and so split the report over two streams whenever
*REPORT-STREAM* was bound to anything else."
  ;; \"~{~}\" with an empty body takes its first argument as the format
  ;; control and its second as the argument list, so this behaves like
  ;; (apply #'format *report-stream* *header-cols-format* *header-cols-names*)
  (format *report-stream* "~{~}" *header-cols-format* *header-cols-names*)
  (format *report-stream* "~%")
  (format *report-stream* *header-line*)
  (format *report-stream* "~%"))
|
||||
|
||||
(defun report-table-name (table-name)
  "Print TABLE-NAME on *REPORT-STREAM* using the table-name column layout.
The line is not terminated, so the statistics columns can follow."
  (let ((control *header-tname-format*))
    (format *report-stream* control table-name)))
|
||||
|
||||
(defun report-results (read rows errors seconds)
  "Print the statistics columns READ, ROWS, ERRORS and the formatted SECONDS
on *REPORT-STREAM*, then end the line.

SECONDS is rendered through FORMAT-INTERVAL.  The newline is written to
*REPORT-STREAM* rather than with a bare (terpri), which would send it to
*STANDARD-OUTPUT*."
  (format *report-stream* *header-stats-format*
          read rows errors (format-interval seconds nil))
  (format *report-stream* "~%"))
|
||||
|
||||
(defun report-footer (legend read rows errors seconds)
  "Print the closing separator and a totals line on *REPORT-STREAM*.

LEGEND is printed in the table-name column; READ, ROWS, ERRORS and the
formatted SECONDS fill the statistics columns.  Every newline goes to
*REPORT-STREAM*; a bare (terpri) would write to *STANDARD-OUTPUT* instead."
  (format *report-stream* "~%")
  (format *report-stream* *header-line*)
  ;; \"~{~}\" with an empty body uses its first argument as the format control
  ;; and the following list as that control's arguments.
  (format *report-stream* "~{~}" *header-cols-format*
          (list legend read rows errors (format-interval seconds nil)))
  (format *report-stream* "~&")
  (format *report-stream* "~%"))
|
||||
|
||||
;;;
|
||||
;;; Pretty print a report from a pgtable and pgstats counters
|
||||
;;;
|
||||
(defun report-pgtable-stats (pgstate name)
  "Print the statistics columns for the table registered as NAME in PGSTATE.

The counters are read through the PGTABLE accessors.  WITH-SLOTS expands into
SLOT-VALUE, whose behaviour on structure instances is left unspecified by the
standard, so the accessors are the portable way to read them."
  (let ((table (pgstate-get-table pgstate name)))
    (report-results (pgtable-read table)
                    (pgtable-rows table)
                    (pgtable-errs table)
                    (pgtable-secs table))))
|
||||
|
||||
(defun report-pgstate-stats (pgstate legend)
  "Print the footer line holding PGSTATE's global counters, labelled LEGEND.

The counters are read through the PGSTATE accessors rather than WITH-SLOTS,
because SLOT-VALUE on structure instances is unspecified by the standard."
  (report-footer legend
                 (pgstate-read pgstate)
                 (pgstate-rows pgstate)
                 (pgstate-errs pgstate)
                 (pgstate-secs pgstate)))
|
||||
|
||||
;;;
|
||||
;;; Pretty print the whole summary from a state
|
||||
;;;
|
||||
(defun report-summary (&key ((:state pgstate) *state*) (header t) footer)
  "Report a whole summary on *REPORT-STREAM*.

STATE   the pgstate to report on, defaults to *STATE*
HEADER  when true (the default), print the column titles first
FOOTER  when true, used as the legend of a final totals line

One line is printed per table, in the order the tables were registered.
PGSTATE-TABNAMES is built with PUSH, hence the REVERSE.  Counters are read
through the PGTABLE accessors rather than WITH-SLOTS, because SLOT-VALUE on
structure instances is unspecified by the standard."
  (when header (report-header))
  (loop
     for table-name in (reverse (pgstate-tabnames pgstate))
     for pgtable = (gethash table-name (pgstate-tables pgstate))
     do (format *report-stream* *header-cols-format*
                table-name
                (pgtable-read pgtable)
                (pgtable-rows pgtable)
                (pgtable-errs pgtable)
                (format-interval (pgtable-secs pgtable) nil))
     finally (when footer
               (report-pgstate-stats pgstate footer))))
|
||||
|
||||
(defmacro with-stats-collection ((table-name
                                  &key
                                  (dbname '*pg-dbname*)
                                  summary
                                  use-result-as-read
                                  use-result-as-rows
                                  ((:state pgstate) '*state*))
                                 &body forms)
  "Measure time spent in running FORMS into STATE, accounting the seconds to
given DBNAME and TABLE-NAME.

TABLE-NAME           form naming the pgtable entry to account to
DBNAME               form giving the database name, defaults to *PG-DBNAME*
STATE                form giving the pgstate to update, defaults to *STATE*
USE-RESULT-AS-READ   when true, also add the result of FORMS to the read count
USE-RESULT-AS-ROWS   when true, also add the result of FORMS to the rows count
SUMMARY              when true, print a summary of STATE afterwards

Returns the primary value of FORMS."
  ;; The defaults above are quoted on purpose: init-forms in a macro lambda
  ;; list are evaluated at macroexpansion time, so an unquoted *state* would
  ;; freeze whatever value the variable had when the call site was compiled.
  (let ((result (gensym "result"))
        (secs   (gensym "secs"))
        (state  (gensym "state"))
        (table  (gensym "table")))
    ;; STATE and TABLE-NAME are evaluated exactly once, in the same order as
    ;; their first evaluation in the previous expansion.
    `(let* ((*pg-dbname* (or ,dbname *pg-dbname*))
            (,state ,pgstate)
            (,table ,table-name))
       (prog2
           (pgstate-add-table ,state *pg-dbname* ,table)
           (multiple-value-bind (,result ,secs)
               (timing ,@forms)
             (cond ((and ,use-result-as-read ,use-result-as-rows)
                    (pgstate-incf ,state ,table
                                  :read ,result :rows ,result :secs ,secs))
                   (,use-result-as-read
                    (pgstate-incf ,state ,table :read ,result :secs ,secs))
                   (,use-result-as-rows
                    (pgstate-incf ,state ,table :rows ,result :secs ,secs))
                   (t
                    (pgstate-incf ,state ,table :secs ,secs)))
             ,result)
         ;; summarise the state we just updated, not necessarily *state*
         (when ,summary (report-summary :state ,state))))))
|
||||
|
||||
(defun report-full-summary (legend state
                            &key before finally parallel)
  "Report the full story when given three different sections of reporting.

STATE is the main section.  BEFORE, PARALLEL and FINALLY are optional extra
pgstate sections, printed before and after it.  LEGEND labels the grand
total line.

Side effect: the SECS counter of STATE is modified to hold the grand total.

Every newline is written to *REPORT-STREAM*; a bare (terpri) would send it to
*STANDARD-OUTPUT* instead."
  (format *report-stream* "~%")

  ;; BEFORE section, when there is one, then the main section; only the
  ;; first section printed carries the column titles
  (when before
    (report-summary :state before :footer nil)
    (format *report-stream* *header-line*))
  (report-summary :state state :header (null before) :footer nil)

  (when (or finally parallel)
    (format *report-stream* *header-line*)
    (when parallel
      (report-summary :state parallel :header nil :footer nil))
    (when finally
      (report-summary :state finally :header nil :footer nil)))

  ;; add to the grand total the other sections, except for the parallel one
  (incf (pgstate-secs state)
        (+ (if before  (pgstate-secs before)  0)
           (if finally (pgstate-secs finally) 0)))

  ;; if the parallel tasks took longer than the rest cumulated, the total
  ;; waiting time actually was parallel - before
  (when (and parallel
             (< (pgstate-secs state) (pgstate-secs parallel)))
    (setf (pgstate-secs state)
          (- (pgstate-secs parallel)
             (if before (pgstate-secs before) 0))))

  ;; and report the Grand Total
  (report-pgstate-stats state legend))
|
||||
|
||||
111
src/utils/state.lisp
Normal file
111
src/utils/state.lisp
Normal file
@ -0,0 +1,111 @@
|
||||
;;;
|
||||
;;; Global state maintenance, which includes statistics about each target of
|
||||
;;; the load: number of lines read, imported and number of errors found
|
||||
;;; along the way.
|
||||
;;;
|
||||
(in-package :pgloader.utils)
|
||||
|
||||
;;;
|
||||
;;; Data Structures to maintain information about loading state
|
||||
;;;
|
||||
(defstruct (pgtable (:conc-name pgtable-))
  "Statistics kept for a single target table of the load."
  name
  ;; counters
  (read 0   :type fixnum)               ; rows read from the source
  (rows 0   :type fixnum)               ; rows written to the target
  (errs 0   :type fixnum)               ; errors seen
  (secs 0.0 :type float)                ; seconds spent
  ;; files where to find the rejected data and the matching logs
  reject-data
  reject-logs)
|
||||
|
||||
(defstruct (pgstate (:conc-name pgstate-))
  "Global load state: per-table entries plus the running totals."
  ;; table name -> pgtable entry; names are compared with EQUAL
  (tables   (make-hash-table :test 'equal))
  ;; table names, most recently added first; we want to keep the ordering
  (tabnames nil)
  ;; totals over all the tables
  (read 0   :type fixnum)
  (rows 0   :type fixnum)
  (errs 0   :type fixnum)
  (secs 0.0 :type float))
|
||||
|
||||
(defun pgstate-get-table (pgstate name)
  "Look up NAME in the tables of PGSTATE.
Returns the values of GETHASH: the entry (or NIL) and a found-p flag."
  (let ((tables (pgstate-tables pgstate)))
    (gethash name tables)))
|
||||
|
||||
(defun pgstate-add-table (pgstate dbname table-name)
  "Return the pgtable entry for TABLE-NAME in PGSTATE, creating it if needed.

A new entry is registered in PGSTATE and given reject data (.dat) and reject
logs (.log) pathnames under the DBNAME sub-directory of *ROOT-DIR*.  That
directory is created when missing, and files already found at those pathnames
are renamed out of the way."
  (flet ((reject-pathname (directory type)
           (make-pathname :directory directory :name table-name :type type))
         (rename-existing (pathname)
           ;; opening for output with :if-exists :rename moves an existing
           ;; file aside; :if-does-not-exist nil avoids creating a new one
           (with-open-file (stream pathname
                                   :direction :output
                                   :if-exists :rename
                                   :if-does-not-exist nil)
             (declare (ignorable stream)))))
    (or (pgstate-get-table pgstate table-name)
        (let* ((entry      (setf (gethash table-name (pgstate-tables pgstate))
                                 (make-pgtable :name table-name)))
               (reject-dir (pathname-directory
                            (merge-pathnames (format nil "~a/" dbname)
                                             *root-dir*)))
               (data-file  (reject-pathname reject-dir "dat"))
               (logs-file  (reject-pathname reject-dir "log")))

          ;; maintain the ordering
          (push table-name (pgstate-tabnames pgstate))

          ;; create the per-database directory if it does not exist yet
          (ensure-directories-exist (directory-namestring data-file))

          ;; rename the existing files if there are some
          (rename-existing data-file)
          (rename-existing logs-file)

          ;; set the properties to the right pathnames
          (setf (pgtable-reject-data entry) data-file
                (pgtable-reject-logs entry) logs-file)
          entry))))
|
||||
|
||||
(defun pgstate-setf (pgstate name &key read rows errs secs)
  "Set the counters of table NAME in PGSTATE and return its pgtable entry.

Each of READ, ROWS, ERRS and SECS that is given replaces the matching
per-table counter and is added to the matching PGSTATE total."
  (let ((pgtable (pgstate-get-table pgstate name)))
    (macrolet ((record (value table-accessor state-accessor)
                 `(when ,value
                    (setf (,table-accessor pgtable) ,value)
                    (incf (,state-accessor pgstate) ,value))))
      (record read pgtable-read pgstate-read)
      (record rows pgtable-rows pgstate-rows)
      (record errs pgtable-errs pgstate-errs)
      (record secs pgtable-secs pgstate-secs))
    pgtable))
|
||||
|
||||
(defun pgstate-incf (pgstate name &key read rows errs secs)
  "Increment the counters of table NAME in PGSTATE and return its pgtable entry.

Each of READ, ROWS, ERRS and SECS that is given is added both to the
per-table counter and to the matching PGSTATE total."
  (let ((pgtable (pgstate-get-table pgstate name)))
    (macrolet ((bump (delta table-accessor state-accessor)
                 `(when ,delta
                    (incf (,table-accessor pgtable) ,delta)
                    (incf (,state-accessor pgstate) ,delta))))
      (bump read pgtable-read pgstate-read)
      (bump rows pgtable-rows pgstate-rows)
      (bump errs pgtable-errs pgstate-errs)
      (bump secs pgtable-secs pgstate-secs))
    pgtable))
|
||||
|
||||
(defun pgstate-decf (pgstate name &key read rows errs secs)
  "Decrement the counters of table NAME in PGSTATE and return its pgtable entry.

Each of READ, ROWS, ERRS and SECS that is given is subtracted both from the
per-table counter and from the matching PGSTATE total."
  (let ((pgtable (pgstate-get-table pgstate name)))
    (macrolet ((drop (delta table-accessor state-accessor)
                 `(when ,delta
                    (decf (,table-accessor pgtable) ,delta)
                    (decf (,state-accessor pgstate) ,delta))))
      (drop read pgtable-read pgstate-read)
      (drop rows pgtable-rows pgstate-rows)
      (drop errs pgtable-errs pgstate-errs)
      (drop secs pgtable-secs pgstate-secs))
    pgtable))
|
||||
32
src/utils/threads.lisp
Normal file
32
src/utils/threads.lisp
Normal file
@ -0,0 +1,32 @@
|
||||
;;;
|
||||
;;; For communication purposes, the lparallel kernel must be created with
|
||||
;;; access to some common bindings.
|
||||
;;;
|
||||
|
||||
(in-package :pgloader.utils)
|
||||
|
||||
(defun make-kernel (worker-count
                    &key (bindings
                          (flet ((quoted (value)
                                   ;; build the form (quote VALUE)
                                   (list 'quote value)))
                            (list
                             (cons '*monitoring-queue*   *monitoring-queue*)
                             (cons '*copy-batch-rows*    *copy-batch-rows*)
                             (cons '*copy-batch-size*    *copy-batch-size*)
                             (cons '*concurrent-batches* *concurrent-batches*)
                             (cons '*pgconn-host*        (quoted *pgconn-host*))
                             (cons '*pgconn-port*        *pgconn-port*)
                             (cons '*pgconn-user*        *pgconn-user*)
                             (cons '*pgconn-pass*        *pgconn-pass*)
                             (cons '*pg-settings*        (quoted *pg-settings*))
                             (cons '*myconn-host*        (quoted *myconn-host*))
                             (cons '*myconn-port*        *myconn-port*)
                             (cons '*myconn-user*        *myconn-user*)
                             (cons '*myconn-pass*        *myconn-pass*)
                             (cons '*state*              *state*)
                             (cons '*client-min-messages* *client-min-messages*)
                             (cons '*log-min-messages*   *log-min-messages*)

                             ;; bindings updates for libs
                             ;; CFFI is used by the SQLite lib
                             (cons 'cffi:*default-foreign-encoding*
                                   cffi:*default-foreign-encoding*)))))
  "Wrapper around lparallel:make-kernel that sets our usual bindings.

BINDINGS defaults to an alist of (SYMBOL . VALUE-FORM) entries built from
the current values of the listed special variables.  The host and settings
entries are wrapped in a QUOTE form; the others are stored as they are."
  (lp:make-kernel worker-count :bindings bindings))
|
||||
79
src/utils/utils.lisp
Normal file
79
src/utils/utils.lisp
Normal file
@ -0,0 +1,79 @@
|
||||
;;;
|
||||
;;; Random utilities
|
||||
;;;
|
||||
(in-package :pgloader.utils)
|
||||
|
||||
;;;
|
||||
;;; Timing Macro
|
||||
;;;
|
||||
(defun elapsed-time-since (start)
  "Return how many seconds have elapsed between START and now.
START is a value previously obtained from GET-INTERNAL-REAL-TIME."
  (let ((ticks (- (get-internal-real-time) start)))
    (/ ticks internal-time-units-per-second)))
|
||||
|
||||
(defmacro timing (&body forms)
  "Run FORMS and return two values: the primary value of FORMS and the real
time spent running them, in seconds."
  (let ((started (gensym))
        (outcome (gensym)))
    `(let* ((,started (get-internal-real-time))
            (,outcome (progn ,@forms)))
       ;; VALUES evaluates its arguments left to right, and OUTCOME is
       ;; already bound, so the clock is read again only after FORMS ran
       (values ,outcome
               (/ (- (get-internal-real-time) ,started)
                  internal-time-units-per-second)))))
|
||||
|
||||
;;;
|
||||
;;; Timing Formatting
|
||||
;;;
|
||||
(defun format-interval (seconds &optional (stream t))
  "Output the number of SECONDS in a human friendly way.

STREAM is a FORMAT destination: T (the default) prints to standard output,
NIL returns the text as a string.

Years, months, days, hours and minutes are printed only when positive; the
seconds part is always printed, with three decimals."
  (multiple-value-bind (years months days hours mins secs millisecs)
      (date:decode-interval (date:encode-interval :second seconds))
    (declare (ignore millisecs))
    ;; second value of TRUNCATE: what is left of SECONDS once whole
    ;; minutes have been removed
    (let ((leftover (nth-value 1 (truncate seconds 60))))
      (format
       stream
       ;; each ~:[~*~;...~] consumes a flag and a value: the value is
       ;; skipped when the flag is false and printed otherwise
       "~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~5,3fs"
       (plusp years)  years
       (plusp months) months
       (plusp days)   days
       (plusp hours)  hours
       (plusp mins)   mins
       (+ secs (- leftover secs))))))
|
||||
|
||||
;;;
|
||||
;;; File utils
|
||||
;;;
|
||||
(defun slurp-file-into-string (filename)
  "Return the whole content of FILENAME, read as UTF-8, as a string.
The result is a character vector with a fill pointer."
  (with-open-file (input filename
                         :direction :input
                         :external-format :utf-8)
    ;; FILE-LENGTH sizes the buffer; the fill pointer is then set to the
    ;; count READ-SEQUENCE reports, which can be smaller than that size.
    ;;
    ;; apparently the fastest way at that is read-sequence
    ;; http://www.ymeme.com/slurping-a-file-common-lisp-83.html
    (let* ((capacity (file-length input))
           (buffer   (make-array capacity
                                 :element-type 'character
                                 :fill-pointer t))
           (filled   (read-sequence buffer input)))
      (setf (fill-pointer buffer) filled)
      buffer)))
|
||||
|
||||
;;;
|
||||
;;; Camel Case converter
|
||||
;;;
|
||||
(defun camelCase-to-colname (string)
  "Transform input STRING into a suitable column name.

    lahmanID   -> lahman_id
    playerID   -> player_id
    birthYear  -> birth_year

The result is all lower case.  An underscore is inserted before a character
that is its own upper-case form when the previous character was not, except
at the start of STRING and except when that character is itself an
underscore.  Characters without case, such as digits, are their own
upper-case form and so count as upper case here."
  (coerce
   (loop
      for first = t then nil
      for char across string
      for previous-upper-p = nil then char-upper-p
      ;; characters must be compared with CHAR=; EQ is not guaranteed to be
      ;; true for two characters that are otherwise the same
      for char-upper-p = (char= char (char-upcase char))
      for new-word = (and (not first) char-upper-p (not previous-upper-p))
      when (and new-word (not (char= char #\_))) collect #\_
      collect (char-downcase char))
   'string))
|
||||
Loading…
x
Reference in New Issue
Block a user