Fix SQLite index support, add foreign keys support.

It turns out recent changes broke tne SQLite index support (from adding
support for MS SQL partial/filtered indexes), so fix it by using the
pgsql-index structure rather than the specific sqlite-idx one.

In passing, improve detection of PRIMARY KEY indexes, which was still
lacking. This work showed that the introspection done by pgloader was
wrong, it's way more crazy that we though, so adjust the code to loop
over PRAGMA calls for each object we inspect.

While adding PRAGMA calls, add support for foreign keys too, we have the
code infrastructure that makes it easy now.
This commit is contained in:
Dimitri Fontaine 2016-03-27 20:39:13 +02:00
parent cdc5d2f06b
commit fe3601b04c
7 changed files with 93 additions and 52 deletions

View File

@ -368,6 +368,8 @@
#:format-extra-type
#:format-extra-triggers
#:make-pgsql-fkey
#:pgsql-fkey-columns
#:pgsql-fkey-foreign-columns
#:format-pgsql-create-fkey
#:format-pgsql-drop-fkey
#:drop-pgsql-fkeys
@ -640,6 +642,9 @@
#:index-table-name
#:format-pgsql-create-index
#:create-indexes-in-kernel
#:make-pgsql-fkey
#:pgsql-fkey-columns
#:pgsql-fkey-foreign-columns
#:set-table-oids
#:reset-sequences
#:comment-on-tables-and-columns)

View File

@ -26,6 +26,7 @@ load database
option-create-tables
option-create-indexes
option-reset-sequences
option-foreign-keys
option-encoding))
(defrule sqlite-options (and kw-with
@ -111,6 +112,7 @@ load database
,(sql-code-block pg-db-conn :pre before "before load")
(pgloader.sqlite:copy-database source
:set-table-oids t
:including ',incl
:excluding ',excl
,@(remove-batch-control-option options))

View File

@ -54,7 +54,7 @@
(defmethod format-pgsql-create-fkey ((fk pgsql-fkey))
"Generate the PostgreSQL statement to rebuild a MySQL Foreign Key"
(format nil
"ALTER TABLE ~a ADD CONSTRAINT ~a FOREIGN KEY(~{~a~^,~}) REFERENCES ~a(~{~a~^,~})~:[~*~; ON UPDATE ~a~]~:[~*~; ON DELETE ~a~]"
"ALTER TABLE ~a ADD ~@[CONSTRAINT ~a ~]FOREIGN KEY(~{~a~^,~}) REFERENCES ~a(~{~a~^,~})~:[~*~; ON UPDATE ~a~]~:[~*~; ON DELETE ~a~]"
(format-table-name (pgsql-fkey-table fk))
(pgsql-fkey-name fk) ; constraint name
(pgsql-fkey-columns fk)
@ -67,14 +67,15 @@
(defmethod format-pgsql-drop-fkey ((fk pgsql-fkey) &key all-pgsql-fkeys)
"Generate the PostgreSQL statement to rebuild a MySQL Foreign Key"
(let* ((constraint-name (apply-identifier-case (pgsql-fkey-name fk)))
(table-name (format-table-name (pgsql-fkey-table fk)))
(fkeys (cdr (assoc table-name all-pgsql-fkeys :test #'string=)))
(fkey-exists (member constraint-name fkeys :test #'string=)))
(when fkey-exists
;; we could do that without all-pgsql-fkeys in 9.2 and following with:
;; alter table if exists ... drop constraint if exists ...
(format nil "ALTER TABLE ~a DROP CONSTRAINT ~a" table-name constraint-name))))
(when (pgsql-fkey-name fk)
(let* ((constraint-name (apply-identifier-case (pgsql-fkey-name fk)))
(table-name (format-table-name (pgsql-fkey-table fk)))
(fkeys (cdr (assoc table-name all-pgsql-fkeys :test #'string=)))
(fkey-exists (member constraint-name fkeys :test #'string=)))
(when fkey-exists
;; we could do that without all-pgsql-fkeys in 9.2 and following with:
;; alter table if exists ... drop constraint if exists ...
(format nil "ALTER TABLE ~a DROP CONSTRAINT ~a" table-name constraint-name)))))
(defun drop-pgsql-fkeys (catalog)
"Drop all Foreign Key Definitions given, to prepare for a clean run."

View File

@ -80,9 +80,7 @@
(unless data-only
(loop :for sql :in pkeys
:when sql
:do (progn
(log-message :notice "~a" sql)
(pgsql-execute-with-timing :post "Primary Keys" sql))))
:do (pgsql-execute-with-timing :post "Primary Keys" sql)))
;;
;; Foreign Key Constraints

View File

@ -93,38 +93,73 @@
:do (let ((table (add-table schema table-name)))
(list-columns table db))))
(defstruct sqlite-idx name table-name sql)
;;;
;;; Index support
;;;
(defun is-index-pk (table index-col-name-list)
"The only way to know with SQLite pragma introspection if a particular
UNIQUE index is actually PRIMARY KEY is by comparing the list of column
names in the index with the ones marked with non-zero pk in the table
definition."
(equal (loop :for field :in (table-field-list table)
:when (< 0 (coldef-pk-id field))
:collect (coldef-name field))
index-col-name-list))
(defmethod index-table-name ((index sqlite-idx))
(sqlite-idx-table-name index))
(defun list-index-cols (index-name &optional (db *sqlite-db*))
"Return the list of columns in INDEX-NAME."
(let ((sql (format nil "PRAGMA index_info(~a)" index-name)))
(loop :for (index-pos table-pos col-name) :in (sqlite:execute-to-list db sql)
:collect col-name)))
(defmethod format-pgsql-create-index ((table table) (index sqlite-idx))
"Generate the PostgresQL statement to build the given SQLite index definition."
(sqlite-idx-sql index))
(defun list-all-indexes (schema
&key
(db *sqlite-db*)
including
excluding)
"Get the list of SQLite index definitions per table."
(let ((sql (format nil
"SELECT name, tbl_name, replace(replace(sql, '[', ''), ']', '')
FROM sqlite_master
WHERE type='index'
~:[~*~;AND (~{~a~^~&~10t or ~})~]
~:[~*~;AND (~{~a~^~&~10t and ~})~]"
including ; do we print the clause?
(filter-list-to-where-clause including nil)
excluding ; do we print the clause?
(filter-list-to-where-clause excluding t))))
(log-message :info "~a" sql)
(defun list-indexes (table &optional (db *sqlite-db*))
"Return the list of indexes attached to TABLE."
(let* ((table-name (table-source-name table))
(sql (format nil "PRAGMA index_list(~a)" table-name)))
(loop
:for (index-name table-name sql) :in (sqlite:execute-to-list db sql)
:when sql
:do (let ((table (find-table schema table-name))
(idxdef (make-sqlite-idx :name index-name
:sql sql)))
(add-index table idxdef))
:finally (return schema))))
:for (seq index-name unique origin partial) :in (sqlite:execute-to-list db sql)
:do (let* ((cols (list-index-cols index-name db))
(index (make-pgsql-index :name index-name
:primary (is-index-pk table cols)
:unique (= unique 1)
:columns cols)))
(add-index table index)))))
(defun list-all-indexes (schema &key (db *sqlite-db*))
"Get the list of SQLite index definitions per table."
(loop :for table :in (schema-table-list schema)
:do (list-indexes table db)))
;;;
;;; Foreign keys support
;;;
(defun list-fkeys (table &optional (db *sqlite-db*))
"Return the list of indexes attached to TABLE."
(let* ((table-name (table-source-name table))
(sql (format nil "PRAGMA foreign_key_list(~a)" table-name)))
(loop
:with fkey-table := (make-hash-table)
:for (id seq ftable-name from to on-update on-delete match)
:in (sqlite:execute-to-list db sql)
:do (let* ((ftable (find-table (table-schema table) ftable-name))
(fkey (or (gethash id fkey-table)
(let ((pg-fkey
(make-pgsql-fkey :table table
:columns nil
:foreign-table ftable
:foreign-columns nil
:update-rule on-update
:delete-rule on-delete)))
(setf (gethash id fkey-table) pg-fkey)
(add-fkey table pg-fkey)
pg-fkey))))
(push-to-end from (pgsql-fkey-columns fkey))
(push-to-end to (pgsql-fkey-foreign-columns fkey))))))
(defun list-all-fkeys (schema &key (db *sqlite-db*))
"Get the list of SQLite foreign keys definitions per table."
(loop :for table :in (schema-table-list schema)
:do (list-fkeys table db)))

View File

@ -93,12 +93,12 @@
&key
materialize-views
only-tables
create-indexes
foreign-keys
(create-indexes t)
(foreign-keys t)
including
excluding)
"SQLite introspection to prepare the migration."
(declare (ignore materialize-views only-tables foreign-keys))
(declare (ignore materialize-views only-tables))
(let ((schema (add-schema catalog nil)))
(with-stats-collection ("fetch meta data"
:use-result-as-rows t
@ -112,10 +112,10 @@
:excluding excluding)
(when create-indexes
(list-all-indexes schema
:db *sqlite-db*
:including including
:excluding excluding)))
(list-all-indexes schema :db *sqlite-db*))
(when foreign-keys
(list-all-fkeys schema :db *sqlite-db*)))
;; return how many objects we're going to deal with in total
;; for stats collection

View File

@ -5,7 +5,7 @@ load database
-- including only table names like 'Invoice%'
with workers = 4,
concurrency = 1,
include drop, create tables, create indexes, reset sequences
concurrency = 2,
include drop, create tables, create indexes, reset sequences, foreign keys
set work_mem to '16MB', maintenance_work_mem to '512 MB';