mirror of
https://github.com/dimitri/pgloader.git
synced 2026-01-31 20:11:03 +01:00
Cleanup internal API and stats reporting.
This commit is contained in:
parent
a9d6b7c2c7
commit
2c80996d85
132
mysql.lisp
132
mysql.lisp
@ -72,12 +72,12 @@
|
||||
;;; Use mysql-map-rows and pgsql-text-copy-format to fill in a CSV file on
|
||||
;;; disk with MySQL data in there.
|
||||
;;;
|
||||
(defun copy-from (dbname table-name filename
|
||||
&key
|
||||
date-columns
|
||||
(host *myconn-host*)
|
||||
(user *myconn-user*)
|
||||
(pass *myconn-pass*))
|
||||
(defun copy-to (dbname table-name filename
|
||||
&key
|
||||
date-columns
|
||||
(host *myconn-host*)
|
||||
(user *myconn-user*)
|
||||
(pass *myconn-pass*))
|
||||
"Extrat data from MySQL in PostgreSQL COPY TEXT format"
|
||||
(with-open-file (text-file filename
|
||||
:direction :output
|
||||
@ -95,16 +95,14 @@
|
||||
;;;
|
||||
;;; MySQL bulk export to file, in PostgreSQL COPY TEXT format
|
||||
;;;
|
||||
(defun export-all-tables (dbname
|
||||
&key
|
||||
only-tables
|
||||
(host *myconn-host*)
|
||||
(user *myconn-user*)
|
||||
(pass *myconn-pass*))
|
||||
(defun export-database (dbname
|
||||
&key
|
||||
only-tables
|
||||
(host *myconn-host*)
|
||||
(user *myconn-user*)
|
||||
(pass *myconn-pass*))
|
||||
"Export MySQL tables into as many TEXT files, in the PostgreSQL COPY format"
|
||||
(let ((pgtables (pgloader.pgsql:list-tables dbname))
|
||||
(total-count 0)
|
||||
(total-seconds 0))
|
||||
(let ((pgtables (pgloader.pgsql:list-tables dbname)))
|
||||
(report-header)
|
||||
(loop
|
||||
for table-name in (list-tables dbname
|
||||
@ -115,25 +113,26 @@
|
||||
when (or (null only-tables)
|
||||
(member table-name only-tables :test #'equal))
|
||||
do
|
||||
(pgstate-add-table *state* dbname table-name)
|
||||
(report-table-name table-name)
|
||||
(multiple-value-bind (result seconds)
|
||||
(multiple-value-bind (rows secs)
|
||||
(timing
|
||||
(copy-from dbname table-name filename
|
||||
:date-columns (pgloader.pgsql:get-date-columns
|
||||
table-name pgtables)))
|
||||
(when result
|
||||
(incf total-count result)
|
||||
(incf total-seconds seconds)
|
||||
(report-results result seconds)))
|
||||
;; load data
|
||||
(let ((date-cols
|
||||
(pgloader.pgsql:get-date-columns table-name pgtables)))
|
||||
(copy-to dbname table-name filename :date-columns date-cols)))
|
||||
;; update and report stats
|
||||
(pgstate-incf *state* table-name :read rows :secs secs)
|
||||
(report-pgtable-stats *state* table-name))
|
||||
finally
|
||||
(report-footer "Total export time"
|
||||
total-count total-seconds)
|
||||
(return (values total-count (float total-seconds))))))
|
||||
|
||||
(report-pgstate-stats *state* "Total export time"))))
|
||||
|
||||
;;;
|
||||
;;; Copy data for a target database from files in the PostgreSQL COPY TEXT
|
||||
;;; format
|
||||
;;; Copy data from a target database into files in the PostgreSQL COPY TEXT
|
||||
;;; format, then load those files. Useful mainly to compare timing with the
|
||||
;;; direct streaming method. If you need to pre-process the files, use
|
||||
;;; export-database, do the extra processing, then use
|
||||
;;; pgloader.pgsql:copy-from-file on each file.
|
||||
;;;
|
||||
(defun export-import-database (dbname
|
||||
&key
|
||||
@ -141,63 +140,71 @@
|
||||
only-tables)
|
||||
"Export MySQL data and Import it into PostgreSQL"
|
||||
;; get the list of tables and have at it
|
||||
(let ((mysql-tables (list-tables dbname))
|
||||
(total-count 0)
|
||||
(total-seconds 0))
|
||||
(let ((mysql-tables (list-tables dbname)))
|
||||
(report-header)
|
||||
(loop
|
||||
for (table-name . date-columns) in (pgloader.pgsql:list-tables dbname)
|
||||
when (or (null only-tables)
|
||||
(member table-name only-tables :test #'equal))
|
||||
do
|
||||
(pgstate-add-table *state* dbname table-name)
|
||||
(report-table-name table-name)
|
||||
|
||||
(if (member table-name mysql-tables :test #'equal)
|
||||
(multiple-value-bind (result seconds)
|
||||
(multiple-value-bind (res secs)
|
||||
(timing
|
||||
(let ((filename
|
||||
(pgloader.csv:get-pathname dbname table-name)))
|
||||
;; export from MySQL to file
|
||||
(copy-from dbname table-name filename
|
||||
:date-columns date-columns)
|
||||
;; import the file to PostgreSQL
|
||||
(pgloader.pgsql:copy-from-file dbname table-name filename
|
||||
:truncate truncate)))
|
||||
(when result
|
||||
(incf total-count result)
|
||||
(incf total-seconds seconds)
|
||||
(report-results result seconds)))
|
||||
(let* ((filename
|
||||
(pgloader.csv:get-pathname dbname table-name))
|
||||
(read
|
||||
;; export from MySQL to file
|
||||
(copy-to dbname table-name filename
|
||||
:date-columns date-columns))
|
||||
;; import the file to PostgreSQL
|
||||
(rows
|
||||
(pgloader.pgsql:copy-from-file dbname
|
||||
table-name
|
||||
filename
|
||||
:truncate truncate)))
|
||||
(pgstate-incf *state* table-name :read read :rows rows)))
|
||||
(declare (ignore res))
|
||||
(pgstate-incf *state* table-name :secs secs)
|
||||
(report-pgtable-stats *state* table-name))
|
||||
;; not a known mysql table
|
||||
(format t " skip, unknown table in MySQL database~%"))
|
||||
finally
|
||||
(report-footer "Total export+import time"
|
||||
total-count total-seconds)
|
||||
(return (values total-count (float total-seconds))))))
|
||||
(report-pgstate-stats *state* "Total export+import time"))))
|
||||
|
||||
;;;
|
||||
;;; Export MySQL data to our lparallel data queue. All the work is done in
|
||||
;;; other basic layers, which keeps this function simple.
|
||||
;;;
|
||||
(defun copy-to-queue (dbname table-name dataq)
  "Push the rows of MySQL table DBNAME.TABLE-NAME into the queue DATAQ, then
add the result of that push to the table's :read counter in *STATE*.
Returns whatever PGSTATE-INCF returns."
  ;; map-push-queue drives map-rows over the table; its return value is used
  ;; as the :read increment (presumably a row count -- confirm against
  ;; pgloader.queue).
  (let ((rows-read (pgloader.queue:map-push-queue dataq
                                                  #'map-rows
                                                  dbname
                                                  table-name)))
    (pgstate-incf *state* table-name :read rows-read)))
|
||||
|
||||
;;;
|
||||
;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY
|
||||
;;; protocol
|
||||
;;;
|
||||
(defun stream-mysql-table-in-pgsql (dbname table-name
|
||||
&key
|
||||
truncate
|
||||
date-columns)
|
||||
(defun stream-table (dbname table-name
|
||||
&key
|
||||
truncate
|
||||
date-columns)
|
||||
"Connect in parallel to MySQL and PostgreSQL and stream the data."
|
||||
(let* ((lp:*kernel* *loader-kernel*)
|
||||
(channel (lp:make-channel))
|
||||
(dataq (lq:make-queue 4096)))
|
||||
;; have a task fill MySQL data in the queue
|
||||
(lp:submit-task
|
||||
channel
|
||||
(lambda ()
|
||||
(pgloader.queue:map-push-queue
|
||||
dataq #'map-rows dbname table-name)))
|
||||
(lp:submit-task channel (lambda ()
|
||||
;; this function updates :read stats
|
||||
(copy-to-queue dbname table-name dataq)))
|
||||
|
||||
;; and start another task to push that data from the queue to PostgreSQL
|
||||
(lp:submit-task
|
||||
channel
|
||||
(lambda ()
|
||||
;; this function updates :rows stats
|
||||
(pgloader.pgsql:copy-from-queue dbname table-name dataq
|
||||
:truncate truncate
|
||||
:date-columns date-columns)))
|
||||
@ -208,7 +215,7 @@
|
||||
;;;
|
||||
;;; Work on all tables for given database
|
||||
;;;
|
||||
(defun stream-database-tables (dbname &key (truncate t) only-tables)
|
||||
(defun stream-database (dbname &key (truncate t) only-tables)
|
||||
"Export MySQL data and Import it into PostgreSQL"
|
||||
;; get the list of tables and have at it
|
||||
(let ((mysql-tables (list-tables dbname)))
|
||||
@ -226,9 +233,10 @@
|
||||
(if (member table-name mysql-tables :test #'equal)
|
||||
(multiple-value-bind (res secs)
|
||||
(timing
|
||||
(stream-mysql-table-in-pgsql dbname table-name
|
||||
:truncate truncate
|
||||
:date-columns date-columns))
|
||||
;; this will take care of updating stats in *state*
|
||||
(stream-table dbname table-name
|
||||
:truncate truncate
|
||||
:date-columns date-columns))
|
||||
;; set the timing we just measured
|
||||
(declare (ignore res))
|
||||
(pgstate-incf *state* table-name :secs secs)
|
||||
|
||||
@ -63,13 +63,13 @@
|
||||
#:report-pgtable-stats
|
||||
#:report-pgstate-stats)
|
||||
(:export #:map-rows
|
||||
#:copy-from
|
||||
#:copy-to
|
||||
#:list-databases
|
||||
#:list-tables
|
||||
#:export-all-tables
|
||||
#:export-database
|
||||
#:export-import-database
|
||||
#:stream-mysql-table-in-pgsql
|
||||
#:stream-database-tables))
|
||||
#:stream-table
|
||||
#:stream-database))
|
||||
|
||||
(defpackage #:pgloader.pgsql
|
||||
(:use #:cl)
|
||||
|
||||
@ -167,7 +167,7 @@ Finally returns how many rows were read and processed."
|
||||
(defun copy-from-file (dbname table-name filename
|
||||
&key
|
||||
(truncate t))
|
||||
"Load data from clean COPY TEXT file to PostgreSQL"
|
||||
"Load data from clean COPY TEXT file to PostgreSQL, return how many rows."
|
||||
(when truncate (truncate-table dbname table-name))
|
||||
|
||||
(let* ((conspec (remove :port (get-connection-string dbname)))
|
||||
|
||||
58
utils.lisp
58
utils.lisp
@ -107,8 +107,11 @@
|
||||
(incf (pgstate-secs pgstate) secs))
|
||||
pgtable))
|
||||
|
||||
(defun pgstate-incf (pgstate name &key rows errs secs)
|
||||
(defun pgstate-incf (pgstate name &key read rows errs secs)
|
||||
(let ((pgtable (pgstate-get-table pgstate name)))
|
||||
(when read
|
||||
(incf (pgtable-read pgtable) read)
|
||||
(incf (pgstate-read pgstate) read))
|
||||
(when rows
|
||||
(incf (pgtable-rows pgtable) rows)
|
||||
(incf (pgstate-rows pgstate) rows))
|
||||
@ -120,8 +123,11 @@
|
||||
(incf (pgstate-secs pgstate) secs))
|
||||
pgtable))
|
||||
|
||||
(defun pgstate-decf (pgstate name &key rows errs secs)
|
||||
(defun pgstate-decf (pgstate name &key read rows errs secs)
|
||||
(let ((pgtable (pgstate-get-table pgstate name)))
|
||||
(when read
|
||||
(decf (pgtable-read pgtable) read)
|
||||
(decf (pgstate-read pgstate) read))
|
||||
(when rows
|
||||
(decf (pgtable-rows pgtable) rows)
|
||||
(decf (pgstate-rows pgstate) rows))
|
||||
@ -133,31 +139,41 @@
|
||||
(decf (pgstate-secs pgstate) secs))
|
||||
pgtable))
|
||||
|
||||
(defun report-pgtable-stats (pgstate name)
|
||||
(with-slots (rows errs secs) (pgstate-get-table pgstate name)
|
||||
(format t "~9@a ~9@a ~9@a" rows errs (format-interval secs nil))))
|
||||
|
||||
(defun report-pgstate-stats (pgstate legend)
|
||||
(with-slots (rows errs secs) pgstate
|
||||
(format t "~&------------------------------ --------- --------- ---------")
|
||||
(format t "~&~30@a ~9@a ~9@a ~9@a" legend
|
||||
rows errs (format-interval secs nil))))
|
||||
|
||||
;;;
|
||||
;;; Pretty print a report while doing bulk operations
|
||||
;;;
|
||||
;; Format control strings and labels shared by the report-* printers below,
;; so that the header, the per-table lines and the footer use one layout.

(defvar *header-line*
  "~&------------------------------ --------- --------- --------- ---------"
  "Format control string printing the dashed separator line of the report.")

(defvar *header-tname-format* "~&~30@a"
  "Format control string for the leading table-name (or legend) column.")

(defvar *header-stats-format* " ~9@a ~9@a ~9@a ~9@a"
  "Format control string for the four statistics columns.")

(defvar *header-cols-format*
  (concatenate 'string *header-tname-format* *header-stats-format*)
  "Format control string for a whole report line: name column, then stats.")

(defvar *header-cols-names*
  '("table name" "read" "imported" "errors" "time")
  "Column titles, in the order expected by *HEADER-COLS-FORMAT*.")
|
||||
|
||||
(defun report-header ()
|
||||
(format t "~&~30@a ~9@a ~9@a ~9@a" "table name" "rows" "errors" "time")
|
||||
(format t "~&------------------------------ --------- --------- ---------"))
|
||||
(apply #'format t *header-cols-format* *header-cols-names*)
|
||||
(format t *header-line*))
|
||||
|
||||
(defun report-table-name (table-name)
|
||||
(format t "~&~30@a " table-name))
|
||||
(format t *header-tname-format* table-name))
|
||||
|
||||
(defun report-results (rows errors seconds)
|
||||
(format t "~9@a ~9@a ~9@a" rows errors (format-interval seconds nil)))
|
||||
(defun report-results (read rows errors seconds)
  "Print the READ, ROWS and ERRORS counters followed by SECONDS, rendered
through FORMAT-INTERVAL, to standard output using *HEADER-STATS-FORMAT*."
  (let ((elapsed (format-interval seconds nil)))
    (format t *header-stats-format* read rows errors elapsed)))
|
||||
|
||||
(defun report-footer (legend rows errors seconds)
|
||||
(format t "~&------------------------------ --------- --------- ---------")
|
||||
(format t "~&~30@a ~9@a ~9@a ~9@a" legend
|
||||
rows errors (format-interval seconds nil)))
|
||||
(defun report-footer (legend read rows errors seconds)
  "Print the closing lines of a report to standard output: the separator
line *HEADER-LINE*, then LEGEND with the READ, ROWS and ERRORS totals and
SECONDS rendered through FORMAT-INTERVAL, laid out by *HEADER-COLS-FORMAT*."
  (format t *header-line*)
  ;; Arguments are passed directly instead of through APPLY on a fresh list;
  ;; the values and their evaluation order are the same.
  (format t *header-cols-format*
          legend read rows errors (format-interval seconds nil)))
|
||||
|
||||
;;;
|
||||
;;; Pretty print a report from a pgtable and pgstats counters
|
||||
;;;
|
||||
(defun report-pgtable-stats (pgstate name)
  "Look up the entry for table NAME in PGSTATE and print its read, rows,
errs and secs slots through REPORT-RESULTS."
  (let ((pgtable (pgstate-get-table pgstate name)))
    (report-results (slot-value pgtable 'read)
                    (slot-value pgtable 'rows)
                    (slot-value pgtable 'errs)
                    (slot-value pgtable 'secs))))
|
||||
|
||||
(defun report-pgstate-stats (pgstate legend)
  "Print the read, rows, errs and secs slots of PGSTATE as the report
footer, labelled with LEGEND, through REPORT-FOOTER."
  (report-footer legend
                 (slot-value pgstate 'read)
                 (slot-value pgstate 'rows)
                 (slot-value pgstate 'errs)
                 (slot-value pgstate 'secs)))
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user