pgloader/mysql.lisp

581 lines
20 KiB
Common Lisp
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

;;;
;;; Tools to handle MySQL data fetching
;;;
(in-package :pgloader.mysql)
;;;
;;; MySQL tools connecting to a database
;;;
(defun list-databases (&key
                         (host *myconn-host*)
                         (port *myconn-port*)
                         (user *myconn-user*)
                         (pass *myconn-pass*))
  "Connect to the MySQL server at HOST and PORT as USER and return the list
   of database names it reports for SHOW DATABASES.

   PORT defaults to *myconn-port* so that this function reaches the same
   server as MAP-ROWS, which connects using that port too."
  (cl-mysql:connect :host host :port port :user user :password pass)
  (unwind-protect
       ;; each row comes back as a one-element list, splice them into a
       ;; single flat list of names
       (mapcan #'identity (caar (cl-mysql:query "show databases")))
    ;; free resources
    (cl-mysql:disconnect)))
(defun list-tables (dbname
                    &key
                      (host *myconn-host*)
                      (port *myconn-port*)
                      (user *myconn-user*)
                      (pass *myconn-pass*))
  "Return a flat list of all the table names known in database DBNAME.

   Connects to the MySQL server at HOST and PORT as USER; PORT defaults to
   *myconn-port* so that this function reaches the same server as MAP-ROWS.
   The connection is closed before returning, even on error."
  (cl-mysql:connect :host host :port port :user user :password pass)
  (unwind-protect
       (progn
         (cl-mysql:use dbname)
         ;; that returns a pretty weird format (a list of one-element
         ;; rows), process it into a flat list
         (mapcan #'identity (caar (cl-mysql:list-tables))))
    ;; free resources
    (cl-mysql:disconnect)))
;;;
;;; Tools to get MySQL table and columns definitions and transform them to
;;; PostgreSQL CREATE TABLE statements, and run those.
;;;
(defun list-all-columns (dbname
                         &key
                           (host *myconn-host*)
                           (port *myconn-port*)
                           (user *myconn-user*)
                           (pass *myconn-pass*))
  "Get the list of MySQL column definitions per table for database DBNAME.

   Returns an alist whose entries look like (table-name . column-defs),
   where each column definition is the list (column_name data_type
   column_type column_default is_nullable extra) as fetched from
   information_schema.columns.  Tables are returned in the order of the
   query (by table name) and columns by ordinal position.

   PORT defaults to *myconn-port* so that this function reaches the same
   server as MAP-ROWS.  The connection is closed before returning."
  (cl-mysql:connect :host host :port port :user user :password pass)
  (unwind-protect
       (loop
          with schema = nil
          for (table-name . coldef)
          in
            (caar (cl-mysql:query (format nil "
select table_name, column_name,
data_type, column_type, column_default,
is_nullable, extra
from information_schema.columns
where table_schema = '~a'
order by table_name, ordinal_position" dbname)))
          do
            (let ((entry (assoc table-name schema :test 'equal)))
              (if entry
                  (push coldef (cdr entry))
                  (push (cons table-name (list coldef)) schema)))
          finally
          ;; we did push both the tables and their columns, so we need to
          ;; reverse both levels here to get back the query ordering
            (return (nreverse
                     (loop
                        for (name . cols) in schema
                        collect (cons name (reverse cols))))))
    ;; free resources
    (cl-mysql:disconnect)))
(defun get-create-type (table-name cols &key identifier-case include-drop)
  "MySQL declares ENUM types inline, PostgreSQL wants explicit CREATE TYPE.

   Return the list of SQL statements needed to create the enum types used
   by the columns COLS of TABLE-NAME.  Each element of COLS is a list
   starting with (name data-type column-type ...).  When INCLUDE-DROP is
   true, each CREATE TYPE statement is preceded by a DROP TYPE IF EXISTS
   statement; otherwise only the CREATE TYPE statements are returned, so
   the result never contains NIL entries."
  (loop
     ;; only the first three fields of the column definition are needed
     for (name dtype ctype) in cols
     when (string-equal "enum" dtype)
     append (let ((drop-type
                   (when include-drop
                     (format nil "DROP TYPE IF EXISTS ~a;"
                             (get-enum-type-name table-name name
                                                 identifier-case))))
                  (create-type
                   (get-create-enum table-name name ctype
                                    :identifier-case identifier-case)))
              (if drop-type
                  (list drop-type create-type)
                  (list create-type)))))
(defun get-create-table (table-name cols &key identifier-case)
  "Build and return, as a string, the PostgreSQL CREATE TABLE statement
   for TABLE-NAME from its MySQL column definitions COLS.

   Each column definition is a list (name dtype ctype default nullable
   extra); the PostgreSQL column definition is obtained from CAST and both
   the table and column names go through APPLY-IDENTIFIER-CASE."
  (with-output-to-string (out)
    (format out "CREATE TABLE ~a ~%(~%"
            (apply-identifier-case table-name identifier-case))
    (loop
       ;; MORE-COLUMNS is the rest of the list: non-nil means this column
       ;; is not the last one and needs a trailing comma
       for ((name dtype ctype default nullable extra) . more-columns) on cols
       do (let* ((pg-coldef
                  (cast table-name name dtype ctype default nullable extra))
                 (colname (apply-identifier-case name identifier-case)))
            (format out " ~a ~22t ~a~:[~;,~]~%" colname pg-coldef more-columns)))
    (format out ");~%")))
(defun get-drop-table-if-exists (table-name &key identifier-case)
  "Build the PostgreSQL DROP TABLE IF EXISTS statement for TABLE-NAME, the
   name being first processed by APPLY-IDENTIFIER-CASE with IDENTIFIER-CASE.
   The statement is returned as a string ending with a newline."
  (format nil "DROP TABLE IF EXISTS ~a;~%"
          (apply-identifier-case table-name identifier-case)))
(defun get-pgsql-create-tables (all-columns
                                &key
                                  include-drop
                                  (identifier-case :downcase))
  "Build the list of SQL statements that create, in PostgreSQL, every table
   described in ALL-COLUMNS, an alist of (table-name . column-defs).

   For each table the statements come in this order: the DROP TABLE IF
   EXISTS statement when INCLUDE-DROP is true, then whatever GET-CREATE-TYPE
   returns for the table, then the CREATE TABLE statement."
  (let ((statements '()))
    (dolist (table all-columns (nreverse statements))
      (let* ((table-name  (car table))
             (cols        (cdr table))
             (extra-types (get-create-type table-name cols
                                           :identifier-case identifier-case
                                           :include-drop include-drop)))
        (when include-drop
          (push (get-drop-table-if-exists table-name
                                          :identifier-case identifier-case)
                statements))
        ;; types must exist before the table that uses them
        (dolist (type-sql extra-types)
          (push type-sql statements))
        (push (get-create-table table-name cols
                                :identifier-case identifier-case)
              statements)))))
(defun pgsql-create-tables (dbname all-columns
                            &key
                              (identifier-case :downcase)
                              (pg-dbname dbname)
                              include-drop)
  "Run against the PostgreSQL database PG-DBNAME every statement returned
   by GET-PGSQL-CREATE-TABLES for ALL-COLUMNS.

   PG-DBNAME defaults to DBNAME.  Returns the number of SQL statements that
   have been executed, which counts DROP and CREATE TYPE statements too."
  (let ((statements
         (get-pgsql-create-tables all-columns
                                  :identifier-case identifier-case
                                  :include-drop include-drop)))
    (dolist (sql statements)
      (pgloader.pgsql:execute pg-dbname sql :client-min-messages :warning))
    (length statements)))
;;;
;;; Tools to get MySQL indexes definitions and transform them to PostgreSQL
;;; indexes definitions, and run those statements.
;;;
(defun list-all-indexes (dbname
                         &key
                           (host *myconn-host*)
                           (port *myconn-port*)
                           (user *myconn-user*)
                           (pass *myconn-pass*))
  "Get the list of MySQL index definitions per table for database DBNAME.

   Returns an alist whose entries look like (table-name . index-defs),
   where each index definition is a list (index-name unique-p columns):
   UNIQUE-P is true unless the non_unique value fetched from
   information_schema.statistics is 1, and COLUMNS is the list of column
   names obtained by splitting the GROUP_CONCAT result on commas.

   PORT defaults to *myconn-port* so that this function reaches the same
   server as MAP-ROWS.  The connection is closed before returning."
  (cl-mysql:connect :host host :port port :user user :password pass)
  (unwind-protect
       (loop
          with schema = nil
          for (table-name . raw-index-def)
          in (caar (cl-mysql:query (format nil "
SELECT table_name, index_name, non_unique,
GROUP_CONCAT(column_name order by seq_in_index)
FROM information_schema.statistics
WHERE table_schema = '~a'
GROUP BY table_name, index_name;" dbname)))
          do (let ((entry (assoc table-name schema :test 'equal))
                   (index-def
                    (destructuring-bind (index-name non-unique cols)
                        raw-index-def
                      (list index-name
                            (not (= 1 non-unique))
                            (sq:split-sequence #\, cols)))))
               (if entry
                   (push index-def (cdr entry))
                   (push (cons table-name (list index-def)) schema)))
          finally
          ;; we did push, we need to reverse here
            (return (reverse (loop
                                for (name . indexes) in schema
                                collect (cons name (reverse indexes))))))
    ;; free resources
    (cl-mysql:disconnect)))
(defun get-pgsql-index-def (table-name index-name unique cols
                            &key identifier-case)
  "Return a PostgreSQL statement, as a string, creating the given index.

   TABLE-NAME and INDEX-NAME are the MySQL names, UNIQUE is true for a
   unique index and COLS is the list of indexed column names.  The MySQL
   index named PRIMARY is turned into ALTER TABLE ... ADD PRIMARY KEY; any
   other index becomes a CREATE [UNIQUE] INDEX statement whose index name
   is built as <table-name>_<index-name>_idx."
  (let ((pg-table-name (apply-identifier-case table-name identifier-case)))
    (cond
      ;; test the MySQL index name itself, before it gets decorated with
      ;; the table name and the _idx suffix
      ((string= index-name "PRIMARY")
       (format nil
               "ALTER TABLE ~a ADD PRIMARY KEY (~{~a~^, ~});"
               pg-table-name cols))
      (t
       (let ((pg-index-name
              (apply-identifier-case
               (format nil "~a_~a_idx" table-name index-name)
               identifier-case)))
         (format nil
                 "CREATE~:[~; UNIQUE~] INDEX ~a ON ~a (~{~a~^, ~});"
                 unique pg-index-name pg-table-name cols))))))
(defun get-drop-index-if-exists (table-name index-name &key identifier-case)
  "Return the PostgreSQL statement, as a string, dropping the given index.

   For the MySQL index named PRIMARY this is an ALTER TABLE ... DROP
   CONSTRAINT statement targeting <table-name>_pkey.  For any other index
   it is a DROP INDEX IF EXISTS statement targeting
   <table-name>_<index-name>_idx, which is the name GET-PGSQL-INDEX-DEF
   gives to the indexes it creates."
  (cond
    ((string= index-name "PRIMARY")
     (let* ((pkey-name  (format nil "~a_pkey" table-name))
            (pkey-name  (apply-identifier-case pkey-name identifier-case))
            (table-name (apply-identifier-case table-name identifier-case)))
       (format nil "ALTER TABLE ~a DROP CONSTRAINT ~a;" table-name pkey-name)))
    (t
     ;; the name must match the one used at CREATE INDEX time, which
     ;; includes the table name
     (let* ((index-name (format nil "~a_~a_idx" table-name index-name))
            (index-name (apply-identifier-case index-name identifier-case)))
       (format nil "DROP INDEX IF EXISTS ~a;" index-name)))))
(defun get-pgsql-drop-indexes (table-name indexes &key identifier-case)
  "Build the list of DROP statements for INDEXES, a list of index
   definitions whose first element is the MySQL index name.

   The index named PRIMARY is skipped: there is no need to alter table
   drop constraint, when include-drop is true we just did drop the table
   and created it again anyway."
  (mapcar (lambda (index-name)
            (get-drop-index-if-exists table-name index-name
                                      :identifier-case identifier-case))
          (remove "PRIMARY" (mapcar #'first indexes) :test #'string=)))
(defun get-pgsql-create-indexes (table-name indexes
                                 &key identifier-case include-drop)
  "Build one index creation statement per entry of INDEXES for TABLE-NAME.

   Each entry of INDEXES is a list (index-name unique cols) which is handed
   over to GET-PGSQL-INDEX-DEF.  INCLUDE-DROP is accepted but not used
   here."
  (declare (ignorable include-drop))
  (mapcar (lambda (index)
            (get-pgsql-index-def table-name
                                 (first index)   ; index name
                                 (second index)  ; unique?
                                 (third index)   ; column names
                                 :identifier-case identifier-case))
          indexes))
;;;
;;; Map a function to each row extracted from MySQL
;;;
(defun get-column-list-with-is-nulls (dbname table-name)
  "Build the SELECT list used to fetch TABLE-NAME rows from DBNAME.

   We can't just SELECT *, we need to cater for NULLs in text columns
   ourselves: every column of a text type (char, varchar, text, tinytext,
   mediumtext, longtext) is immediately followed by an extra `col` is null
   expression.

   Returns two values of the same length: the list of SELECT expressions,
   and a list of flags where T marks the extra is null expressions and NIL
   the real columns.

   This function assumes a valid connection to the MySQL server has been
   established already."
  (let ((text-types '("char" "varchar" "text"
                      "tinytext" "mediumtext" "longtext"))
        (select-list '())
        (added-flags '()))
    (dolist (column (caar (cl-mysql:query (format nil "
select column_name, data_type
from information_schema.columns
where table_schema = '~a' and table_name = '~a'
order by ordinal_position" dbname table-name))))
      (let ((name (first column))
            (type (second column)))
        ;; the real column always comes first
        (push (format nil "`~a`" name) select-list)
        (push nil added-flags)
        ;; then the null test, only for text columns
        (when (member type text-types :test #'string-equal)
          (push (format nil "`~a` is null" name) select-list)
          (push t added-flags))))
    (values (nreverse select-list) (nreverse added-flags))))
(defun fix-nulls (row nulls)
  "Clean up ROW, a cl-mysql row result fetched with the column list from
   get-column-list-with-is-nulls, using the matching NULLS flags list.

   The added 'foo is null' values are removed from the result.  A column
   followed by such a value is replaced with :null when that value is the
   string 1, and a NIL column is replaced with the empty string when that
   value is the string 0.  Other columns are returned untouched.
   See http://bugs.mysql.com/bug.php?id=19564 for context."
  (loop
     for (value following-value) on row
     for (added-p following-added-p) on nulls
     ;; FOLLOWING-ADDED-P tells us that FOLLOWING-VALUE is the result of
     ;; the is null test on VALUE
     for really-null-p = (and following-added-p
                              (string= following-value "1"))
     for really-empty-p = (and following-added-p
                               (string= following-value "0")
                               (null value))
     ;; skip the values of the expressions we added ourselves
     unless added-p
     collect (cond (really-null-p  :null)
                   (really-empty-p "")
                   (t              value))))
(defun map-rows (dbname table-name &key process-row-fn)
  "Extract MySQL data from DBNAME.TABLE-NAME and call PROCESS-ROW-FN with a
   single argument (a list of column values) for each row.

   The connection uses *myconn-host*, *myconn-port*, *myconn-user* and
   *myconn-pass*, and is closed before returning, even on error.  Rows are
   passed through FIX-NULLS before being handed to PROCESS-ROW-FN.  Returns
   the number of rows processed."
  (cl-mysql:connect :host *myconn-host*
                    :port *myconn-port*
                    :user *myconn-user*
                    :password *myconn-pass*)
  (unwind-protect
       (progn
         ;; Ensure we're talking utf-8 and connect to DBNAME in MySQL
         (cl-mysql:query "SET NAMES 'utf8'")
         (cl-mysql:query "SET character_set_results = utf8;")
         (cl-mysql:use dbname)

         (multiple-value-bind (cols nulls)
             (get-column-list-with-is-nulls dbname table-name)
           ;; quote the table name with backquotes, as already done for
           ;; the column names, so that reserved words are accepted
           (let* ((sql (format nil "SELECT ~{~a~^, ~} FROM `~a`;"
                               cols table-name))
                  (q   (cl-mysql:query sql :store nil :type-map nil)))
             ;; called for its effect only, its value is not needed
             (cl-mysql:next-result-set q)

             ;; Now fetch MySQL rows directly in the stream; all the
             ;; iteration clauses come before WHILE, as LOOP requires
             (loop
                for row = (cl-mysql:next-row q :type-map (make-hash-table))
                while row
                counting row into count
                do (funcall process-row-fn (fix-nulls row nulls))
                finally (return count)))))

    ;; free resources
    (cl-mysql:disconnect)))
;;;
;;; Use map-rows and pgsql-text-copy-format to fill in a CSV file on disk
;;; with MySQL data in there.
;;;
(defun copy-to (dbname table-name filename &key transforms)
  "Extract the data of the MySQL table DBNAME.TABLE-NAME into the file
   FILENAME, in the PostgreSQL COPY TEXT format.

   FILENAME is opened for output as utf-8 and superseded when it exists
   already.  Each row is written with pgloader.pgsql:format-row, which is
   given TRANSFORMS.  Returns whatever MAP-ROWS returns."
  (with-open-file (text-file filename
                             :direction :output
                             :if-exists :supersede
                             :external-format :utf-8)
    (flet ((write-row (row)
             (pgloader.pgsql:format-row text-file row
                                        :transforms transforms)))
      (map-rows dbname table-name :process-row-fn #'write-row))))
;;;
;;; MySQL bulk export to file, in PostgreSQL COPY TEXT format
;;;
(defun export-database (dbname &key only-tables)
  "Export the tables of the MySQL database DBNAME into as many TEXT files,
   in the PostgreSQL COPY format.

   When ONLY-TABLES is given, only the tables whose name is found in that
   list are exported.  The global *state* is reset, then per-table
   statistics and a final summary are reported."
  (let ((all-columns (list-all-columns dbname)))
    (setf *state* (pgloader.utils:make-pgstate))
    (report-header)
    (flet ((selected-p (table-name)
             (or (null only-tables)
                 (member table-name only-tables :test #'equal))))
      (loop
         for (table-name . cols) in all-columns
         for filename = (pgloader.csv:get-pathname dbname table-name)
         do (when (selected-p table-name)
              (pgstate-add-table *state* dbname table-name)
              (report-table-name table-name)

              ;; load data, keeping track of how long that took
              (multiple-value-bind (rows secs)
                  (timing
                   (copy-to dbname table-name filename
                            :transforms (transforms cols)))
                ;; update and report stats
                (pgstate-incf *state* table-name :read rows :secs secs)
                (report-pgtable-stats *state* table-name)))
         finally
           (report-pgstate-stats *state* "Total export time")))))
;;;
;;; Copy data from a source MySQL database into files in the PostgreSQL COPY
;;; TEXT format, then load those files into PostgreSQL. Useful mainly to
;;; compare timings with the direct streaming method. If you need to
;;; pre-process the files, use export-database, do the extra processing,
;;; then use pgloader.pgsql:copy-from-file on each file.
;;;
(defun export-import-database (dbname
                               &key
                                 (pg-dbname dbname)
                                 (truncate t)
                                 only-tables)
  "Export the MySQL data of DBNAME to files, then import those files into
   the PostgreSQL database PG-DBNAME, which defaults to DBNAME.

   When ONLY-TABLES is given, only the tables whose name is found in that
   list are processed.  TRUNCATE is passed down to
   pgloader.pgsql:copy-from-file.  The global *state* is reset, then
   per-table statistics and a final summary are reported."
  ;; get the list of tables and have at it
  (let ((all-columns (list-all-columns dbname)))
    (setf *state* (pgloader.utils:make-pgstate))
    (report-header)
    (flet ((selected-p (table-name)
             (or (null only-tables)
                 (member table-name only-tables :test #'equal))))
      (loop
         for (table-name . cols) in all-columns
         do (when (selected-p table-name)
              (pgstate-add-table *state* dbname table-name)
              (report-table-name table-name)

              ;; only the elapsed time of export + import is of interest
              (multiple-value-bind (res secs)
                  (timing
                   (let ((filename
                          (pgloader.csv:get-pathname dbname table-name)))
                     ;; export from MySQL to file
                     (copy-to dbname table-name filename
                              :transforms (transforms cols))
                     ;; import the file to PostgreSQL
                     (pgloader.pgsql:copy-from-file pg-dbname
                                                    table-name
                                                    filename
                                                    :truncate truncate)))
                (declare (ignore res))
                (pgstate-incf *state* table-name :secs secs)
                (report-pgtable-stats *state* table-name)))
         finally
           (report-pgstate-stats *state* "Total export+import time")))))
;;;
;;; Export MySQL data to our lparallel data queue. All the work is done in
;;; the lower layers, so this function is simple enough.
;;;
(defun copy-to-queue (dbname table-name dataq)
  "Push the rows of the MySQL table DBNAME.TABLE-NAME into the queue DATAQ.

   The work is done by pgloader.queue:map-push-queue driving MAP-ROWS; the
   value it returns is then added to the :read counter of TABLE-NAME in
   *state*."
  (pgstate-incf *state* table-name
                :read (pgloader.queue:map-push-queue dataq #'map-rows
                                                     dbname table-name)))
;;;
;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY
;;; protocol
;;;
(defun stream-table (dbname table-name
                     &key
                       kernel
                       (pg-dbname dbname)
                       truncate
                       transforms)
  "Connect in parallel to MySQL and PostgreSQL and stream the data of
   DBNAME.TABLE-NAME into the PostgreSQL database PG-DBNAME.

   Two tasks are submitted: one reads from MySQL and pushes rows to a
   queue, the other one pops rows from that queue and sends them to
   PostgreSQL.  TRUNCATE and TRANSFORMS are passed down to
   pgloader.pgsql:copy-from-queue.

   When KERNEL is given it is used as the lparallel kernel and left running
   for the caller to end.  Otherwise a 2-workers kernel is created for the
   duration of the call and is always ended before returning, including
   when KERNEL is explicitly NIL or when an error is signaled."
  (let* ((summary     (null *state*))
         (*state*     (or *state* (pgloader.utils:make-pgstate)))
         (lp:*kernel*
          (or kernel
              ;; worker threads don't see our dynamic bindings, hand them
              ;; over explicitly
              (lp:make-kernel 2 :bindings
                              `((*pgconn-host* . ,*pgconn-host*)
                                (*pgconn-port* . ,*pgconn-port*)
                                (*pgconn-user* . ,*pgconn-user*)
                                (*pgconn-pass* . ,*pgconn-pass*)
                                (*pg-settings* . ',*pg-settings*)
                                (*myconn-host* . ,*myconn-host*)
                                (*myconn-port* . ,*myconn-port*)
                                (*myconn-user* . ,*myconn-user*)
                                (*myconn-pass* . ,*myconn-pass*)
                                (*state*       . ,*state*)))))
         (channel     (lp:make-channel))
         (dataq       (lq:make-queue :fixed-capacity 4096)))
    (unwind-protect
         (with-stats-collection (dbname table-name
                                        :state *state* :summary summary)
           (log-message :notice "COPY ~a.~a" dbname table-name)
           ;; read data from MySQL
           (lp:submit-task channel (lambda ()
                                     ;; this function update :read stats
                                     (copy-to-queue dbname table-name dataq)))

           ;; and start another task to push that data from the queue to
           ;; PostgreSQL
           (lp:submit-task
            channel
            (lambda ()
              ;; this function update :rows stats
              (pgloader.pgsql:copy-from-queue pg-dbname table-name dataq
                                              :truncate truncate
                                              :transforms transforms)))

           ;; now wait until both the tasks are over
           (loop for tasks below 2 do (lp:receive-result channel)
              finally
                (log-message :info "COPY ~a.~a done." dbname table-name)))

      ;; only end the kernel when we created it ourselves
      (unless kernel (lp:end-kernel)))))
;;;
;;; Work on all tables for given database
;;;
(defun execute-with-timing (dbname label sql state &key (count 1))
  "Execute the given SQL against DBNAME and register its timing into STATE.

   The second value returned by the TIMING form is added to the :secs
   counter of LABEL in STATE, and COUNT, which defaults to 1, to its :rows
   counter.  The result of the SQL execution itself is discarded."
  (pgstate-incf state label
                :rows count
                :secs (nth-value 1 (timing (pgloader.pgsql:execute dbname sql)))))
(defun create-indexes-in-kernel (dbname table-name indexes kernel channel
                                 &key
                                   identifier-case include-drop
                                   state (label "create index"))
  "Submit the creation of INDEXES for TABLE-NAME in DBNAME as tasks of the
   lparallel KERNEL, on CHANNEL, so that the index builds happen
   concurrently with the data copying.

   One task is submitted on CHANNEL per statement returned by
   GET-PGSQL-CREATE-INDEXES; this function does not wait for them, the
   results are left on CHANNEL.  When INCLUDE-DROP is true, the statements
   from GET-PGSQL-DROP-INDEXES are run first on a private channel and
   waited for.  Timings are registered in STATE under LABEL."
  (let ((lp:*kernel* kernel))
    (pgstate-add-table state dbname label)

    (when include-drop
      (let ((drop-channel    (lp:make-channel))
            (drop-statements
             (get-pgsql-drop-indexes table-name indexes
                                     :identifier-case identifier-case)))
        (dolist (drop-sql drop-statements)
          (log-message :notice "~a" drop-sql)
          (lp:submit-task drop-channel
                          #'pgloader.pgsql:execute dbname drop-sql))

        ;; wait for the DROP INDEX to be done before issuing CREATE INDEX:
        ;; one result is expected per submitted statement
        (loop repeat (length drop-statements)
           do (lp:receive-result drop-channel))))

    (dolist (create-sql
              (get-pgsql-create-indexes table-name indexes
                                        :identifier-case identifier-case
                                        :include-drop include-drop))
      (log-message :notice "~a" create-sql)
      (lp:submit-task channel
                      #'execute-with-timing dbname label create-sql state))))
(defun stream-database (dbname
                        &key
                          (pg-dbname dbname)
                          (schema-only nil)
                          (create-tables nil)
                          (include-drop nil)
                          (create-indexes t)
                          (reset-sequences t)
                          (identifier-case :downcase) ; or :quote
                          (truncate t)
                          only-tables)
  "Export MySQL data from DBNAME and import it into the PostgreSQL database
   PG-DBNAME, which defaults to DBNAME.

   - CREATE-TABLES: first create the tables in PostgreSQL, dropping them
     before when INCLUDE-DROP is true.
   - SCHEMA-ONLY: skip the data copy and the sequences reset.
   - CREATE-INDEXES: create the indexes of each table once its data has
     been copied, concurrently with the copy of the next tables.
   - RESET-SEQUENCES: call pgloader.pgsql:reset-all-sequences at the end.
   - IDENTIFIER-CASE: passed down to the SQL generating functions.
   - TRUNCATE: passed down to STREAM-TABLE.
   - ONLY-TABLES: when given, only the tables whose name is found in that
     list are processed.

   A summary report is printed at the end."
  ;; get the list of tables and have at it
  (let* ((*state*        (make-pgstate))
         (idx-state      (make-pgstate))
         (copy-kernel    (make-kernel 2))
         (all-columns    (list-all-columns dbname))
         (all-indexes    (list-all-indexes dbname))
         ;; one worker per index of the most indexed table, and at least
         ;; one worker even when the database has no index at all
         (max-indexes    (reduce #'max all-indexes
                                 :key (lambda (entry) (length (cdr entry)))
                                 :initial-value 1))
         (idx-kernel     (make-kernel max-indexes))
         (idx-channel    (let ((lp:*kernel* idx-kernel))
                           (lp:make-channel)))
         ;; how many index build tasks have been submitted to idx-channel
         (nb-index-tasks 0))

    ;; if asked, first drop/create the tables on the PostgreSQL side
    (when create-tables
      (log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop)
      (pgsql-create-tables dbname all-columns
                           :identifier-case identifier-case
                           :pg-dbname pg-dbname
                           :include-drop include-drop))

    (loop
       for (table-name . columns) in all-columns
       when (or (null only-tables)
                (member table-name only-tables :test #'equal))
       do
         (progn
           ;; first COPY the data from MySQL to PostgreSQL, using copy-kernel
           (unless schema-only
             (stream-table dbname table-name
                           :kernel copy-kernel
                           :pg-dbname pg-dbname
                           :truncate truncate
                           :transforms (transforms columns)))

           ;; Create the indexes for that table in parallel with the next
           ;; COPY, and all at once in concurrent threads to benefit from
           ;; PostgreSQL synchronous scan ability
           ;;
           ;; We just push new index build as they come along, if one
           ;; index build requires much more time than the others our
           ;; index build might get unsync: indexes for different tables
           ;; will get built in parallel --- not a big problem.
           (when create-indexes
             (let* ((indexes
                     (cdr (assoc table-name all-indexes :test #'string=))))
               (create-indexes-in-kernel dbname table-name indexes
                                         idx-kernel idx-channel
                                         :state idx-state
                                         :include-drop include-drop
                                         :identifier-case identifier-case)
               ;; one task is submitted per index definition, remember
               ;; how many results we will have to wait for
               (incf nb-index-tasks (length indexes))))))

    ;; don't forget to reset sequences, but only when we did actually import
    ;; the data.
    (when (and (not schema-only) reset-sequences)
      (pgloader.pgsql:reset-all-sequences pg-dbname))

    ;; now end the kernels
    (let ((lp:*kernel* idx-kernel))  (lp:end-kernel))
    (let ((lp:*kernel* copy-kernel)
          (label "index build completion"))
      ;; wait until the indexes are done being built...
      ;; don't forget accounting for that waiting time.
      (multiple-value-bind (res secs)
          (timing
           ;; receive exactly one result per submitted index build: no
           ;; more, or we would block forever, and no less, or we would
           ;; report before the builds are over
           (loop repeat nb-index-tasks
              do (lp:receive-result idx-channel)))
        (declare (ignore res))
        (pgstate-add-table *state* dbname label)
        (pgstate-incf *state* label :secs secs)
        (lp:end-kernel)))

    ;; and report the total time spent on the operation
    (report-summary)
    (format t pgloader.utils::*header-line*)
    (report-summary :state idx-state :header nil :footer nil)
    (report-pgstate-stats *state* "Total streaming time")))