diff --git a/mysql.lisp b/mysql.lisp index dc4efed..b30046e 100644 --- a/mysql.lisp +++ b/mysql.lisp @@ -72,12 +72,12 @@ ;;; Use mysql-map-rows and pgsql-text-copy-format to fill in a CSV file on ;;; disk with MySQL data in there. ;;; -(defun copy-from (dbname table-name filename - &key - date-columns - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) +(defun copy-to (dbname table-name filename + &key + date-columns + (host *myconn-host*) + (user *myconn-user*) + (pass *myconn-pass*)) "Extrat data from MySQL in PostgreSQL COPY TEXT format" (with-open-file (text-file filename :direction :output @@ -95,16 +95,14 @@ ;;; ;;; MySQL bulk export to file, in PostgreSQL COPY TEXT format ;;; -(defun export-all-tables (dbname - &key - only-tables - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) +(defun export-database (dbname + &key + only-tables + (host *myconn-host*) + (user *myconn-user*) + (pass *myconn-pass*)) "Export MySQL tables into as many TEXT files, in the PostgreSQL COPY format" - (let ((pgtables (pgloader.pgsql:list-tables dbname)) - (total-count 0) - (total-seconds 0)) + (let ((pgtables (pgloader.pgsql:list-tables dbname))) (report-header) (loop for table-name in (list-tables dbname @@ -115,25 +113,26 @@ when (or (null only-tables) (member table-name only-tables :test #'equal)) do + (pgstate-add-table *state* dbname table-name) (report-table-name table-name) - (multiple-value-bind (result seconds) + (multiple-value-bind (rows secs) (timing - (copy-from dbname table-name filename - :date-columns (pgloader.pgsql:get-date-columns - table-name pgtables))) - (when result - (incf total-count result) - (incf total-seconds seconds) - (report-results result seconds))) + ;; load data + (let ((date-cols + (pgloader.pgsql:get-date-columns table-name pgtables))) + (copy-to dbname table-name filename :date-columns date-cols))) + ;; update and report stats + (pgstate-incf *state* table-name :read rows :secs secs) + (report-pgtable-stats *state* table-name)) finally - (report-footer "Total export time" - total-count total-seconds) - (return (values total-count (float total-seconds)))))) - + (report-pgstate-stats *state* "Total export time")))) ;;; -;;; Copy data for a target database from files in the PostgreSQL COPY TEXT -;;; format +;;; Copy data from a target database into files in the PostgreSQL COPY TEXT +;;; format, then load those files. Useful mainly to compare timing with the +;;; direct streaming method. If you need to pre-process the files, use +;;; export-database, do the extra processing, then use +;;; pgloader.pgsql:copy-from-file on each file. ;;; (defun export-import-database (dbname &key @@ -141,63 +140,71 @@ only-tables) "Export MySQL data and Import it into PostgreSQL" ;; get the list of tables and have at it - (let ((mysql-tables (list-tables dbname)) - (total-count 0) - (total-seconds 0)) + (let ((mysql-tables (list-tables dbname))) (report-header) (loop for (table-name . date-columns) in (pgloader.pgsql:list-tables dbname) when (or (null only-tables) (member table-name only-tables :test #'equal)) do + (pgstate-add-table *state* dbname table-name) (report-table-name table-name) (if (member table-name mysql-tables :test #'equal) - (multiple-value-bind (result seconds) + (multiple-value-bind (res secs) (timing - (let ((filename - (pgloader.csv:get-pathname dbname table-name))) - ;; export from MySQL to file - (copy-from dbname table-name filename - :date-columns date-columns) - ;; import the file to PostgreSQL - (pgloader.pgsql:copy-from-file dbname table-name filename - :truncate truncate))) - (when result - (incf total-count result) - (incf total-seconds seconds) - (report-results result seconds))) + (let* ((filename + (pgloader.csv:get-pathname dbname table-name)) + (read + ;; export from MySQL to file + (copy-to dbname table-name filename + :date-columns date-columns)) + ;; import the file to PostgreSQL + (rows + (pgloader.pgsql:copy-from-file dbname + table-name + filename + :truncate truncate))) + (pgstate-incf *state* table-name :read read :rows rows))) + (declare (ignore res)) + (pgstate-incf *state* table-name :secs secs) + (report-pgtable-stats *state* table-name)) ;; not a known mysql table (format t " skip, unknown table in MySQL database~%")) finally - (report-footer "Total export+import time" - total-count total-seconds) - (return (values total-count (float total-seconds)))))) + (report-pgstate-stats *state* "Total export+import time")))) +;;; +;;; Export MySQL data to our lparallel data queue. All the work is done in +;;; other basic layers, simple enough function. +;;; +(defun copy-to-queue (dbname table-name dataq) + "Copy data from MySQL table DBNAME.TABLE-NAME into queue DATAQ" + (let ((read + (pgloader.queue:map-push-queue dataq #'map-rows dbname table-name))) + (pgstate-incf *state* table-name :read read))) ;;; ;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY ;;; protocol ;;; -(defun stream-mysql-table-in-pgsql (dbname table-name - &key - truncate - date-columns) +(defun stream-table (dbname table-name + &key + truncate + date-columns) "Connect in parallel to MySQL and PostgreSQL and stream the data." (let* ((lp:*kernel* *loader-kernel*) (channel (lp:make-channel)) (dataq (lq:make-queue 4096))) - ;; have a task fill MySQL data in the queue - (lp:submit-task - channel - (lambda () - (pgloader.queue:map-push-queue - dataq #'map-rows dbname table-name))) + (lp:submit-task channel (lambda () + ;; this function update :read stats + (copy-to-queue dbname table-name dataq))) ;; and start another task to push that data from the queue to PostgreSQL (lp:submit-task channel (lambda () + ;; this function update :rows stats (pgloader.pgsql:copy-from-queue dbname table-name dataq :truncate truncate :date-columns date-columns))) @@ -208,7 +215,7 @@ ;;; ;;; Work on all tables for given database ;;; -(defun stream-database-tables (dbname &key (truncate t) only-tables) +(defun stream-database (dbname &key (truncate t) only-tables) "Export MySQL data and Import it into PostgreSQL" ;; get the list of tables and have at it (let ((mysql-tables (list-tables dbname))) @@ -226,9 +233,10 @@ (if (member table-name mysql-tables :test #'equal) (multiple-value-bind (res secs) (timing - (stream-mysql-table-in-pgsql dbname table-name - :truncate truncate - :date-columns date-columns)) + ;; this will care about updating stats in *state* + (stream-table dbname table-name + :truncate truncate + :date-columns date-columns)) ;; set the timing we just measured (declare (ignore res)) (pgstate-incf *state* table-name :secs secs) diff --git a/package.lisp b/package.lisp index 7186345..6bb93cd 100644 --- a/package.lisp +++ b/package.lisp @@ -63,13 +63,13 @@ #:report-pgtable-stats #:report-pgstate-stats) (:export #:map-rows - #:copy-from + #:copy-to #:list-databases #:list-tables - #:export-all-tables + #:export-database #:export-import-database - #:stream-mysql-table-in-pgsql - #:stream-database-tables)) + #:stream-table + #:stream-database)) (defpackage #:pgloader.pgsql (:use #:cl) diff --git a/pgsql.lisp b/pgsql.lisp index a9ecdcb..c353a21 100644 --- a/pgsql.lisp +++ b/pgsql.lisp @@ -167,7 +167,7 @@ Finally returns how many rows where read and processed." (defun copy-from-file (dbname table-name filename &key (truncate t)) - "Load data from clean COPY TEXT file to PostgreSQL" + "Load data from clean COPY TEXT file to PostgreSQL, return how many rows." (when truncate (truncate-table dbname table-name)) (let* ((conspec (remove :port (get-connection-string dbname))) diff --git a/utils.lisp b/utils.lisp index 33b8b80..736d4f9 100644 --- a/utils.lisp +++ b/utils.lisp @@ -107,8 +107,11 @@ (incf (pgstate-secs pgstate) secs)) pgtable)) -(defun pgstate-incf (pgstate name &key rows errs secs) +(defun pgstate-incf (pgstate name &key read rows errs secs) (let ((pgtable (pgstate-get-table pgstate name))) + (when read + (incf (pgtable-read pgtable) read) + (incf (pgstate-read pgstate) read)) (when rows (incf (pgtable-rows pgtable) rows) (incf (pgstate-rows pgstate) rows)) @@ -120,8 +123,11 @@ (incf (pgstate-secs pgstate) secs)) pgtable)) -(defun pgstate-decf (pgstate name &key rows errs secs) +(defun pgstate-decf (pgstate name &key read rows errs secs) (let ((pgtable (pgstate-get-table pgstate name))) + (when read + (decf (pgtable-read pgtable) read) + (decf (pgstate-read pgstate) read)) (when rows (decf (pgtable-rows pgtable) rows) (decf (pgstate-rows pgstate) rows)) @@ -133,31 +139,41 @@ (decf (pgstate-secs pgstate) secs)) pgtable)) -(defun report-pgtable-stats (pgstate name) - (with-slots (rows errs secs) (pgstate-get-table pgstate name) - (format t "~9@a ~9@a ~9@a" rows errs (format-interval secs nil)))) - -(defun report-pgstate-stats (pgstate legend) - (with-slots (rows errs secs) pgstate - (format t "~&------------------------------ --------- --------- ---------") - (format t "~&~30@a ~9@a ~9@a ~9@a" legend - rows errs (format-interval secs nil)))) - ;;; ;;; Pretty print a report while doing bulk operations ;;; +(defvar *header-line* + "~&------------------------------ --------- --------- --------- ---------") + +(defvar *header-tname-format* "~&~30@a") +(defvar *header-stats-format* " ~9@a ~9@a ~9@a ~9@a") +(defvar *header-cols-format* (concatenate 'string *header-tname-format* + *header-stats-format*)) +(defvar *header-cols-names* '("table name" "read" "imported" "errors" "time")) + (defun report-header () - (format t "~&~30@a ~9@a ~9@a ~9@a" "table name" "rows" "errors" "time") - (format t "~&------------------------------ --------- --------- ---------")) + (apply #'format t *header-cols-format* *header-cols-names*) + (format t *header-line*)) (defun report-table-name (table-name) - (format t "~&~30@a " table-name)) + (format t *header-tname-format* table-name)) -(defun report-results (rows errors seconds) - (format t "~9@a ~9@a ~9@a" rows errors (format-interval seconds nil))) +(defun report-results (read rows errors seconds) + (format t *header-stats-format* read rows errors (format-interval seconds nil))) -(defun report-footer (legend rows errors seconds) - (format t "~&------------------------------ --------- --------- ---------") - (format t "~&~30@a ~9@a ~9@a ~9@a" legend - rows errors (format-interval seconds nil))) +(defun report-footer (legend read rows errors seconds) + (format t *header-line*) + (apply #'format t *header-cols-format* + (list legend read rows errors (format-interval seconds nil)))) + +;;; +;;; Pretty print a report from a pgtable and pgstats counters +;;; +(defun report-pgtable-stats (pgstate name) + (with-slots (read rows errs secs) (pgstate-get-table pgstate name) + (report-results read rows errs secs))) + +(defun report-pgstate-stats (pgstate legend) + (with-slots (read rows errs secs) pgstate + (report-footer legend read rows errs secs)))