diff --git a/README.md b/README.md index 1fbd71a..1abfec6 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ pgloader is now a Common Lisp program, tested using the with [Quicklisp](http://www.quicklisp.org/beta/). apt-get install sbcl - apt-get install libmysqlclient-dev + apt-get install libmysqlclient-dev libsqlite3-dev wget http://beta.quicklisp.org/quicklisp.lisp sbcl --load quicklisp.lisp * (quicklisp-quickstart:install) diff --git a/bootstrap.sh b/bootstrap.sh index b33ff67..b1412e9 100644 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -11,7 +11,7 @@ sudo apt-get install -y postgresql-9.3 postgresql-contrib-9.3 \ postgresql-9.3-ip4r \ sbcl \ git patch unzip \ - libmysqlclient-dev + libmysqlclient-dev libsqlite3-dev HBA=/etc/postgresql/9.3/main/pg_hba.conf echo "local all all trust" | sudo tee $HBA diff --git a/pgloader.asd b/pgloader.asd index 5ec02cf..d263063 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -6,27 +6,28 @@ :author "Dimitri Fontaine " :license "The PostgreSQL Licence" :depends-on (#:uiop ; host system integration - #:cl-log ; logging + #:cl-log ; logging #:postmodern ; PostgreSQL protocol implementation #:cl-postgres ; low level bits for COPY streaming #:simple-date ; FIXME: recheck dependency #:cl-mysql ; CFFI binding to libmysqlclient-dev - #:split-sequence ; some parsing is made easy - #:cl-csv ; full CSV reader - #:cl-fad ; file and directories + #:split-sequence ; some parsing is made easy + #:cl-csv ; full CSV reader + #:cl-fad ; file and directories #:lparallel ; threads, workers, queues - #:esrap ; parser generator + #:esrap ; parser generator #:alexandria ; utils - #:drakma ; http client, download archives + #:drakma ; http client, download archives #:zip ; support for zip archive files - #:flexi-streams ; streams + #:flexi-streams ; streams #:com.informatimago.clext ; portable character-sets listings #:usocket ; UDP / syslog #:local-time ; UDP date parsing - #:command-line-arguments ; for the main function + #:command-line-arguments ; for the main function #:abnf ; ABNF parser generator (for syslog) #:db3 ; DBF version 3 file reader #:py-configparser ; Read old-style INI config files + #:sqlite ; Query a SQLite file ) :components ((:module "src" @@ -47,16 +48,19 @@ "queue" "utils")) (:file "pgsql-queries" :depends-on ("package" "params")) + (:file "pgsql-schema" :depends-on ("package" "params")) (:file "pgsql" :depends-on ("package" "queue" "utils" "transforms" "pgsql-copy-format" - "pgsql-queries")) + "pgsql-queries" + "pgsql-schema")) ;; Source format specific implementations (:file "csv" :depends-on ("package" "pgsql")) (:file "db3" :depends-on ("package" "pgsql")) + (:file "sqlite" :depends-on ("package" "pgsql")) (:file "archive" :depends-on ("package" "pgsql")) (:file "syslog" :depends-on ("package" "pgsql")) diff --git a/src/mysql-cast-rules.lisp b/src/mysql-cast-rules.lisp index 9ee301b..fa4cbe9 100644 --- a/src/mysql-cast-rules.lisp +++ b/src/mysql-cast-rules.lisp @@ -4,23 +4,9 @@ (in-package :pgloader.mysql) -(defvar *pgsql-reserved-keywords* nil - "We need to always quote PostgreSQL reserved keywords") - ;;; ;;; Some functions to deal with ENUM types ;;; -(defun apply-identifier-case (identifier case) - "Return a SQL string to use in the output part of a MySQL query." - (let ((case - (if (member identifier *pgsql-reserved-keywords* :test #'string=) - :quote - case))) - (ecase case - (:downcase (cl-ppcre:regex-replace-all - "[^a-zA-Z0-9]" (string-downcase identifier) "_")) - (:quote (format nil "\"~a\"" identifier))))) - (defun explode-mysql-enum (ctype) "Convert MySQL ENUM expression into a list of labels." ;; from: "ENUM('small', 'medium', 'large')" diff --git a/src/mysql.lisp b/src/mysql.lisp index 07a161e..ecbfc55 100644 --- a/src/mysql.lisp +++ b/src/mysql.lisp @@ -485,20 +485,6 @@ order by ordinal_position" dbname table-name))) ;;; ;;; Work on all tables for given database ;;; -(defun execute-with-timing (dbname label sql state &key (count 1)) - "Execute given SQL and resgister its timing into STATE." - (multiple-value-bind (res secs) - (timing - (with-pgsql-transaction (dbname) - (handler-case - (pgsql-execute sql) - (cl-postgres:database-error (e) - (log-message :error "~a" e)) - (cl-postgres:postgresql-warning (w) - (log-message :warning "~a" w))))) - (declare (ignore res)) - (pgstate-incf state label :rows count :secs secs))) - (defun create-indexes-in-kernel (dbname table-name indexes kernel channel &key identifier-case include-drop @@ -523,7 +509,8 @@ order by ordinal_position" dbname table-name))) do (log-message :notice "~a" sql) (lp:submit-task drop-channel - #'execute-with-timing dbname label sql state)) + #'pgsql-execute-with-timing + dbname label sql state)) ;; wait for the DROP INDEX to be done before issuing CREATE INDEX (loop for idx in drop-indexes do (lp:receive-result drop-channel)))) @@ -533,7 +520,8 @@ order by ordinal_position" dbname table-name))) :identifier-case identifier-case) do (log-message :notice "~a" sql) - (lp:submit-task channel #'execute-with-timing dbname label sql state)))) + (lp:submit-task channel #'psql-execute-with-timing + dbname label sql state)))) (defun stream-database (dbname &key diff --git a/src/package.lisp b/src/package.lisp index f27551f..f9d535e 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -48,6 +48,7 @@ (:use #:cl #:pgloader.params #:pgloader.utils) (:export #:with-pgsql-transaction #:pgsql-execute + #:pgsql-execute-with-timing #:truncate-table #:copy-from-file #:copy-from-queue @@ -57,9 +58,11 @@ #:list-tables-cols #:list-reserved-keywords #:reset-all-sequences - #:execute #:get-date-columns - #:format-row)) + #:format-row + #:apply-identifier-case + #:create-tables + #:format-pgsql-column)) (defpackage #:pgloader.ini (:use #:cl #:pgloader.params #:pgloader.utils) @@ -130,7 +133,9 @@ (:use #:cl #:pgloader.params #:pgloader.utils) (:import-from #:pgloader.pgsql #:with-pgsql-transaction - #:pgsql-execute) + #:pgsql-execute + #:pgsql-execute-with-timing + #:apply-identifier-case) (:export #:*cast-rules* #:*default-cast-rules* #:map-rows @@ -142,6 +147,21 @@ #:stream-table #:stream-database)) +(defpackage #:pgloader.sqlite + (:use #:cl #:pgloader.params #:pgloader.utils) + (:import-from #:pgloader.pgsql + #:with-pgsql-transaction + #:pgsql-execute + #:pgsql-execute-with-timing + #:apply-identifier-case + #:create-tables + #:format-pgsql-column) + (:export #:map-rows + #:copy-to + #:list-tables + #:stream-table + #:stream-database)) + ;; ;; Main package diff --git a/src/pgsql-queries.lisp b/src/pgsql-queries.lisp index 68fe74c..46b7764 100644 --- a/src/pgsql-queries.lisp +++ b/src/pgsql-queries.lisp @@ -38,6 +38,20 @@ (log-message :debug set) (pomo:execute set)))) +(defun pgsql-execute-with-timing (dbname label sql state &key (count 1)) + "Execute given SQL and resgister its timing into STATE." + (multiple-value-bind (res secs) + (timing + (with-pgsql-transaction (dbname) + (handler-case + (pgsql-execute sql) + (cl-postgres:database-error (e) + (log-message :error "~a" e)) + (cl-postgres:postgresql-warning (w) + (log-message :warning "~a" w))))) + (declare (ignore res)) + (pgstate-incf state label :rows count :secs secs))) + ;;; ;;; PostgreSQL queries ;;; diff --git a/src/pgsql-schema.lisp b/src/pgsql-schema.lisp new file mode 100644 index 0000000..a1d8231 --- /dev/null +++ b/src/pgsql-schema.lisp @@ -0,0 +1,187 @@ +;;; +;;; Tools to handle PostgreSQL tables and indexes creations +;;; +(in-package pgloader.pgsql) + +(defvar *pgsql-reserved-keywords* nil + "We need to always quote PostgreSQL reserved keywords") + +(defstruct pgsql-column name type-name type-mod nullable default) + +(defmethod format-pgsql-column ((col pgsql-column) &key identifier-case) + "Return a string representing the PostgreSQL column definition." + (let* ((column-name + (apply-identifier-case (pgsql-column-type-name col) identifier-case)) + (type-definition + (format nil + "~a~@[~a~]~:[~; not null~]~@[ default ~a~]" + (pgsql-column-type-name col) + (pgsql-column-type-mod col) + (pgsql-column-nullable col) + (pgsql-column-default col)))) + (format nil "~a ~22t ~a" column-name type-definition))) + +(defun apply-identifier-case (identifier case) + "Return given IDENTIFIER with CASE handled to be PostgreSQL compatible." + (let ((case + (if (member identifier *pgsql-reserved-keywords* :test #'string=) + :quote + case))) + (ecase case + (:downcase (cl-ppcre:regex-replace-all + "[^a-zA-Z0-9]" (string-downcase identifier) "_")) + (:quote (format nil "\"~a\"" identifier))))) + +(defun create-table-sql (table-name cols &key identifier-case) + "Return a PostgreSQL CREATE TABLE statement from given COLS. + + Each element of the COLS list is expected to be of a type handled by the + `format-pgsql-column' generic function." + (with-output-to-string (s) + (let ((table-name (apply-identifier-case table-name identifier-case))) + (format s "CREATE TABLE ~a ~%(~%" table-name)) + (loop + for (col . last?) on cols + for pg-coldef = (format-pgsql-column col :identifier-case identifier-case) + do (format s " ~a~:[~;,~]~%" pg-coldef last?)) + (format s ");~%"))) + +(defun drop-table-if-exists-sql (table-name &key identifier-case) + "Return the PostgreSQL DROP TABLE IF EXISTS statement for TABLE-NAME." + (let ((table-name (apply-identifier-case table-name identifier-case))) + (format nil "DROP TABLE IF EXISTS ~a;~%" table-name))) + +(defun create-table-sql-list (all-columns + &key + include-drop + (identifier-case :downcase)) + "Return the list of CREATE TABLE statements to run against PostgreSQL. + + The ALL-COLUMNS parameter must be a list of alist associations where the + car is the table-name (a string) and the cdr is a column list. Each + element of the column list is expected to be of a type handled by the + `format-pgsql-column' generic function, such as `pgsql-column'." + (loop + for (table-name . cols) in all-columns + when include-drop + collect (drop-table-if-exists-sql table-name + :identifier-case identifier-case) + + collect (create-table-sql table-name cols + :identifier-case identifier-case))) + +(defun create-tables (all-columns + &key + (identifier-case :downcase) + include-drop + (client-min-messages :notice)) + "Create all tables in database dbname in PostgreSQL. + + The ALL-COLUMNS parameter must be a list of alist associations where the + car is the table-name (a string) and the cdr is a column list. Each + element of the column list is expected to be of a type handled by the + `format-pgsql-column' generic function, such as `pgsql-column'." + (loop + for nb-tables from 0 + for sql in (create-table-sql-list all-columns + :identifier-case identifier-case + :include-drop include-drop) + do (pgsql-execute sql :client-min-messages client-min-messages) + finally (return nb-tables))) + + +;;; +;;; Index support: TODO +;;; +(defun create-index-sql (table-name table-oid index-name unique cols + &key identifier-case) + "Return a PostgreSQL CREATE INDEX statement as a string." + (let* ((index-name (format nil "idx_~a_~a" table-oid index-name)) + (table-name (apply-identifier-case table-name identifier-case)) + (index-name (apply-identifier-case index-name identifier-case)) + (cols + (mapcar + (lambda (col) (apply-identifier-case col identifier-case)) cols))) + (cond + ((string= index-name "PRIMARY") + (format nil + "ALTER TABLE ~a ADD PRIMARY KEY (~{~a~^, ~});" table-name cols)) + + (t + (format nil + "CREATE~:[~; UNIQUE~] INDEX ~a ON ~a (~{~a~^, ~});" + unique index-name table-name cols))))) + +(defun drop-index-if-exists-sql (table-name table-oid index-name + &key identifier-case) + "Return the DROP INDEX statement for PostgreSQL" + (cond + ((string= index-name "PRIMARY") + (let* ((pkey-name (format nil "~a_pkey" table-name)) + (pkey-name (apply-identifier-case pkey-name identifier-case)) + (table-name (apply-identifier-case table-name identifier-case))) + (format nil "ALTER TABLE ~a DROP CONSTRAINT ~a;" table-name pkey-name))) + + (t + (let* ((index-name (format nil "idx_~a_~a" table-oid index-name)) + (index-name (apply-identifier-case index-name identifier-case))) + (format nil "DROP INDEX IF EXISTS ~a;" index-name))))) + +(defun drop-index-sql-list (table-name table-oid indexes + &key identifier-case) + "Return the DROP INDEX statements for given INDEXES definitions." + (loop + for (index-name unique cols) in indexes + ;; no need to alter table drop constraint, when include-drop + ;; is true we just did drop the table and created it again + ;; anyway + unless (string= index-name "PRIMARY") + collect (drop-index-if-exists-sql table-name table-oid index-name + :identifier-case identifier-case))) + +(defun create-index-sql-list (table-name table-oid indexes + &key identifier-case) + "Return the CREATE INDEX statements from given INDEXES definitions." + (loop + for (index-name unique cols) in indexes + collect (create-index-sql table-name table-oid index-name unique cols + :identifier-case identifier-case))) + +(defun create-indexes-in-kernel (dbname table-name indexes kernel channel + &key + identifier-case include-drop + state (label "create index")) + "Create indexes for given table in dbname, using given lparallel KERNEL + and CHANNEL so that the index build happen in concurrently with the data + copying." + (let* ((lp:*kernel* kernel) + (table-oid + (with-pgsql-transaction (dbname) + (pomo:query + (format nil "select '~a'::regclass::oid" table-name) :single)))) + (pgstate-add-table state dbname label) + + (when include-drop + (let ((drop-channel (lp:make-channel)) + (drop-indexes + (drop-index-sql-list table-name table-oid indexes + :identifier-case identifier-case))) + (loop + for sql in drop-indexes + do + (log-message :notice "~a" sql) + (lp:submit-task drop-channel + #'pgsql-execute-with-timing + dbname label sql state)) + + ;; wait for the DROP INDEX to be done before issuing CREATE INDEX + (loop for idx in drop-indexes do (lp:receive-result drop-channel)))) + + (loop + for sql in (create-index-sql-list table-name table-oid indexes + :identifier-case identifier-case) + do + (log-message :notice "~a" sql) + (lp:submit-task channel + #'pgsql-execute-with-timing + dbname label sql state)))) diff --git a/src/sqlite.lisp b/src/sqlite.lisp new file mode 100644 index 0000000..e2c67db --- /dev/null +++ b/src/sqlite.lisp @@ -0,0 +1,249 @@ +;;; +;;; Tools to handle the SQLite Database +;;; + +(in-package :pgloader.sqlite) + +(defvar *sqlite-db* nil + "The SQLite database connection handler.") + +;;; +;;; SQLite tools connecting to a database +;;; +(defstruct (coldef + (:constructor make-coldef (seq name type nullable default pk-id))) + seq name type nullable default pk-id) + +(defmethod format-pgsql-column ((col coldef) &key identifier-case) + "Return a string representing the PostgreSQL column definition." + (let* ((column-name + (apply-identifier-case (coldef-name col) identifier-case)) + (type-definition + (format nil + "~a~:[~; not null~]~@[ default ~a~]" + (coldef-type col) + (coldef-nullable col) + (coldef-default col)))) + (format nil "~a ~22t ~a" column-name type-definition))) + +(defun list-tables (&optional (db *sqlite-db*)) + "Return the list of tables found in SQLITE-DB." + (let ((sql "SELECT tbl_name FROM sqlite_master WHERE type='table'")) + (loop for (name) in (sqlite:execute-to-list db sql) + collect name))) + +(defun list-columns (table-name &optional (db *sqlite-db*)) + "Return the list of columns found in TABLE-NAME." + (let ((sql (format nil "PRAGMA table_info(~a)" table-name))) + (loop for (seq name type nullable default pk-id) in + (sqlite:execute-to-list db sql) + collect (make-coldef seq name type (= 1 nullable) default pk-id)))) + +(defun list-all-columns (&optional (db *sqlite-db*)) + "Get the list of SQLite column definitions per table." + (loop for table-name in (list-tables db) + collect (cons table-name (list-columns table-name)))) + +(defun list-all-indexes (&optional (db *sqlite-db*)) + "Get the list of SQLite index definitions per table." + (let ((sql "SELECT name, tbl_name, sql FROM sqlite_master WHERE type='index'")) + (loop with schema = nil + for (index-name table-name sql) in (sqlite:execute-to-list db sql) + do (let ((entry (assoc table-name schema :test 'equal)) + (idxdef (cons index-name sql))) + (if entry + (push idxdef (cdr entry)) + (push (cons table-name (list idxdef)) schema))) + finally (return (reverse (loop for (name . indexes) in schema + collect (cons name (reverse indexes)))))))) + + +;;; +;;; Map a function to each row extracted from SQLite +;;; + +(defun map-rows (db table-name &key process-row-fn) + "Extract SQLite data and call PROCESS-ROW-FN function with a single + argument (a list of column values) for each row" + (let ((sql (format nil "SELECT * FROM ~a" table-name))) + (loop + with statement = (sqlite:prepare-statement db sql) + with column-numbers = + (loop for i from 0 + for name in (sqlite:statement-column-names statement) + collect i) + while (sqlite:step-statement statement) + for row = (mapcar (lambda (x) + (sqlite:statement-column-value statement x)) + column-numbers) + do (funcall process-row-fn row) + finally (sqlite:finalize-statement statement)))) + + +(defun copy-to-queue (db table-name dataq) + "Copy data from SQLite table TABLE-NAME within connection DB into queue DATAQ" + (let ((read + (pgloader.queue:map-push-queue dataq #'map-rows db table-name))) + (pgstate-incf *state* table-name :read read))) + +(defun stream-table (db table-name + &key + (kernel nil k-s-p) + pg-dbname + truncate + transforms) + "Stream the contents of TABLE-NAME in SQLite down to PostgreSQL." + (let* ((summary (null *state*)) + (*state* (or *state* (pgloader.utils:make-pgstate))) + (lp:*kernel* (or kernel (make-kernel 2))) + (channel (lp:make-channel)) + (dataq (lq:make-queue :fixed-capacity 4096)) + (transforms (or transforms + (let ((cols (list-columns table-name db))) + (loop for col in cols + if (string-equal "float" (coldef-type col)) + collect (lambda (f) + (format nil "~f" f)) + else + collect nil))))) + + (with-stats-collection (pg-dbname table-name :state *state* :summary summary) + (log-message :notice "COPY ~a" table-name) + ;; read data from SQLite + (lp:submit-task channel #'copy-to-queue db table-name dataq) + + ;; and start another task to push that data from the queue to PostgreSQL + (lp:submit-task channel + #'pgloader.pgsql:copy-from-queue + pg-dbname table-name dataq + :truncate truncate + :transforms transforms) + + ;; now wait until both the tasks are over + (loop for tasks below 2 do (lp:receive-result channel) + finally + (log-message :info "COPY ~a done." table-name) + (unless k-s-p (lp:end-kernel)))))) + +(defun create-indexes-in-kernel (dbname indexes kernel channel + &key include-drop state (label "create index")) + "Create indexes for given table in dbname, using given lparallel KERNEL + and CHANNEL so that the index build happen in concurrently with the data + copying." + (let* ((lp:*kernel* kernel)) + (pgstate-add-table state dbname label) + + (when include-drop + (let ((drop-channel (lp:make-channel)) + (drop-indexes + (loop for (name . sql) in indexes + collect (format nil "DROP INDEX IF EXISTS ~a;" name)))) + (loop + for sql in drop-indexes + do + (log-message :notice "~a" sql) + (lp:submit-task drop-channel + #'pgsql-execute-with-timing + dbname label sql state)) + + ;; wait for the DROP INDEX to be done before issuing CREATE INDEX + (loop for idx in drop-indexes do (lp:receive-result drop-channel)))) + + (loop + for (name . sql) in indexes + do + (log-message :notice "~a" sql) + (lp:submit-task channel + #'pgsql-execute-with-timing + dbname label sql state)))) + +(defun stream-database (filename + &key + pg-dbname + (schema-only nil) + (create-tables nil) + (include-drop nil) + (create-indexes t) + (reset-sequences t) + (truncate nil) + only-tables) + "Export SQLite data and Import it into PostgreSQL" + (let* ((*sqlite-db* (sqlite:connect filename)) + (*state* (make-pgstate)) + (idx-state (make-pgstate)) + (seq-state (make-pgstate)) + (copy-kernel (make-kernel 2)) + (all-columns (list-all-columns)) + (all-indexes (list-all-indexes)) + (max-indexes (loop for (table . indexes) in all-indexes + maximizing (length indexes))) + (idx-kernel (when (and max-indexes (< 0 max-indexes)) + (make-kernel max-indexes))) + (idx-channel (when idx-kernel + (let ((lp:*kernel* idx-kernel)) + (lp:make-channel))))) + + ;; if asked, first drop/create the tables on the PostgreSQL side + (when create-tables + (log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop) + (with-pgsql-transaction (pg-dbname) + (create-tables all-columns :include-drop include-drop))) + + (loop + for (table-name . columns) in all-columns + when (or (null only-tables) + (member table-name only-tables :test #'equal)) + do + (progn + ;; first COPY the data from SQLite to PostgreSQL, using copy-kernel + (unless schema-only + (stream-table *sqlite-db* table-name + :kernel copy-kernel + :pg-dbname pg-dbname + :truncate truncate)) + + ;; Create the indexes for that table in parallel with the next + ;; COPY, and all at once in concurrent threads to benefit from + ;; PostgreSQL synchronous scan ability + ;; + ;; We just push new index build as they come along, if one + ;; index build requires much more time than the others our + ;; index build might get unsync: indexes for different tables + ;; will get built in parallel --- not a big problem. + (when create-indexes + (let* ((indexes + (cdr (assoc table-name all-indexes :test #'string=)))) + (create-indexes-in-kernel pg-dbname indexes + idx-kernel idx-channel + :state idx-state + :include-drop include-drop))))) + + ;; don't forget to reset sequences, but only when we did actually import + ;; the data. + (when (and (not schema-only) reset-sequences) + (let ((tables (or only-tables + (mapcar #'car all-columns)))) + (log-message :notice "Reset sequences") + (with-stats-collection (pg-dbname "reset sequences" + :use-result-as-rows t + :state seq-state) + (pgloader.pgsql:reset-all-sequences pg-dbname :tables tables)))) + + ;; now end the kernels + (let ((lp:*kernel* idx-kernel)) (lp:end-kernel)) + (let ((lp:*kernel* copy-kernel)) + ;; wait until the indexes are done being built... + ;; don't forget accounting for that waiting time. + (with-stats-collection (pg-dbname "index build completion" :state *state*) + (loop for idx in all-indexes do (lp:receive-result idx-channel))) + (lp:end-kernel)) + + ;; and report the total time spent on the operation + (report-summary) + (format t pgloader.utils::*header-line*) + (report-summary :state idx-state :header nil :footer nil) + (report-summary :state seq-state :header nil :footer nil) + ;; don't forget to add up the RESET SEQUENCES timings + (incf (pgloader.utils::pgstate-secs *state*) + (pgloader.utils::pgstate-secs seq-state)) + (report-pgstate-stats *state* "Total streaming time")))