Assorted fixes to MS SQL support.

Having been given a test instance of a MS SQL database allows to quickly
fix a series of assorted bugs related to schema handling of MS SQL
databases. As it's the only source with a proper notion of schema that
pgloader supports currently, it's not a surprise we had them.

Fix #343. Fix #349. Fix #354.
This commit is contained in:
Dimitri Fontaine 2016-03-16 21:43:04 +01:00
parent c1fc4f0879
commit f1fe9ab702
8 changed files with 56 additions and 41 deletions

View File

@ -353,6 +353,7 @@
#:copy-from-file
#:copy-from-queue
#:reset-all-sequences
#:create-schemas
#:create-tables
#:create-views
#:format-pgsql-column

View File

@ -28,6 +28,7 @@
option-create-schemas
option-create-indexes
option-reset-sequences
option-foreign-keys
option-encoding
option-identifiers-case))

View File

@ -241,6 +241,11 @@
;; unless (eq attnum :NULL)
;; collect attnum)))))
(defun list-schemas ()
"Return the list of PostgreSQL schemas in the already established
PostgreSQL connection."
(pomo:query "SELECT nspname FROM pg_catalog.pg_namespace;" :column))
(defun list-tables-and-fkeys (&optional schema-name)
"Yet another table listing query."
(loop :for (relname fkeys) :in (pomo:query (format nil "

View File

@ -161,6 +161,28 @@
:do (pgsql-execute sql :client-min-messages client-min-messages)
:finally (return nb-tables)))
(defun create-schemas (catalog
&key
include-drop
(client-min-messages :notice))
"Create all schemas from the given database CATALOG."
(let ((schema-list (list-schemas)))
(when include-drop
;; if asked, first DROP the schema CASCADE.
(loop :for schema :in (catalog-schema-list catalog)
:for schema-name := (schema-name schema)
:when (member schema-name schema-list :test #'string=)
:do (let ((sql (format nil "DROP SCHEMA ~a CASCADE;" schema-name)))
(pgsql-execute sql :client-min-messages client-min-messages))))
;; now create the schemas (again?)
(loop :for schema :in (catalog-schema-list catalog)
:for schema-name := (schema-name schema)
:when (or include-drop
(not (member schema-name schema-list :test #'string=)))
:do (let ((sql (format nil "CREATE SCHEMA ~a;" (schema-name schema))))
(pgsql-execute sql :client-min-messages client-min-messages)))))
(defun create-tables (catalog
&key
if-not-exists

View File

@ -31,21 +31,22 @@
(drop-pgsql-fkeys catalog))
(when create-schemas
(loop :for schema :in (catalog-schema-list catalog)
:do (when create-schemas
(let ((sql (format nil "CREATE SCHEMA ~a;" schema)))
(pgsql-execute sql)))))
(log-message :debug "Create schemas")
(create-schemas catalog :include-drop include-drop))
(log-message :debug "Create tables")
(create-tables catalog :include-drop include-drop)
;; Some database sources allow the same index name being used
;; against several tables, so we add the PostgreSQL table OID in the
;; index name, to differenciate. Set the table oids now.
(when set-table-oids
(log-message :debug "Set table OIDs")
(set-table-oids catalog))
;; We might have to MATERIALIZE VIEWS
(when materialize-views
(log-message :debug "Create views for matview support")
(create-views catalog :include-drop include-drop)))))
(defmethod cleanup ((copy db-copy) (catalog catalog) &key materialize-views)
@ -206,14 +207,6 @@
(view-list catalog))
:do (let ((table-source (instanciate-table-copy-object copy table)))
;; that needs *print-circle* to true, and anyway it's too much
;; output in general.
;;
;; (log-message :debug "TARGET: ~a" (target table-source))
(log-message :debug "TRANSFORMS(~a): ~s"
(format-table-name table)
(transforms table-source))
;; first COPY the data from source to PostgreSQL, using copy-kernel
(unless schema-only
;; prepare the writers-count hash-table, as we start

View File

@ -9,8 +9,12 @@
(:source (:type "nchar") :target (:type "text" :drop-typemod t))
(:source (:type "varchar") :target (:type "text" :drop-typemod t))
(:source (:type "nvarchar") :target (:type "text" :drop-typemod t))
(:source (:type "ntext") :target (:type "text" :drop-typemod t))
(:source (:type "xml") :target (:type "text" :drop-typemod t))
(:source (:type "int" :auto-increment t)
:target (:type "bigserial" :drop-default t))
(:source (:type "bit") :target (:type "boolean")
:using pgloader.transforms::sql-server-bit-to-boolean)
@ -80,11 +84,7 @@
(defmethod mssql-column-ctype ((col mssql-column))
"Build the ctype definition from the full mssql-column information."
(let ((type (mssql-column-type col)))
(cond ((and (string= type "int")
(mssql-column-identity col))
"bigserial")
((member type '("float" "real") :test #'string=)
(cond ((member type '("float" "real") :test #'string=)
;; see https://msdn.microsoft.com/en-us/library/ms173773.aspx
;; scale is supposed to be nil, and useless in PostgreSQL, so we
;; just ignore it
@ -108,12 +108,9 @@
field
(declare (ignore schema)) ; FIXME
(let* ((ctype (mssql-column-ctype field))
(extra (when (mssql-column-identity field) "auto_increment"))
(pgcol
(apply-casting-rules table-name name type ctype
;; drop default value (forcing it to nil
;; here) for serial data types
(unless (string= "bigserial" ctype) default)
nullable nil)))
(apply-casting-rules table-name name type ctype default nullable extra)))
;; the MS SQL driver smartly maps data to the proper CL type, but the
;; pgloader API only wants to see text representations to send down the
;; COPY protocol.

View File

@ -29,12 +29,10 @@
"Extract Mssql data and call PROCESS-ROW-FN function with a single
argument (a list of column values) for each row."
(with-connection (*mssql-db* (source-db mssql))
(let* ((sql (destructuring-bind (schema . table-name)
(source mssql)
(format nil "SELECT ~{~a~^, ~} FROM [~a].[~a];"
(get-column-list (fields mssql))
schema
table-name))))
(let* ((sql (format nil "SELECT ~{~a~^, ~} FROM [~a].[~a];"
(get-column-list (fields mssql))
(table-schema (source mssql))
(table-name (source mssql)))))
(log-message :debug "~a" sql)
(handler-case
(handler-bind

View File

@ -11,7 +11,10 @@
(in-package :pgloader.schema)
(defmacro push-to-end (item place)
`(setf ,place (nconc ,place (list ,item))))
`(progn
(setf ,place (nconc ,place (list ,item)))
;; and return the item we just pushed at the end of the place
,item))
;;;
;;; TODO: stop using anonymous data structures for database catalogs,
@ -148,8 +151,7 @@
(let ((schema (make-schema :source-name schema-name
:name (when schema-name
(apply-identifier-case schema-name)))))
(push-to-end schema (catalog-schema-list catalog))
schema))
(push-to-end schema (catalog-schema-list catalog))))
(defmethod add-table ((schema schema) table-name &key comment)
"Add TABLE-NAME to SCHEMA and return the new table instance."
@ -159,8 +161,7 @@
:schema (schema-name schema)
:comment (unless (or (null comment) (string= "" comment))
comment))))
(push-to-end table (schema-table-list schema))
table))
(push-to-end table (schema-table-list schema))))
(defmethod add-view ((schema schema) view-name &key comment)
"Add TABLE-NAME to SCHEMA and return the new table instance."
@ -170,8 +171,7 @@
:schema (schema-name schema)
:comment (unless (or (null comment) (string= "" comment))
comment))))
(push-to-end view (schema-view-list schema))
view))
(push-to-end view (schema-view-list schema))))
(defmethod find-schema ((catalog catalog) schema-name &key)
"Find SCHEMA-NAME in CATALOG and return the SCHEMA object of this name."
@ -208,13 +208,11 @@
(defmethod add-field ((table table) field &key)
"Add COLUMN to TABLE and return the TABLE."
(push-to-end field (table-field-list table))
table)
(push-to-end field (table-field-list table)))
(defmethod add-column ((table table) column &key)
"Add COLUMN to TABLE and return the TABLE."
(push-to-end column (table-column-list table))
table)
(push-to-end column (table-column-list table)))
(defmethod cast ((table table))
"Cast all fields in table into columns."
@ -253,8 +251,8 @@
(defmethod maybe-add-fkey ((table table) fkey-name fkey &key key (test #'string=))
"Add the foreign key FKEY to the table-fkey-list of TABLE unless it
already exists, and return the FKEY object."
(let ((fkey (find-fkey table fkey-name :key key :test test)))
(or fkey (add-fkey table fkey))))
(let ((current-fkey (find-fkey table fkey-name :key key :test test)))
(or current-fkey (add-fkey table fkey))))
;;;