From f6bdf64c7cdc22a06e236d6e37dd879cd3d20f5b Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Thu, 7 Feb 2013 21:55:10 +0100 Subject: [PATCH] Clean up the package definitions and inter-dependencies, and implement a proper data structure for sharing stats about what's happening. --- csv.lisp | 5 --- mysql.lisp | 96 ++++++++++++++++++--------------------------------- package.lisp | 80 +++++++++++++++++++++++++++++++++++++++++- pgloader.asd | 10 +++--- pgloader.lisp | 6 ++++ pgsql.lisp | 64 +++++++++++++++++++--------------- queue.lisp | 7 ---- run.lisp | 3 ++ timing.lisp | 37 -------------------- utils.lisp | 95 ++++++++++++++++++++++++++++++++++++++++++++------ 10 files changed, 248 insertions(+), 155 deletions(-) delete mode 100644 timing.lisp diff --git a/csv.lisp b/csv.lisp index 14d9042..fa30a17 100644 --- a/csv.lisp +++ b/csv.lisp @@ -2,11 +2,6 @@ ;;; Tools to handle MySQL data fetching ;;; -(defpackage #:pgloader.csv - (:use #:cl) - (:export #:*csv-path-root* - #:get-pathname)) - (in-package :pgloader.csv) (defparameter *csv-path-root* diff --git a/mysql.lisp b/mysql.lisp index eb5ab2b..dc4efed 100644 --- a/mysql.lisp +++ b/mysql.lisp @@ -2,22 +2,6 @@ ;;; Tools to handle MySQL data fetching ;;; -(defpackage #:pgloader.mysql - (:use #:cl) - (:import-from #:pgloader - #:*loader-kernel* - #:*myconn-host* - #:*myconn-user* - #:*myconn-pass*) - (:export #:map-rows - #:copy-from - #:list-databases - #:list-tables - #:export-all-tables - #:export-import-database - #:stream-mysql-table-in-pgsql - #:stream-database-tables)) - (in-package :pgloader.mysql) ;;; @@ -121,7 +105,7 @@ (let ((pgtables (pgloader.pgsql:list-tables dbname)) (total-count 0) (total-seconds 0)) - (pgloader.utils:report-header) + (report-header) (loop for table-name in (list-tables dbname :host host @@ -131,18 +115,18 @@ when (or (null only-tables) (member table-name only-tables :test #'equal)) do - (pgloader.utils:report-table-name table-name) + (report-table-name table-name) (multiple-value-bind 
(result seconds) - (pgloader.utils:timing + (timing (copy-from dbname table-name filename :date-columns (pgloader.pgsql:get-date-columns table-name pgtables))) (when result (incf total-count result) (incf total-seconds seconds) - (pgloader.utils:report-results result seconds))) + (report-results result seconds))) finally - (pgloader.utils:report-footer "Total export time" + (report-footer "Total export time" total-count total-seconds) (return (values total-count (float total-seconds)))))) @@ -160,17 +144,17 @@ (let ((mysql-tables (list-tables dbname)) (total-count 0) (total-seconds 0)) - (pgloader.utils:report-header) + (report-header) (loop for (table-name . date-columns) in (pgloader.pgsql:list-tables dbname) when (or (null only-tables) (member table-name only-tables :test #'equal)) do - (pgloader.utils:report-table-name table-name) + (report-table-name table-name) (if (member table-name mysql-tables :test #'equal) (multiple-value-bind (result seconds) - (pgloader.utils:timing + (timing (let ((filename (pgloader.csv:get-pathname dbname table-name))) ;; export from MySQL to file @@ -182,11 +166,11 @@ (when result (incf total-count result) (incf total-seconds seconds) - (pgloader.utils:report-results result seconds))) + (report-results result seconds))) ;; not a known mysql table (format t " skip, unknown table in MySQL database~%")) finally - (pgloader.utils:report-footer "Total export+import time" + (report-footer "Total export+import time" total-count total-seconds) (return (values total-count (float total-seconds)))))) @@ -205,27 +189,21 @@ (dataq (lq:make-queue 4096))) ;; have a task fill MySQL data in the queue (lp:submit-task - channel (lambda () - (list :mysql - (pgloader.queue:map-push-queue - dataq - #'map-rows dbname table-name)))) + channel + (lambda () + (pgloader.queue:map-push-queue + dataq #'map-rows dbname table-name))) ;; and start another task to push that data from the queue to PostgreSQL (lp:submit-task channel (lambda () - (list :pgsql - 
(multiple-value-list - (pgloader.pgsql:copy-from-queue dbname table-name dataq - :truncate truncate - :date-columns date-columns))))) + (pgloader.pgsql:copy-from-queue dbname table-name dataq + :truncate truncate + :date-columns date-columns))) ;; now wait until both the tasks are over - (loop - for tasks below 2 - collect (lp:receive-result channel) into counts - finally (return (cadr (assoc :pgsql counts)))))) + (loop for tasks below 2 do (lp:receive-result channel)))) ;;; ;;; Work on all tables for given database @@ -233,36 +211,30 @@ (defun stream-database-tables (dbname &key (truncate t) only-tables) "Export MySQL data and Import it into PostgreSQL" ;; get the list of tables and have at it - (let ((mysql-tables (list-tables dbname)) - (total-count 0) - (total-errors 0) - (total-seconds 0)) - (pgloader.utils:report-header) + (let ((mysql-tables (list-tables dbname))) + + (report-header) + (loop for (table-name . date-columns) in (pgloader.pgsql:list-tables dbname) when (or (null only-tables) (member table-name only-tables :test #'equal)) do - (pgloader.utils:report-table-name table-name) + (pgstate-add-table *state* dbname table-name) + (report-table-name table-name) (if (member table-name mysql-tables :test #'equal) - (multiple-value-bind (result seconds) - (pgloader.utils:timing - (destructuring-bind (rows errors) - (stream-mysql-table-in-pgsql dbname table-name - :truncate truncate - :date-columns date-columns) - (incf total-count rows) - (incf total-errors errors) - (list rows errors))) - ;; time to report - (destructuring-bind (rows errors) result - (incf total-seconds seconds) - (pgloader.utils:report-results rows errors seconds))) + (multiple-value-bind (res secs) + (timing + (stream-mysql-table-in-pgsql dbname table-name + :truncate truncate + :date-columns date-columns)) + ;; set the timing we just measured + (declare (ignore res)) + (pgstate-incf *state* table-name :secs secs) + (report-pgtable-stats *state* table-name)) ;; not a known mysql table 
(format t "skip, unknown table in MySQL database~%")) finally - (pgloader.utils:report-footer "Total streaming time" - total-count total-errors total-seconds) - (return (values total-count (float total-seconds)))))) + (report-pgstate-stats *state* "Total streaming time")))) diff --git a/package.lisp b/package.lisp index dbcbfbc..dae7b34 100644 --- a/package.lisp +++ b/package.lisp @@ -1,7 +1,84 @@ ;;;; package.lisp +;;; +;;; To avoid circular files dependencies, define all the packages here +;;; + +(defpackage #:pgloader.utils + (:use #:cl) + (:export #:report-header + #:report-table-name + #:report-results + #:report-footer + #:format-interval + #:timing + #:make-pgstate + #:pgstate-get-table + #:pgstate-add-table + #:pgstate-setf + #:pgstate-incf + #:report-pgtable-stats + #:report-pgstate-stats)) + +(defpackage #:pgloader.queue + (:use #:cl) + (:export #:map-pop-queue + #:map-push-queue)) + +(defpackage #:pgloader.csv + (:use #:cl) + (:export #:*csv-path-root* + #:get-pathname)) + +(defpackage #:pgloader.mysql + (:use #:cl) + (:import-from #:pgloader + #:*loader-kernel* + #:*myconn-host* + #:*myconn-user* + #:*myconn-pass*) + (:import-from #:pgloader.utils + #:report-header + #:report-table-name + #:report-results + #:report-footer + #:format-interval + #:timing + #:make-pgstate + #:pgstate-get-table + #:pgstate-add-table + #:pgstate-setf + #:pgstate-incf + #:report-pgtable-stats + #:report-pgstate-stats) + (:import-from #:pgloader + #:*state*) + (:export #:map-rows + #:copy-from + #:list-databases + #:list-tables + #:export-all-tables + #:export-import-database + #:stream-mysql-table-in-pgsql + #:stream-database-tables)) (defpackage #:pgloader.pgsql (:use #:cl) + (:import-from #:pgloader.utils + #:report-header + #:report-table-name + #:report-results + #:report-footer + #:format-interval + #:timing + #:make-pgstate + #:pgstate-get-table + #:pgstate-add-table + #:pgstate-setf + #:pgstate-incf + #:report-pgtable-stats + #:report-pgstate-stats) + (:import-from 
#:pgloader + #:*state*) (:export #:truncate-table #:copy-from-file #:copy-from-queue @@ -16,7 +93,8 @@ #:copy-from-file #:list-databases #:list-tables) - (:export #:copy-from-file + (:export #:*state* + #:copy-from-file #:list-databases #:list-tables)) diff --git a/pgloader.asd b/pgloader.asd index 70538b8..4f30cfc 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -14,19 +14,19 @@ #:lparallel) :components ((:file "package") (:file "utils" :depends-on ("package")) - (:file "pgloader" :depends-on ("package")) + (:file "pgloader" :depends-on ("package" "utils")) ;; those are one-package-per-file - (:file "queue") ; package pgloader.queue - (:file "csv") ; package pgloader.csv + (:file "queue" :depends-on ("package")) ; package pgloader.queue + (:file "csv" :depends-on ("package")) ; package pgloader.csv ;; package pgloader.pgsql - (:file "pgsql" :depends-on ("queue" "utils")) + (:file "pgsql" :depends-on ("package" "queue" "utils")) ;; mysql.lisp depends on pgsql.lisp to be able to export data ;; from MySQL in the PostgreSQL format. ;; ;; package pgloader.mysql - (:file "mysql" :depends-on ("pgsql" "queue" "utils")))) + (:file "mysql" :depends-on ("package" "pgsql" "queue" "utils")))) diff --git a/pgloader.lisp b/pgloader.lisp index f25f411..1984684 100644 --- a/pgloader.lisp +++ b/pgloader.lisp @@ -15,6 +15,12 @@ (defparameter *myconn-user* "myuser") (defparameter *myconn-pass* "mypass") +(defparameter *state* (pgloader.utils:make-pgstate) + "pgloader state, global stats and per-table stats") + ;;; ;;; TODO: define a top level API ;;; + +(defparameter *state* (pgloader.utils:make-pgstate) + "State of the current loading.") diff --git a/pgsql.lisp b/pgsql.lisp index 023ca34..a06f502 100644 --- a/pgsql.lisp +++ b/pgsql.lisp @@ -190,19 +190,25 @@ Finally returns how many rows where read and processed." &key (truncate t) date-columns) - "Fetch data from the QUEUE until we see :end-of-data" + "Fetch data from the QUEUE until we see :end-of-data. 
Update *state*" (when truncate (truncate-table dbname table-name)) - (let* ((conspec (remove :port (get-connection-string dbname))) - (total-errors 0)) + (let* ((conspec (remove :port (get-connection-string dbname)))) (loop for retval = + ;; The idea is to stream the queue content directly into the + ;; PostgreSQL COPY protocol stream, but COMMIT every + ;; *copy-batch-size* rows. + ;; + ;; That way we only have to recover from a buffer of data rather + ;; than restart from scratch each time we have to find which row + ;; contains erroneous data. BATCH is that buffer. (let* ((stream (cl-postgres:open-db-writer conspec table-name nil)) (batch nil) (batch-size 0) (process-row-fn ;; build our batch aware row processing function - ;; it closes over batch and stream + ;; it closes over stream, batch and batch-size (lambda (row) (let ((reformated-row (reformat-row row :date-columns date-columns))) @@ -221,14 +227,13 @@ Finally returns how many rows where read and processed." ((or CL-POSTGRES-ERROR:UNIQUE-VIOLATION CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) - (incf total-errors - (retry-batch dbname table-name - (nreverse batch) batch-size)))))) + (retry-batch dbname table-name (nreverse batch) batch-size))))) - ;; the final return value is the number of row processed - summing (if (consp retval) (cdr retval) retval) into total-rows - while (and (consp retval) (eq (car retval) :continue)) - finally (return (values total-rows total-errors))))) + ;; fetch how many rows we just pushed through, update stats + for rows = (if (consp retval) (cdr retval) retval) + for cont = (and (consp retval) (eq (car retval) :continue)) + do (pgstate-incf *state* table-name :rows rows) + while cont))) ;;; ;;; When a batch has been refused by PostgreSQL with a data-exception, that @@ -250,12 +255,20 @@ Finally returns how many rows where read and processed." ;;; (defun process-bad-row (dbname table-name condition row) "Process bad row" + ;; first, the stats. 
+ (pgstate-incf *state* table-name :errs 1) + + ;; now, the bad row processing (let* ((str (format nil "~a" row)) (str (if (< 72 (length str)) (subseq str 0 72) str))) (format t "ERROR: ~a~%" condition) (format t "DATA: ~a...~%" str))) +;;; +;;; Compute the next batch size, must be smaller than the previous one or +;;; just one row to ensure the retry-batch recursion is not infinite. +;;; (defun smaller-batch-size (batch-size processed-rows) "How many rows should we process in next iteration?" (let ((remaining-rows (- batch-size processed-rows))) @@ -265,9 +278,11 @@ Finally returns how many rows where read and processed." (min remaining-rows (floor (/ batch-size *copy-batch-split*)))))) +;;; +;;; The recursive retry batch function. +;;; (defun retry-batch (dbname table-name batch batch-size) - "Batch is a list of rows containing at least one error. Return number of - bad rows." + "Batch is a list of rows containing at least one bad row. Find it." (let* ((conspec (remove :port (get-connection-string dbname))) (current-batch-pos batch) (processed-rows 0) @@ -282,15 +297,11 @@ Finally returns how many rows where read and processed." (cl-postgres:open-db-writer conspec table-name nil))) (unwind-protect - (progn - (dotimes (i current-batch-size) - ;; rows in that batch have already been processed - (cl-postgres:db-write-row stream (car current-batch-pos)) - (setf current-batch-pos (cdr current-batch-pos)) - (incf processed-rows)) - - ;; function's return value: number of bad rows extracted - total-bad-rows) + (dotimes (i current-batch-size) + ;; rows in that batch have already been processed + (cl-postgres:db-write-row stream (car current-batch-pos)) + (setf current-batch-pos (cdr current-batch-pos)) + (incf processed-rows)) (handler-case (cl-postgres:close-db-writer stream) @@ -301,11 +312,8 @@ Finally returns how many rows where read and processed." 
CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) ;; process bad data (if (= 1 current-batch-size) - (progn - (process-bad-row - dbname table-name condition (car current-batch)) - (incf total-bad-rows)) + (process-bad-row dbname table-name + condition (car current-batch)) ;; more than one line of bad data: recurse (retry-batch dbname table-name - current-batch current-batch-size)))))) - finally (return total-bad-rows)))) + current-batch current-batch-size))))))))) diff --git a/queue.lisp b/queue.lisp index b5d5de0..37569cf 100644 --- a/queue.lisp +++ b/queue.lisp @@ -1,13 +1,6 @@ ;;; ;;; Tools to handle internal queueing, using lparallel.queue ;;; - -(defpackage #:pgloader.queue - (:use #:cl) - (:export #:map-pop-queue - #:map-push-queue)) - -;; no nickname for that package, queue is far too generic (in-package :pgloader.queue) (defun map-pop-queue (queue process-row-fn) diff --git a/run.lisp b/run.lisp index 145fac2..fd61fb1 100644 --- a/run.lisp +++ b/run.lisp @@ -6,3 +6,6 @@ (setq *myconn-host* "localhost" *myconn-user* "debian-sys-maint" *myconn-pass* "vtmMI04yBZlFprYm") + +;; start with a new empty state, for stats. 
+(setq pgloader:*state* (pgloader.utils:make-pgstate)) diff --git a/timing.lisp b/timing.lisp deleted file mode 100644 index d4a8812..0000000 --- a/timing.lisp +++ /dev/null @@ -1,37 +0,0 @@ -;;; -;;; Some little timing tools -;;; - -(in-package :pgloader) - -;;; -;;; Timing Macros -;;; -(defun elapsed-time-since (start) - "Return how many seconds ticked between START and now" - (/ (- (get-internal-real-time) start) - internal-time-units-per-second)) - -(defmacro timing (&body forms) - "return both how much real time was spend in body and its result" - (let ((start (gensym)) - (end (gensym)) - (result (gensym))) - `(let* ((,start (get-internal-real-time)) - (,result (progn ,@forms)) - (,end (get-internal-real-time))) - (values ,result (/ (- ,end ,start) internal-time-units-per-second))))) - -(defun format-interval (seconds &optional (stream t)) - "Output the number of seconds in a human friendly way" - (multiple-value-bind (years months days hours mins secs millisecs) - (date:decode-interval (date:encode-interval :second seconds)) - (format - stream - "~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~d.~ds" - (< 0 years) years - (< 0 months) months - (< 0 days) days - (< 0 hours) hours - (< 0 mins) mins - secs (truncate millisecs)))) diff --git a/utils.lisp b/utils.lisp index 3d04788..0837bcd 100644 --- a/utils.lisp +++ b/utils.lisp @@ -1,18 +1,11 @@ ;;; ;;; Random utilities ;;; - -(defpackage #:pgloader.utils - (:use #:cl) - (:export #:report-header - #:report-table-name - #:report-results - #:report-footer - #:format-interval - #:timing)) - (in-package :pgloader.utils) +(defparameter *reject-path-root* + (make-pathname :directory "/tmp")) + ;;; ;;; Timing Macro ;;; @@ -48,6 +41,88 @@ (< 0 mins) mins secs (truncate millisecs)))) +;;; +;;; Data Structures to maintain information about loading state +;;; +(defstruct pgtable + name + (read 0 :type fixnum) ; how many rows did we read + (rows 0 :type fixnum) ; how many rows did we write + 
(errs 0 :type fixnum) ; how many errors did we see + (secs 0.0 :type float) ; how many seconds did it take + reject-data reject-logs) ; files where to find reject data + +(defstruct pgstate + (tables (make-hash-table :test 'equal)) + (read 0 :type fixnum) + (rows 0 :type fixnum) + (errs 0 :type fixnum) + (secs 0.0 :type float)) + +(defun pgstate-get-table (pgstate name) + (gethash name (pgstate-tables pgstate))) + +(defun pgstate-add-table (pgstate dbname table-name) + "Instantiate a new pgtable structure to hold our stats, and return it." + (or (pgstate-get-table pgstate table-name) + (let ((table (setf (gethash table-name (pgstate-tables pgstate)) + (make-pgtable :name table-name)))) + (setf (pgtable-reject-data table) + (make-pathname + :directory (pathname-directory + (merge-pathnames + (format nil "~a" dbname) *reject-path-root*)) + :name table-name + :type "rej.dat") + (pgtable-reject-logs table) + (make-pathname + :directory (pathname-directory + (merge-pathnames + (format nil "~a" dbname) *reject-path-root*)) + :name table-name + :type "rej.log")) + table))) + +(defun pgstate-setf (pgstate name &key read rows errs secs) + (let ((pgtable (pgstate-get-table pgstate name))) + (when read + (setf (pgtable-read pgtable) read) + (incf (pgstate-read pgstate) read)) + (when rows + (setf (pgtable-rows pgtable) rows) + (incf (pgstate-rows pgstate) rows)) + (when errs + (setf (pgtable-errs pgtable) errs) + (incf (pgstate-errs pgstate) errs)) + (when secs + (setf (pgtable-secs pgtable) secs) + (incf (pgstate-secs pgstate) secs)) + pgtable)) + +(defun pgstate-incf (pgstate name &key rows errs secs) + (format t "~&pgstate-incf: ~d rows, ~d errs, ~f secs~%" rows errs secs) + (let ((pgtable (pgstate-get-table pgstate name))) + (when rows + (incf (pgtable-rows pgtable) rows) + (incf (pgstate-rows pgstate) rows)) + (when errs + (incf (pgtable-errs pgtable) errs) + (incf (pgstate-errs pgstate) errs)) + (when secs + (incf (pgtable-secs pgtable) secs) + (incf (pgstate-secs 
pgstate) secs)) + pgtable)) + +(defun report-pgtable-stats (pgstate name) + (with-slots (rows errs secs) (pgstate-get-table pgstate name) + (format t "~9@a ~9@a ~9@a" rows errs (format-interval secs nil)))) + +(defun report-pgstate-stats (pgstate legend) + (with-slots (rows errs secs) pgstate + (format t "~&------------------------------ --------- --------- ---------") + (format t "~&~30@a ~9@a ~9@a ~9@a" legend + rows errs (format-interval secs nil)))) + ;;; ;;; Pretty print a report while doing bulk operations ;;;