Hack-in some support for SQLite data source, including some refactoring preps.

This commit is contained in:
Dimitri Fontaine 2013-10-24 00:21:46 +02:00
parent 6e29957c1c
commit 50114a0d3a
9 changed files with 492 additions and 44 deletions

View File

@ -22,7 +22,7 @@ pgloader is now a Common Lisp program, tested using the
with [Quicklisp](http://www.quicklisp.org/beta/).
apt-get install sbcl
apt-get install libmysqlclient-dev
apt-get install libmysqlclient-dev libsqlite3-dev
wget http://beta.quicklisp.org/quicklisp.lisp
sbcl --load quicklisp.lisp
* (quicklisp-quickstart:install)

View File

@ -11,7 +11,7 @@ sudo apt-get install -y postgresql-9.3 postgresql-contrib-9.3 \
postgresql-9.3-ip4r \
sbcl \
git patch unzip \
libmysqlclient-dev
libmysqlclient-dev libsqlite3-dev
HBA=/etc/postgresql/9.3/main/pg_hba.conf
echo "local all all trust" | sudo tee $HBA

View File

@ -6,27 +6,28 @@
:author "Dimitri Fontaine <dimitri@2ndQuadrant.fr>"
:license "The PostgreSQL Licence"
:depends-on (#:uiop ; host system integration
#:cl-log ; logging
#:cl-log ; logging
#:postmodern ; PostgreSQL protocol implementation
#:cl-postgres ; low level bits for COPY streaming
#:simple-date ; FIXME: recheck dependency
#:cl-mysql ; CFFI binding to libmysqlclient-dev
#:split-sequence ; some parsing is made easy
#:cl-csv ; full CSV reader
#:cl-fad ; file and directories
#:split-sequence ; some parsing is made easy
#:cl-csv ; full CSV reader
#:cl-fad ; file and directories
#:lparallel ; threads, workers, queues
#:esrap ; parser generator
#:esrap ; parser generator
#:alexandria ; utils
#:drakma ; http client, download archives
#:drakma ; http client, download archives
#:zip ; support for zip archive files
#:flexi-streams ; streams
#:flexi-streams ; streams
#:com.informatimago.clext ; portable character-sets listings
#:usocket ; UDP / syslog
#:local-time ; UDP date parsing
#:command-line-arguments ; for the main function
#:command-line-arguments ; for the main function
#:abnf ; ABNF parser generator (for syslog)
#:db3 ; DBF version 3 file reader
#:py-configparser ; Read old-style INI config files
#:sqlite ; Query a SQLite file
)
:components
((:module "src"
@ -47,16 +48,19 @@
"queue"
"utils"))
(:file "pgsql-queries" :depends-on ("package" "params"))
(:file "pgsql-schema" :depends-on ("package" "params"))
(:file "pgsql" :depends-on ("package"
"queue"
"utils"
"transforms"
"pgsql-copy-format"
"pgsql-queries"))
"pgsql-queries"
"pgsql-schema"))
;; Source format specific implementations
(:file "csv" :depends-on ("package" "pgsql"))
(:file "db3" :depends-on ("package" "pgsql"))
(:file "sqlite" :depends-on ("package" "pgsql"))
(:file "archive" :depends-on ("package" "pgsql"))
(:file "syslog" :depends-on ("package" "pgsql"))

View File

@ -4,23 +4,9 @@
(in-package :pgloader.mysql)
(defvar *pgsql-reserved-keywords* nil
"We need to always quote PostgreSQL reserved keywords")
;;;
;;; Some functions to deal with ENUM types
;;;
(defun apply-identifier-case (identifier case)
"Return a SQL string to use in the output part of a MySQL query."
(let ((case
(if (member identifier *pgsql-reserved-keywords* :test #'string=)
:quote
case)))
(ecase case
(:downcase (cl-ppcre:regex-replace-all
"[^a-zA-Z0-9]" (string-downcase identifier) "_"))
(:quote (format nil "\"~a\"" identifier)))))
(defun explode-mysql-enum (ctype)
"Convert MySQL ENUM expression into a list of labels."
;; from: "ENUM('small', 'medium', 'large')"

View File

@ -485,20 +485,6 @@ order by ordinal_position" dbname table-name)))
;;;
;;; Work on all tables for given database
;;;
(defun execute-with-timing (dbname label sql state &key (count 1))
"Execute given SQL and resgister its timing into STATE."
(multiple-value-bind (res secs)
(timing
(with-pgsql-transaction (dbname)
(handler-case
(pgsql-execute sql)
(cl-postgres:database-error (e)
(log-message :error "~a" e))
(cl-postgres:postgresql-warning (w)
(log-message :warning "~a" w)))))
(declare (ignore res))
(pgstate-incf state label :rows count :secs secs)))
(defun create-indexes-in-kernel (dbname table-name indexes kernel channel
&key
identifier-case include-drop
@ -523,7 +509,8 @@ order by ordinal_position" dbname table-name)))
do
(log-message :notice "~a" sql)
(lp:submit-task drop-channel
#'execute-with-timing dbname label sql state))
#'pgsql-execute-with-timing
dbname label sql state))
;; wait for the DROP INDEX to be done before issuing CREATE INDEX
(loop for idx in drop-indexes do (lp:receive-result drop-channel))))
@ -533,7 +520,8 @@ order by ordinal_position" dbname table-name)))
:identifier-case identifier-case)
do
(log-message :notice "~a" sql)
(lp:submit-task channel #'execute-with-timing dbname label sql state))))
(lp:submit-task channel #'pgsql-execute-with-timing
dbname label sql state))))
(defun stream-database (dbname
&key

View File

@ -48,6 +48,7 @@
(:use #:cl #:pgloader.params #:pgloader.utils)
(:export #:with-pgsql-transaction
#:pgsql-execute
#:pgsql-execute-with-timing
#:truncate-table
#:copy-from-file
#:copy-from-queue
@ -57,9 +58,11 @@
#:list-tables-cols
#:list-reserved-keywords
#:reset-all-sequences
#:execute
#:get-date-columns
#:format-row))
#:format-row
#:apply-identifier-case
#:create-tables
#:format-pgsql-column))
(defpackage #:pgloader.ini
(:use #:cl #:pgloader.params #:pgloader.utils)
@ -130,7 +133,9 @@
(:use #:cl #:pgloader.params #:pgloader.utils)
(:import-from #:pgloader.pgsql
#:with-pgsql-transaction
#:pgsql-execute)
#:pgsql-execute
#:pgsql-execute-with-timing
#:apply-identifier-case)
(:export #:*cast-rules*
#:*default-cast-rules*
#:map-rows
@ -142,6 +147,21 @@
#:stream-table
#:stream-database))
;; SQLite source support: its implementation lives in src/sqlite.lisp and
;; reuses the PostgreSQL schema tooling from pgloader.pgsql.
;; NOTE(review): #:copy-to is exported here but src/sqlite.lisp defines
;; copy-to-queue, not copy-to — confirm which name is intended.
(defpackage #:pgloader.sqlite
  (:use #:cl #:pgloader.params #:pgloader.utils)
  (:import-from #:pgloader.pgsql
                #:with-pgsql-transaction
                #:pgsql-execute
                #:pgsql-execute-with-timing
                #:apply-identifier-case
                #:create-tables
                #:format-pgsql-column)
  (:export #:map-rows
           #:copy-to
           #:list-tables
           #:stream-table
           #:stream-database))
;;
;; Main package

View File

@ -38,6 +38,20 @@
(log-message :debug set)
(pomo:execute set))))
(defun pgsql-execute-with-timing (dbname label sql state &key (count 1))
  "Execute given SQL and register its timing into STATE.

   The statement runs in its own transaction against DBNAME; errors and
   warnings from PostgreSQL are logged rather than signaled, so a failing
   statement does not abort the caller. COUNT rows and the elapsed
   seconds are accounted against LABEL in STATE."
  (multiple-value-bind (res secs)
      (timing
       (with-pgsql-transaction (dbname)
         (handler-case
             (pgsql-execute sql)
           ;; log-and-continue: one bad DDL statement shouldn't stop a load
           (cl-postgres:database-error (e)
             (log-message :error "~a" e))
           (cl-postgres:postgresql-warning (w)
             (log-message :warning "~a" w)))))
    (declare (ignore res))
    ;; fold both the wall-clock time and the row count into the stats
    (pgstate-incf state label :rows count :secs secs)))
;;;
;;; PostgreSQL queries
;;;

187
src/pgsql-schema.lisp Normal file
View File

@ -0,0 +1,187 @@
;;;
;;; Tools to handle PostgreSQL tables and indexes creations
;;;
(in-package pgloader.pgsql)
;; Consulted by apply-identifier-case below; while it stays NIL no
;; identifier is treated as reserved. Presumably filled in elsewhere
;; (e.g. from list-reserved-keywords) — TODO confirm against callers.
(defvar *pgsql-reserved-keywords* nil
  "We need to always quote PostgreSQL reserved keywords")

;; Source-agnostic representation of one PostgreSQL column definition,
;; rendered to SQL by the format-pgsql-column method below.
(defstruct pgsql-column name type-name type-mod nullable default)
(defmethod format-pgsql-column ((col pgsql-column) &key identifier-case)
  "Return a string representing the PostgreSQL column definition.

   The output is \"<name> <type><typmod>[ not null][ default <val>]\",
   with the column name case-handled per IDENTIFIER-CASE."
  (let* ((column-name
          ;; BUG FIX: the column name comes from the name slot — the
          ;; original read pgsql-column-type-name here, emitting the type
          ;; in place of the column name
          (apply-identifier-case (pgsql-column-name col) identifier-case))
         (type-definition
          ;; NOTE(review): " not null" is printed when the nullable slot
          ;; is true, which reads inverted — the sibling coldef struct in
          ;; sqlite.lisp stores the PRAGMA notnull flag in its slot of the
          ;; same name, so confirm this slot holds a not-null flag too.
          (format nil
                  "~a~@[~a~]~:[~; not null~]~@[ default ~a~]"
                  (pgsql-column-type-name col)
                  (pgsql-column-type-mod col)
                  (pgsql-column-nullable col)
                  (pgsql-column-default col))))
    ;; ~22t column-aligns the type part for readable CREATE TABLE output
    (format nil "~a ~22t ~a" column-name type-definition)))
(defun apply-identifier-case (identifier case)
  "Return given IDENTIFIER with CASE handled to be PostgreSQL compatible.

   CASE is :downcase or :quote; identifiers found in
   *pgsql-reserved-keywords* are always quoted, whatever CASE says."
  ;; reserved keywords override the caller's choice
  (when (member identifier *pgsql-reserved-keywords* :test #'string=)
    (setf case :quote))
  (ecase case
    ;; lowercase, and map every non-alphanumeric character to underscore
    (:downcase (cl-ppcre:regex-replace-all
                "[^a-zA-Z0-9]" (string-downcase identifier) "_"))
    (:quote    (format nil "\"~a\"" identifier))))
(defun create-table-sql (table-name cols &key identifier-case)
  "Return a PostgreSQL CREATE TABLE statement from given COLS.

   Each element of the COLS list is expected to be of a type handled by
   the `format-pgsql-column' generic function."
  (with-output-to-string (s)
    (format s "CREATE TABLE ~a ~%(~%"
            (apply-identifier-case table-name identifier-case))
    ;; one line per column; a trailing comma after all but the last one
    (loop
       for remaining on cols
       for pg-coldef = (format-pgsql-column (first remaining)
                                            :identifier-case identifier-case)
       do (format s " ~a~:[~;,~]~%" pg-coldef (rest remaining)))
    (format s ");~%")))
(defun drop-table-if-exists-sql (table-name &key identifier-case)
  "Return the PostgreSQL DROP TABLE IF EXISTS statement for TABLE-NAME."
  (format nil "DROP TABLE IF EXISTS ~a;~%"
          (apply-identifier-case table-name identifier-case)))
(defun create-table-sql-list (all-columns
                              &key
                                include-drop
                                (identifier-case :downcase))
  "Return the list of CREATE TABLE statements to run against PostgreSQL.

   The ALL-COLUMNS parameter must be a list of alist associations where
   the car is the table-name (a string) and the cdr is a column list.
   Each element of the column list is expected to be of a type handled by
   the `format-pgsql-column' generic function, such as `pgsql-column'."
  (loop
     for (table-name . cols) in all-columns
     ;; per table: an optional DROP TABLE IF EXISTS, then the CREATE TABLE
     append (append
             (when include-drop
               (list (drop-table-if-exists-sql
                      table-name :identifier-case identifier-case)))
             (list (create-table-sql
                    table-name cols :identifier-case identifier-case)))))
(defun create-tables (all-columns
                      &key
                        (identifier-case :downcase)
                        include-drop
                        (client-min-messages :notice))
  "Create all tables in database dbname in PostgreSQL.

   The ALL-COLUMNS parameter must be a list of alist associations where the
   car is the table-name (a string) and the cdr is a column list. Each
   element of the column list is expected to be of a type handled by the
   `format-pgsql-column' generic function, such as `pgsql-column'."
  ;; NOTE(review): nb-tables steps once per executed SQL statement, not per
  ;; table — with :include-drop t each table contributes two statements, so
  ;; the returned value is then twice the table count. Confirm callers only
  ;; use it as an activity indicator.
  (loop
     for nb-tables from 0
     for sql in (create-table-sql-list all-columns
                                       :identifier-case identifier-case
                                       :include-drop include-drop)
     do (pgsql-execute sql :client-min-messages client-min-messages)
     finally (return nb-tables)))
;;;
;;; Index support: TODO
;;;
(defun create-index-sql (table-name table-oid index-name unique cols
                         &key identifier-case)
  "Return a PostgreSQL CREATE INDEX statement as a string.

   A MySQL index named PRIMARY is the table's primary key: emit
   ALTER TABLE ... ADD PRIMARY KEY for it instead of CREATE INDEX.
   Other indexes get a name of the form idx_<oid>_<name> so that two
   tables may carry same-named indexes without clashing."
  ;; BUG FIX: test the raw INDEX-NAME before it is rebound to
  ;; "idx_<oid>_<name>" below — the original compared the rebuilt name
  ;; against "PRIMARY", so the primary-key branch was unreachable.
  ;; (drop-index-if-exists-sql already tests the raw name this way.)
  (let* ((primary    (string= index-name "PRIMARY"))
         (index-name (format nil "idx_~a_~a" table-oid index-name))
         (table-name (apply-identifier-case table-name identifier-case))
         (index-name (apply-identifier-case index-name identifier-case))
         (cols
          (mapcar
           (lambda (col) (apply-identifier-case col identifier-case)) cols)))
    (cond
      (primary
       (format nil
               "ALTER TABLE ~a ADD PRIMARY KEY (~{~a~^, ~});" table-name cols))
      (t
       (format nil
               "CREATE~:[~; UNIQUE~] INDEX ~a ON ~a (~{~a~^, ~});"
               unique index-name table-name cols)))))
(defun drop-index-if-exists-sql (table-name table-oid index-name
                                 &key identifier-case)
  "Return the DROP INDEX statement for PostgreSQL"
  (if (string= index-name "PRIMARY")
      ;; a primary key is a constraint, so it takes an ALTER TABLE
      (let* ((pkey-name  (apply-identifier-case
                          (format nil "~a_pkey" table-name) identifier-case))
             (table-name (apply-identifier-case table-name identifier-case)))
        (format nil "ALTER TABLE ~a DROP CONSTRAINT ~a;" table-name pkey-name))
      ;; a plain index was created under the name idx_<oid>_<name>
      (let ((index-name (apply-identifier-case
                         (format nil "idx_~a_~a" table-oid index-name)
                         identifier-case)))
        (format nil "DROP INDEX IF EXISTS ~a;" index-name))))
(defun drop-index-sql-list (table-name table-oid indexes
                            &key identifier-case)
  "Return the DROP INDEX statements for given INDEXES definitions."
  ;; the PRIMARY key needs no DROP CONSTRAINT here: when include-drop is
  ;; true the table itself just got dropped and recreated, taking its
  ;; primary key along with it
  (loop
     for index in indexes
     unless (string= (first index) "PRIMARY")
     collect (drop-index-if-exists-sql table-name table-oid (first index)
                                       :identifier-case identifier-case)))
(defun create-index-sql-list (table-name table-oid indexes
                              &key identifier-case)
  "Return the CREATE INDEX statements from given INDEXES definitions."
  (mapcar (lambda (index)
            (destructuring-bind (index-name unique cols) index
              (create-index-sql table-name table-oid index-name unique cols
                                :identifier-case identifier-case)))
          indexes))
(defun create-indexes-in-kernel (dbname table-name indexes kernel channel
                                 &key
                                   identifier-case include-drop
                                   state (label "create index"))
  "Create indexes for given table in dbname, using given lparallel KERNEL
   and CHANNEL so that the index build happen in concurrently with the data
   copying.

   INDEXES is a list of (index-name unique cols) definitions as consumed
   by `drop-index-sql-list' and `create-index-sql-list'. The CREATE INDEX
   tasks are only submitted on CHANNEL: the caller is responsible for
   receiving their results."
  (let* ((lp:*kernel* kernel)
         ;; the table's OID makes the generated idx_<oid>_<name> names unique
         (table-oid
          (with-pgsql-transaction (dbname)
            (pomo:query
             (format nil "select '~a'::regclass::oid" table-name) :single))))
    (pgstate-add-table state dbname label)
    (when include-drop
      (let ((drop-channel (lp:make-channel))
            (drop-indexes
             (drop-index-sql-list table-name table-oid indexes
                                  :identifier-case identifier-case)))
        (loop
           for sql in drop-indexes
           do
             (log-message :notice "~a" sql)
             (lp:submit-task drop-channel
                             #'pgsql-execute-with-timing
                             dbname label sql state))

        ;; wait for the DROP INDEX to be done before issuing CREATE INDEX
        (loop for idx in drop-indexes do (lp:receive-result drop-channel))))

    ;; submit one CREATE INDEX task per index; they run concurrently with
    ;; whatever the caller does next (typically the next table's COPY)
    (loop
       for sql in (create-index-sql-list table-name table-oid indexes
                                         :identifier-case identifier-case)
       do
         (log-message :notice "~a" sql)
         (lp:submit-task channel
                         #'pgsql-execute-with-timing
                         dbname label sql state))))

249
src/sqlite.lisp Normal file
View File

@ -0,0 +1,249 @@
;;;
;;; Tools to handle the SQLite Database
;;;
(in-package :pgloader.sqlite)
(defvar *sqlite-db* nil
  "The SQLite database connection handler.")

;;;
;;; SQLite tools connecting to a database
;;;

;; One column definition as built from SQLite's PRAGMA table_info()
;; rows, whose positional fields are (cid, name, type, notnull,
;; dflt_value, pk).
;; NOTE(review): the slot is named nullable but list-columns stores the
;; PRAGMA notnull flag in it — confirm before relying on the name.
(defstruct (coldef
             (:constructor make-coldef (seq name type nullable default pk-id)))
  seq name type nullable default pk-id)
(defmethod format-pgsql-column ((col coldef) &key identifier-case)
  "Return a string representing the PostgreSQL column definition."
  (let* ((column-name
          (apply-identifier-case (coldef-name col) identifier-case))
         (type-definition
          ;; the SQLite type name is reused verbatim on the PostgreSQL
          ;; side; " not null" is printed when the nullable slot is true,
          ;; consistent with that slot holding PRAGMA's notnull flag (see
          ;; list-columns)
          (format nil
                  "~a~:[~; not null~]~@[ default ~a~]"
                  (coldef-type col)
                  (coldef-nullable col)
                  (coldef-default col))))
    ;; ~22t aligns the type part for readable CREATE TABLE output
    (format nil "~a ~22t ~a" column-name type-definition)))
(defun list-tables (&optional (db *sqlite-db*))
  "Return the list of tables found in SQLITE-DB."
  ;; each result row is a one-element list holding the table name
  (mapcar #'car
          (sqlite:execute-to-list
           db "SELECT tbl_name FROM sqlite_master WHERE type='table'")))
(defun list-columns (table-name &optional (db *sqlite-db*))
  "Return the list of columns found in TABLE-NAME."
  ;; PRAGMA table_info returns one row per column:
  ;;   (cid, name, type, notnull, dflt_value, pk)
  (let ((sql (format nil "PRAGMA table_info(~a)" table-name)))
    (loop for (seq name type nullable default pk-id) in
         (sqlite:execute-to-list db sql)
       ;; NOTE(review): the 4th PRAGMA field is the notnull flag, so
       ;; (= 1 nullable) is true for NOT NULL columns; the coldef slot
       ;; name is misleading but format-pgsql-column depends on this
       ;; polarity — confirm before renaming.
       collect (make-coldef seq name type (= 1 nullable) default pk-id))))
(defun list-all-columns (&optional (db *sqlite-db*))
  "Get the list of SQLite column definitions per table.

   Return an alist of (table-name . coldef-list) entries for every table
   found in DB."
  (loop for table-name in (list-tables db)
     ;; BUG FIX: pass DB through to list-columns — the original let it
     ;; default to *sqlite-db*, silently ignoring an explicit DB argument
     collect (cons table-name (list-columns table-name db))))
(defun list-all-indexes (&optional (db *sqlite-db*))
  "Get the list of SQLite index definitions per table."
  (let ((query "SELECT name, tbl_name, sql FROM sqlite_master WHERE type='index'")
        (schema '()))
    ;; group the (index-name . sql) pairs under their table name
    (loop for (index-name table-name sql) in (sqlite:execute-to-list db query)
       do (let ((entry  (assoc table-name schema :test 'equal))
                (idxdef (cons index-name sql)))
            (if entry
                (push idxdef (cdr entry))
                (push (cons table-name (list idxdef)) schema))))
    ;; pushing accumulated both tables and their indexes in reverse
    ;; order, so put everything back in database order before returning
    (reverse (loop for (name . indexes) in schema
                collect (cons name (reverse indexes))))))
;;;
;;; Map a function to each row extracted from SQLite
;;;
(defun map-rows (db table-name &key process-row-fn)
  "Extract SQLite data and call PROCESS-ROW-FN function with a single
   argument (a list of column values) for each row"
  ;; NOTE(review): TABLE-NAME is spliced into the SQL unquoted — assumes
  ;; trusted names straight out of sqlite_master; confirm.
  (let ((sql (format nil "SELECT * FROM ~a" table-name)))
    (loop
       with statement = (sqlite:prepare-statement db sql)
       ;; build the 0-based list of column indices once, sized from the
       ;; statement's column names (the names themselves are unused)
       with column-numbers =
         (loop for i from 0
            for name in (sqlite:statement-column-names statement)
            collect i)
       ;; step-statement is true while another row is available
       while (sqlite:step-statement statement)
       for row = (mapcar (lambda (x)
                           (sqlite:statement-column-value statement x))
                         column-numbers)
       do (funcall process-row-fn row)
       ;; release the prepared statement once all rows are consumed
       finally (sqlite:finalize-statement statement))))
(defun copy-to-queue (db table-name dataq)
  "Copy data from SQLite table TABLE-NAME within connection DB into queue DATAQ"
  ;; map-push-queue drives map-rows, pushing each row onto DATAQ;
  ;; presumably it returns the number of rows pushed — confirm against
  ;; pgloader.queue before trusting the :read accounting below
  (let ((read
         (pgloader.queue:map-push-queue dataq #'map-rows db table-name)))
    (pgstate-incf *state* table-name :read read)))
(defun stream-table (db table-name
                     &key
                       (kernel nil k-s-p)
                       pg-dbname
                       truncate
                       transforms)
  "Stream the contents of TABLE-NAME in SQLite down to PostgreSQL.

   DB is an open SQLite connection and PG-DBNAME the target PostgreSQL
   database. When KERNEL is not supplied a 2-worker kernel is created
   here and ended when done. TRANSFORMS, when given, is a per-column list
   of functions (or nil entries, meaning identity) applied to values
   before they are sent to PostgreSQL."
  (let* ((summary (null *state*))       ; top-level call: print a summary
         (*state* (or *state* (pgloader.utils:make-pgstate)))
         (lp:*kernel* (or kernel (make-kernel 2)))
         (channel (lp:make-channel))
         ;; bounded queue between the SQLite reader and PostgreSQL writer
         (dataq (lq:make-queue :fixed-capacity 4096))
         ;; default transforms: render SQLite "float" columns with ~f,
         ;; leave every other column untouched
         (transforms (or transforms
                         (let ((cols (list-columns table-name db)))
                           (loop for col in cols
                              if (string-equal "float" (coldef-type col))
                              collect (lambda (f)
                                        (format nil "~f" f))
                              else
                              collect nil)))))
    (with-stats-collection (pg-dbname table-name :state *state* :summary summary)
      (log-message :notice "COPY ~a" table-name)
      ;; read data from SQLite
      (lp:submit-task channel #'copy-to-queue db table-name dataq)

      ;; and start another task to push that data from the queue to PostgreSQL
      (lp:submit-task channel
                      #'pgloader.pgsql:copy-from-queue
                      pg-dbname table-name dataq
                      :truncate truncate
                      :transforms transforms)

      ;; now wait until both the tasks are over
      (loop for tasks below 2 do (lp:receive-result channel)
         finally
           (log-message :info "COPY ~a done." table-name)
           ;; only end the kernel when we created it ourselves
           (unless k-s-p (lp:end-kernel))))))
(defun create-indexes-in-kernel (dbname indexes kernel channel
                                 &key include-drop state (label "create index"))
  "Create indexes for given table in dbname, using given lparallel KERNEL
   and CHANNEL so that the index build happen in concurrently with the data
   copying.

   INDEXES is a list of (index-name . sql) pairs as returned per table by
   `list-all-indexes', SQL being the original CREATE INDEX statement kept
   in sqlite_master. CREATE tasks are only submitted on CHANNEL: the
   caller receives their results."
  (let* ((lp:*kernel* kernel))
    (pgstate-add-table state dbname label)
    (when include-drop
      (let ((drop-channel (lp:make-channel))
            (drop-indexes
             (loop for (name . sql) in indexes
                collect (format nil "DROP INDEX IF EXISTS ~a;" name))))
        (loop
           for sql in drop-indexes
           do
             (log-message :notice "~a" sql)
             (lp:submit-task drop-channel
                             #'pgsql-execute-with-timing
                             dbname label sql state))

        ;; wait for the DROP INDEX to be done before issuing CREATE INDEX
        (loop for idx in drop-indexes do (lp:receive-result drop-channel))))

    ;; NOTE(review): sqlite_master's sql column is NULL for auto-created
    ;; indexes — confirm such entries never reach this loop, or a nil
    ;; statement would be submitted for execution.
    (loop
       for (name . sql) in indexes
       do
         (log-message :notice "~a" sql)
         (lp:submit-task channel
                         #'pgsql-execute-with-timing
                         dbname label sql state))))
(defun stream-database (filename
                        &key
                          pg-dbname
                          (schema-only nil)
                          (create-tables nil)
                          (include-drop nil)
                          (create-indexes t)
                          (reset-sequences t)
                          (truncate nil)
                          only-tables)
  "Export SQLite data and Import it into PostgreSQL

   FILENAME names the SQLite database file, PG-DBNAME the target
   PostgreSQL database. SCHEMA-ONLY skips the data copy; CREATE-TABLES
   and INCLUDE-DROP control DDL on the PostgreSQL side; ONLY-TABLES,
   when given, restricts the copy to the listed table names."
  (let* ((*sqlite-db* (sqlite:connect filename))
         (*state* (make-pgstate))
         (idx-state (make-pgstate))
         (seq-state (make-pgstate))
         (copy-kernel (make-kernel 2))
         (all-columns (list-all-columns))
         (all-indexes (list-all-indexes))
         ;; size the index kernel after the most-indexed table so that all
         ;; of one table's indexes can build concurrently
         (max-indexes (loop for (table . indexes) in all-indexes
                         maximizing (length indexes)))
         (idx-kernel (when (and max-indexes (< 0 max-indexes))
                       (make-kernel max-indexes)))
         ;; nil when there are no indexes at all
         (idx-channel (when idx-kernel
                        (let ((lp:*kernel* idx-kernel))
                          (lp:make-channel)))))
    ;; if asked, first drop/create the tables on the PostgreSQL side
    (when create-tables
      (log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop)
      (with-pgsql-transaction (pg-dbname)
        (create-tables all-columns :include-drop include-drop)))

    (loop
       for (table-name . columns) in all-columns
       when (or (null only-tables)
                (member table-name only-tables :test #'equal))
       do
         (progn
           ;; first COPY the data from SQLite to PostgreSQL, using copy-kernel
           (unless schema-only
             (stream-table *sqlite-db* table-name
                           :kernel copy-kernel
                           :pg-dbname pg-dbname
                           :truncate truncate))

           ;; Create the indexes for that table in parallel with the next
           ;; COPY, and all at once in concurrent threads to benefit from
           ;; PostgreSQL synchronous scan ability
           ;;
           ;; We just push new index build as they come along, if one
           ;; index build requires much more time than the others our
           ;; index build might get unsync: indexes for different tables
           ;; will get built in parallel --- not a big problem.
           (when create-indexes
             (let* ((indexes
                     (cdr (assoc table-name all-indexes :test #'string=))))
               (create-indexes-in-kernel pg-dbname indexes
                                         idx-kernel idx-channel
                                         :state idx-state
                                         :include-drop include-drop)))))

    ;; don't forget to reset sequences, but only when we did actually import
    ;; the data.
    (when (and (not schema-only) reset-sequences)
      (let ((tables (or only-tables
                        (mapcar #'car all-columns))))
        (log-message :notice "Reset sequences")
        (with-stats-collection (pg-dbname "reset sequences"
                                          :use-result-as-rows t
                                          :state seq-state)
          (pgloader.pgsql:reset-all-sequences pg-dbname :tables tables))))

    ;; now end the kernels
    (let ((lp:*kernel* idx-kernel)) (lp:end-kernel))
    (let ((lp:*kernel* copy-kernel))
      ;; wait until the indexes are done being built...
      ;; don't forget accounting for that waiting time.
      ;; NOTE(review): this receives one result per TABLE entry in
      ;; all-indexes, yet create-indexes-in-kernel submits one task per
      ;; INDEX — confirm the counts match; also confirm idx-channel is
      ;; non-nil whenever this loop has iterations.
      (with-stats-collection (pg-dbname "index build completion" :state *state*)
        (loop for idx in all-indexes do (lp:receive-result idx-channel)))
      (lp:end-kernel))

    ;; and report the total time spent on the operation
    (report-summary)
    (format t pgloader.utils::*header-line*)
    (report-summary :state idx-state :header nil :footer nil)
    (report-summary :state seq-state :header nil :footer nil)

    ;; don't forget to add up the RESET SEQUENCES timings
    (incf (pgloader.utils::pgstate-secs *state*)
          (pgloader.utils::pgstate-secs seq-state))
    (report-pgstate-stats *state* "Total streaming time")))