Switch from cl-mysql to qmynd, an all-lisp driver for MySQL.

This commit is contained in:
Dimitri Fontaine 2013-11-28 17:19:51 +01:00
parent 157587476b
commit f02eb641b4
5 changed files with 140 additions and 211 deletions

View File

@ -14,6 +14,9 @@ docs:
pandoc pgloader.1.md -o pgloader.html pandoc pgloader.1.md -o pgloader.html
pandoc pgloader.1.md -o pgloader.pdf pandoc pgloader.1.md -o pgloader.pdf
~/quicklisp/local-projects/qmynd:
git clone -b streaming https://github.com/dimitri/qmynd.git $@
~/quicklisp/local-projects/cl-abnf: ~/quicklisp/local-projects/cl-abnf:
git clone https://github.com/dimitri/cl-abnf.git $@ git clone https://github.com/dimitri/cl-abnf.git $@
@ -23,6 +26,7 @@ docs:
~/quicklisp/local-projects/cl-csv: ~/quicklisp/local-projects/cl-csv:
git clone -b empty-strings-and-nil https://github.com/dimitri/cl-csv.git $@ git clone -b empty-strings-and-nil https://github.com/dimitri/cl-csv.git $@
qmynd: ~/quicklisp/local-projects/qmynd ;
cl-abnf: ~/quicklisp/local-projects/cl-abnf ; cl-abnf: ~/quicklisp/local-projects/cl-abnf ;
cl-csv: ~/quicklisp/local-projects/cl-csv ; cl-csv: ~/quicklisp/local-projects/cl-csv ;
postmodern: ~/quicklisp/local-projects/Postmodern ; postmodern: ~/quicklisp/local-projects/Postmodern ;
@ -41,7 +45,7 @@ $(ASDF_CONF):
asdf-config: $(ASDF_CONF) ; asdf-config: $(ASDF_CONF) ;
$(LIBS): quicklisp $(ASDF_CONF) cl-abnf postmodern cl-csv $(LIBS): quicklisp $(ASDF_CONF) cl-abnf postmodern cl-csv qmynd
sbcl --load ~/quicklisp/setup.lisp \ sbcl --load ~/quicklisp/setup.lisp \
--eval '(ql:quickload "pgloader")' \ --eval '(ql:quickload "pgloader")' \
--eval '(quit)' --eval '(quit)'

View File

@ -37,7 +37,7 @@ pgloader is now a Common Lisp program, tested using the
[Quicklisp](http://www.quicklisp.org/beta/). [Quicklisp](http://www.quicklisp.org/beta/).
$ apt-get install sbcl $ apt-get install sbcl
$ apt-get install libmysqlclient-dev libsqlite3-dev $ apt-get install libsqlite3-dev
$ make pgloader $ make pgloader
$ ./build/pgloader.exe --help $ ./build/pgloader.exe --help
@ -113,11 +113,6 @@ file into a manual page or an HTML page thanks to the `pandoc` application:
Some notes about what I intend to be working on next. Some notes about what I intend to be working on next.
### tests
- add needed pre-requisites in bootstrap.sh to run the MySQL and SQLite
tests from the `make test` target without errors
### binary distribution ### binary distribution
- prepare an all-included binary for several platforms - prepare an all-included binary for several platforms

View File

@ -10,7 +10,7 @@
#:postmodern ; PostgreSQL protocol implementation #:postmodern ; PostgreSQL protocol implementation
#:cl-postgres ; low level bits for COPY streaming #:cl-postgres ; low level bits for COPY streaming
#:simple-date ; FIXME: recheck dependency #:simple-date ; FIXME: recheck dependency
#:cl-mysql ; CFFI binding to libmysqlclient-dev #:qmynd ; MySQL protocol implemenation
#:split-sequence ; some parsing is made easy #:split-sequence ; some parsing is made easy
#:cl-csv ; full CSV reader #:cl-csv ; full CSV reader
#:cl-fad ; file and directories #:cl-fad ; file and directories

View File

@ -4,6 +4,8 @@
(in-package pgloader.mysql) (in-package pgloader.mysql)
(defvar *connection* nil "Current MySQL connection")
;;; ;;;
;;; Specific implementation of schema migration, see the API in ;;; Specific implementation of schema migration, see the API in
@ -45,103 +47,79 @@
:identifier-case identifier-case))))) :identifier-case identifier-case)))))
(defmacro with-mysql-connection ((&optional dbname) &body forms)
"Connect to MySQL, use given DBNAME as the current database if provided,
and execute FORMS in a protected way so that we always disconnect when
done.
Connection parameters are *myconn-host*, *myconn-port*, *myconn-user* and
*myconn-pass*."
`(let ((*connection*
(qmynd:mysql-connect :host *myconn-host*
:port *myconn-port*
:username *myconn-user*
:password *myconn-pass*
,@(when dbname
(list :database dbname)))))
(flet ((mysql-query (query &key row-fn (as-text t) (result-type 'list))
(qmynd:mysql-query *connection* query
:row-fn row-fn
:as-text as-text
:result-type result-type)))
(unwind-protect
(progn ,@forms)
(qmynd:mysql-disconnect *connection*)))))
;;; ;;;
;;; Function for accessing the MySQL catalogs, implementing auto-discovery ;;; Function for accessing the MySQL catalogs, implementing auto-discovery
;;; ;;;
(defun list-databases (&key (defun list-databases ()
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"Connect to a local database and get the database list" "Connect to a local database and get the database list"
(cl-mysql:connect :host host :user user :password pass) (with-mysql-connection ()
(unwind-protect (mysql-query "show databases")))
(mapcan #'identity (caar (cl-mysql:query "show databases")))
(cl-mysql:disconnect)))
(defun list-tables (dbname (defun list-tables (dbname)
&key
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"Return a flat list of all the tables names known in given DATABASE" "Return a flat list of all the tables names known in given DATABASE"
(cl-mysql:connect :host host :user user :password pass) (with-mysql-connection (dbname)
(mysql-query (format nil "
(unwind-protect
(progn
(cl-mysql:use dbname)
;; that returns a pretty weird format, process it
(loop for (table-name)
in (caar (cl-mysql:query (format nil "
select table_name select table_name
from information_schema.tables from information_schema.tables
where table_schema = '~a' and table_type = 'BASE TABLE' where table_schema = '~a' and table_type = 'BASE TABLE'
order by table_name" dbname))) order by table_name" dbname))))
collect table-name))
;; free resources
(cl-mysql:disconnect)))
(defun list-views (dbname (defun list-views (dbname &key only-tables)
&key
only-tables
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"Return a flat list of all the view names and definitions known in given DBNAME" "Return a flat list of all the view names and definitions known in given DBNAME"
(cl-mysql:connect :host host :user user :password pass) (with-mysql-connection (dbname)
(mysql-query (format nil "
(unwind-protect
(progn
(cl-mysql:use dbname)
;; that returns a pretty weird format, process it
(caar (cl-mysql:query (format nil "
select table_name, view_definition select table_name, view_definition
from information_schema.views from information_schema.views
where table_schema = '~a' where table_schema = '~a'
~@[and table_name in (~{'~a'~^,~})~] ~@[and table_name in (~{'~a'~^,~})~]
order by table_name" dbname only-tables)))) order by table_name" dbname only-tables))))
;; free resources
(cl-mysql:disconnect)))
(defun create-my-views (dbname views-alist (defun create-my-views (dbname views-alist)
&key
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"VIEWS-ALIST associates view names with their SQL definition, which might "VIEWS-ALIST associates view names with their SQL definition, which might
be empty for already existing views. Create only the views for which we be empty for already existing views. Create only the views for which we
have an SQL definition." have an SQL definition."
(let ((views (remove-if #'null views-alist :key #'cdr))) (let ((views (remove-if #'null views-alist :key #'cdr)))
(when views (when views
(cl-mysql:connect :host host :user user :password pass) (with-mysql-connection (dbname)
(unwind-protect (loop for (name . def) in views
(progn for sql = (format nil "CREATE VIEW ~a AS ~a" name def)
(cl-mysql:use dbname) do
(loop for (name . def) in views (log-message :info "MySQL: ~a" sql)
for sql = (format nil "CREATE VIEW ~a AS ~a" name def) (mysql-query sql))))))
do
(log-message :info "MySQL: ~a" sql)
(cl-mysql:query sql)))
;; free resources
(cl-mysql:disconnect)))))
(defun drop-my-views (dbname views-alist (defun drop-my-views (dbname views-alist)
&key
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"See `create-my-views' for VIEWS-ALIST description. This time we DROP the "See `create-my-views' for VIEWS-ALIST description. This time we DROP the
views to clean out after our work." views to clean out after our work."
(let ((views (remove-if #'null views-alist :key #'cdr))) (let ((views (remove-if #'null views-alist :key #'cdr)))
(when views (when views
(cl-mysql:connect :host host :user user :password pass) (with-mysql-connection (dbname)
(unwind-protect (let ((sql
(let ((sql (format nil "DROP VIEW ~{~a~^, ~};" (mapcar #'car views))))
(format nil "DROP VIEW ~{~a~^, ~};" (mapcar #'car views)))) (log-message :info "MySQL: ~a" sql)
(cl-mysql:use dbname) (mysql-query sql))))))
(log-message :info "MySQL: ~a" sql)
(cl-mysql:query sql))
;; free resources
(cl-mysql:disconnect)))))
;;; ;;;
@ -156,22 +134,16 @@ order by table_name" dbname only-tables))))
(defun list-all-columns (dbname (defun list-all-columns (dbname
&key &key
only-tables only-tables
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*)
(table-type :table) (table-type :table)
&aux &aux
(table-type-name (cdr (assoc table-type *table-type*)))) (table-type-name (cdr (assoc table-type *table-type*))))
"Get the list of MySQL column names per table." "Get the list of MySQL column names per table."
(cl-mysql:connect :host host :user user :password pass) (with-mysql-connection (dbname)
(loop
(unwind-protect with schema = nil
(progn for (table-name name dtype ctype default nullable extra)
(loop in
with schema = nil (mysql-query (format nil "
for (table-name name dtype ctype default nullable extra)
in
(caar (cl-mysql:query (format nil "
select c.table_name, c.column_name, select c.table_name, c.column_name,
c.data_type, c.column_type, c.column_default, c.data_type, c.column_type, c.column_default,
c.is_nullable, c.extra c.is_nullable, c.extra
@ -179,61 +151,48 @@ order by table_name" dbname only-tables))))
join information_schema.tables t using(table_schema, table_name) join information_schema.tables t using(table_schema, table_name)
where c.table_schema = '~a' and t.table_type = '~a' where c.table_schema = '~a' and t.table_type = '~a'
~@[and table_name in (~{'~a'~^,~})~] ~@[and table_name in (~{'~a'~^,~})~]
order by table_name, ordinal_position" dbname table-type-name only-tables))) order by table_name, ordinal_position" dbname table-type-name only-tables))
do do
(let ((entry (assoc table-name schema :test 'equal)) (let ((entry (assoc table-name schema :test 'equal))
(column (column
(make-mysql-column (make-mysql-column
table-name name dtype ctype default nullable extra))) table-name name dtype ctype default nullable extra)))
(if entry (if entry
(push column (cdr entry)) (push column (cdr entry))
(push (cons table-name (list column)) schema))) (push (cons table-name (list column)) schema)))
finally finally
;; we did push, we need to reverse here ;; we did push, we need to reverse here
(return (loop (return (loop
for (name . cols) in schema for (name . cols) in schema
collect (cons name (reverse cols)))))) collect (cons name (reverse cols)))))))
;; free resources (defun list-all-indexes (dbname)
(cl-mysql:disconnect)))
(defun list-all-indexes (dbname
&key
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"Get the list of MySQL index definitions per table." "Get the list of MySQL index definitions per table."
(cl-mysql:connect :host host :user user :password pass) (with-mysql-connection (dbname)
(loop
(unwind-protect with schema = nil
(progn for (table-name name non-unique cols)
(loop in (mysql-query (format nil "
with schema = nil
for (table-name name non-unique cols)
in (caar (cl-mysql:query (format nil "
SELECT table_name, index_name, non_unique, SELECT table_name, index_name, non_unique,
GROUP_CONCAT(column_name order by seq_in_index) cast(GROUP_CONCAT(column_name order by seq_in_index) as char)
FROM information_schema.statistics FROM information_schema.statistics
WHERE table_schema = '~a' WHERE table_schema = '~a'
GROUP BY table_name, index_name;" dbname))) GROUP BY table_name, index_name;" dbname))
do (let ((entry (assoc table-name schema :test 'equal)) do (let ((entry (assoc table-name schema :test 'equal))
(index (index
(make-pgsql-index :name name (make-pgsql-index :name name
:primary (string= name "PRIMARY") :primary (string= name "PRIMARY")
:table-name table-name :table-name table-name
:unique (not (= 1 non-unique)) :unique (not (string= "1" non-unique))
:columns (sq:split-sequence #\, cols)))) :columns (sq:split-sequence #\, cols))))
(if entry (if entry
(push index (cdr entry)) (push index (cdr entry))
(push (cons table-name (list index)) schema))) (push (cons table-name (list index)) schema)))
finally finally
;; we did push, we need to reverse here ;; we did push, we need to reverse here
(return (reverse (loop (return (reverse (loop
for (name . indexes) in schema for (name . indexes) in schema
collect (cons name (reverse indexes))))))) collect (cons name (reverse indexes))))))))
;; free resources
(cl-mysql:disconnect)))
(defun set-table-oids (all-indexes) (defun set-table-oids (all-indexes)
"MySQL allows using the same index name against separate tables, which "MySQL allows using the same index name against separate tables, which
@ -254,20 +213,13 @@ GROUP BY table_name, index_name;" dbname)))
;;; ;;;
;;; MySQL Foreign Keys ;;; MySQL Foreign Keys
;;; ;;;
(defun list-all-fkeys (dbname (defun list-all-fkeys (dbname)
&key
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"Get the list of MySQL Foreign Keys definitions per table." "Get the list of MySQL Foreign Keys definitions per table."
(cl-mysql:connect :host host :user user :password pass) (with-mysql-connection (dbname)
(loop
(unwind-protect with schema = nil
(progn for (table-name name ftable cols fcols)
(loop in (mysql-query (format nil "
with schema = nil
for (table-name name ftable cols fcols)
in (caar (cl-mysql:query (format nil "
SELECT i.table_name, i.constraint_name, k.referenced_table_name ft, SELECT i.table_name, i.constraint_name, k.referenced_table_name ft,
group_concat( k.column_name group_concat( k.column_name
@ -284,24 +236,21 @@ GROUP BY table_name, index_name;" dbname)))
AND k.referenced_table_schema = '~a' AND k.referenced_table_schema = '~a'
AND i.constraint_type = 'FOREIGN KEY' AND i.constraint_type = 'FOREIGN KEY'
GROUP BY table_name, constraint_name, ft;" dbname dbname))) GROUP BY table_name, constraint_name, ft;" dbname dbname))
do (let ((entry (assoc table-name schema :test 'equal)) do (let ((entry (assoc table-name schema :test 'equal))
(fk (make-pgsql-fkey :name name (fk (make-pgsql-fkey :name name
:table-name table-name :table-name table-name
:columns cols :columns cols
:foreign-table ftable :foreign-table ftable
:foreign-columns fcols))) :foreign-columns fcols)))
(if entry (if entry
(push fk (cdr entry)) (push fk (cdr entry))
(push (cons table-name (list fk)) schema))) (push (cons table-name (list fk)) schema)))
finally finally
;; we did push, we need to reverse here ;; we did push, we need to reverse here
(return (reverse (loop (return (reverse (loop
for (name . fks) in schema for (name . fks) in schema
collect (cons name (reverse fks))))))) collect (cons name (reverse fks))))))))
;; free resources
(cl-mysql:disconnect)))
(defun drop-fkeys (all-fkeys &key dbname identifier-case) (defun drop-fkeys (all-fkeys &key dbname identifier-case)
"Drop all Foreign Key Definitions given, to prepare for a clean run." "Drop all Foreign Key Definitions given, to prepare for a clean run."
@ -366,11 +315,12 @@ GROUP BY table_name, index_name;" dbname)))
This function assumes a valid connection to the MySQL server has been This function assumes a valid connection to the MySQL server has been
established already." established already."
(loop (loop
for (name type) in (caar (cl-mysql:query (format nil " for (name type) in (qmynd:mysql-query *connection* (format nil "
select column_name, data_type select column_name, data_type
from information_schema.columns from information_schema.columns
where table_schema = '~a' and table_name = '~a' where table_schema = '~a' and table_name = '~a'
order by ordinal_position" dbname table-name))) order by ordinal_position" dbname table-name)
:result-type 'list)
for is-null = (member type '("char" "varchar" "text" for is-null = (member type '("char" "varchar" "text"
"tinytext" "mediumtext" "longtext") "tinytext" "mediumtext" "longtext")
:test #'string-equal) :test #'string-equal)

View File

@ -50,42 +50,23 @@
(let ((dbname (source-db mysql)) (let ((dbname (source-db mysql))
(table-name (source mysql))) (table-name (source mysql)))
(cl-mysql:connect :host *myconn-host* (with-mysql-connection (dbname)
:port *myconn-port* (mysql-query "SET NAMES 'utf8'")
:user *myconn-user* (mysql-query "SET character_set_results = utf8;")
:password *myconn-pass*)
(unwind-protect (multiple-value-bind (cols nulls)
(progn (get-column-list-with-is-nulls dbname table-name)
;; Ensure we're talking utf-8 and connect to DBNAME in MySQL (let* ((sql (format nil "SELECT ~{~a~^, ~} FROM ~a;" cols table-name))
(cl-mysql:query "SET NAMES 'utf8'") (row-fn
(cl-mysql:query "SET character_set_results = utf8;") (lambda (row)
(cl-mysql:use dbname) (pgstate-incf *state* (target mysql) :read 1)
(funcall process-row-fn (fix-nulls row nulls)))))
(multiple-value-bind (cols nulls) (handler-bind
(get-column-list-with-is-nulls dbname table-name) ((babel-encodings:character-decoding-error
(let* ((sql (format nil "SELECT ~{~a~^, ~} FROM ~a;" cols table-name)) #'(lambda (e)
(q (cl-mysql:query sql :store nil :type-map nil)) (pgstate-incf *state* (target mysql) :errs 1)
(rs (cl-mysql:next-result-set q))) (log-message :error "~a" e))))
(declare (ignore rs)) (mysql-query sql :row-fn row-fn)))))))
;; Now fetch MySQL rows directly in the stream
(handler-case
(loop
with type-map = (make-hash-table)
for row = (cl-mysql:next-row q :type-map type-map)
while row
for row-with-proper-nulls = (fix-nulls row nulls)
counting row into count
do (funcall process-row-fn row-with-proper-nulls)
finally (return count))
(cl-mysql-system:mysql-error (e)
(progn
(log-message :error "~a" e) ; begins with MySQL error:
(pgstate-setf *state* (target mysql) :errs -1)))))))
;; free resources
(cl-mysql:disconnect))))
;;; ;;;
;;; Use map-rows and pgsql-text-copy-format to fill in a CSV file on disk ;;; Use map-rows and pgsql-text-copy-format to fill in a CSV file on disk
@ -109,8 +90,7 @@
;;; ;;;
(defmethod copy-to-queue ((mysql copy-mysql) dataq) (defmethod copy-to-queue ((mysql copy-mysql) dataq)
"Copy data from MySQL table DBNAME.TABLE-NAME into queue DATAQ" "Copy data from MySQL table DBNAME.TABLE-NAME into queue DATAQ"
(let ((read (pgloader.queue:map-push-queue dataq #'map-rows mysql))) (pgloader.queue:map-push-queue dataq #'map-rows mysql))
(pgstate-incf *state* (target mysql) :read read)))
;;; ;;;
@ -246,7 +226,7 @@
;; need to stop now and refrain to try loading the data into an ;; need to stop now and refrain to try loading the data into an
;; incomplete schema. ;; incomplete schema.
;; ;;
(cl-mysql-system:mysql-error (e) (qmynd:mysql-error (e)
(log-message :fatal "~a" e) (log-message :fatal "~a" e)
(return-from copy-database)) (return-from copy-database))