Add support for PostgreSQL as a source database.

It's now possible to use pgloader to migrate from PostgreSQL to PostgreSQL.
That might be useful for several reasons, including applying user defined
cast rules at COPY time, or just moving from an hosted solution to another.
This commit is contained in:
Dimitri Fontaine 2018-08-20 11:09:52 +02:00
parent 1ee389d121
commit fc3a1949f7
17 changed files with 639 additions and 61 deletions

View File

@ -182,7 +182,13 @@
;; :depends-on ("mysql-schema")) ;; :depends-on ("mysql-schema"))
(:file "mysql" (:file "mysql"
:depends-on ("mysql-cast-rules" :depends-on ("mysql-cast-rules"
"mysql-schema")))))) "mysql-schema"))))
(:module "pgsql"
:serial t
:depends-on ("common")
:components ((:file "pgsql-cast-rules")
(:file "pgsql")))))
;; package pgloader.copy ;; package pgloader.copy
(:module "pg-copy" (:module "pg-copy"
@ -247,6 +253,7 @@
(:file "command-including-like") (:file "command-including-like")
(:file "command-mssql") (:file "command-mssql")
(:file "command-sqlite") (:file "command-sqlite")
(:file "command-pgsql")
(:file "command-archive") (:file "command-archive")
(:file "command-parser") (:file "command-parser")
(:file "parse-sqlite-type-name") (:file "parse-sqlite-type-name")

View File

@ -46,6 +46,12 @@
(with-stats-collection ("Create SQL Types" :section :pre (with-stats-collection ("Create SQL Types" :section :pre
:use-result-as-read t :use-result-as-read t
:use-result-as-rows t) :use-result-as-rows t)
;; some SQL types come from extensions (ip4r, hstore, etc)
(create-extensions catalog
:include-drop include-drop
:if-not-exists t
:client-min-messages :error)
(create-sqltypes catalog (create-sqltypes catalog
:include-drop include-drop :include-drop include-drop
:client-min-messages :error)) :client-min-messages :error))

View File

@ -49,8 +49,9 @@
#:catalog #:catalog
#:schema #:schema
#:table #:extension
#:sqltype #:sqltype
#:table
#:column #:column
#:index #:index
#:fkey #:fkey
@ -82,6 +83,8 @@
#:schema-source-name #:schema-source-name
#:schema-table-list #:schema-table-list
#:schema-view-list #:schema-view-list
#:schema-extension-list
#:schema-sqltype-list
#:schema-in-search-path #:schema-in-search-path
#:table-name #:table-name
@ -96,11 +99,15 @@
#:table-fkey-list #:table-fkey-list
#:table-trigger-list #:table-trigger-list
#:extension-name
#:extension-schema
#:sqltype-name #:sqltype-name
#:sqltype-schema #:sqltype-schema
#:sqltype-type #:sqltype-type
#:sqltype-source-def #:sqltype-source-def
#:sqltype-extra #:sqltype-extra
#:sqltype-extension
#:column-name #:column-name
#:column-type-name #:column-type-name
@ -110,6 +117,7 @@
#:column-comment #:column-comment
#:column-transform #:column-transform
#:column-extra #:column-extra
#:column-transform-default
#:index-name #:index-name
#:index-type #:index-type
@ -152,9 +160,15 @@
#:table-list #:table-list
#:view-list #:view-list
#:extension-list
#:sqltype-list
#:add-schema #:add-schema
#:find-schema #:find-schema
#:maybe-add-schema #:maybe-add-schema
#:add-extension
#:find-extension
#:maybe-add-extension
#:add-sqltype
#:add-table #:add-table
#:find-table #:find-table
#:maybe-add-table #:maybe-add-table
@ -389,6 +403,7 @@
#:truncate-tables #:truncate-tables
#:set-table-oids #:set-table-oids
#:create-extensions
#:create-sqltypes #:create-sqltypes
#:create-schemas #:create-schemas
#:add-to-search-path #:add-to-search-path
@ -417,6 +432,7 @@
#:process-index-definitions #:process-index-definitions
;; postgresql introspection queries ;; postgresql introspection queries
#:list-all-sqltypes
#:list-all-columns #:list-all-columns
#:list-all-indexes #:list-all-indexes
#:list-all-fkeys #:list-all-fkeys
@ -674,6 +690,14 @@
#:*mysql-default-cast-rules* #:*mysql-default-cast-rules*
#:with-mysql-connection)) #:with-mysql-connection))
(defpackage #:pgloader.source.pgsql
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.sources #:pgloader.pgsql #:pgloader.catalog)
(:import-from #:pgloader.transforms #:precision #:scale)
(:export #:copy-pgsql
#:*pgsql-default-cast-rules*))
(defpackage #:pgloader.source.sqlite (defpackage #:pgloader.source.sqlite
(:use #:cl (:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection #:pgloader.params #:pgloader.utils #:pgloader.connection
@ -763,6 +787,9 @@
(:import-from #:pgloader.source.copy (:import-from #:pgloader.source.copy
#:copy-copy #:copy-copy
#:copy-connection) #:copy-connection)
(:import-from #:pgloader.source.pgsql
#:copy-pgsql
#:*pgsql-default-cast-rules*)
(:import-from #:pgloader.source.mysql (:import-from #:pgloader.source.mysql
#:copy-mysql #:copy-mysql
#:mysql-connection #:mysql-connection

View File

@ -17,6 +17,7 @@
load-copy-file load-copy-file
load-dbf-file load-dbf-file
load-ixf-file load-ixf-file
load-pgsql-database
load-mysql-database load-mysql-database
load-mssql-database load-mssql-database
load-sqlite-database load-sqlite-database
@ -160,12 +161,12 @@
(declare (ignore abs paths no-path-p)) (declare (ignore abs paths no-path-p))
(let ((dotted-parts (reverse (sq:split-sequence #\. filename)))) (let ((dotted-parts (reverse (sq:split-sequence #\. filename))))
(when (<= 2 (length dotted-parts)) (when (<= 2 (length dotted-parts))
(destructuring-bind (extension name-or-ext &rest parts) (destructuring-bind (ext name-or-ext &rest parts)
dotted-parts dotted-parts
(declare (ignore parts)) (declare (ignore parts))
(if (string-equal "tar" name-or-ext) :archive (if (string-equal "tar" name-or-ext) :archive
(loop :for (type . extensions) :in *data-source-filename-extensions* (loop :for (type . extensions) :in *data-source-filename-extensions*
:when (member extension extensions :test #'string-equal) :when (member ext extensions :test #'string-equal)
:return type))))))) :return type)))))))
(defvar *parse-rule-for-source-types* (defvar *parse-rule-for-source-types*
@ -266,6 +267,7 @@
(:dbf 'dbf-option) (:dbf 'dbf-option)
(:ixf 'ixf-option) (:ixf 'ixf-option)
(:sqlite 'sqlite-option) (:sqlite 'sqlite-option)
(:pgsql 'pgsql-option)
(:mysql 'mysql-option) (:mysql 'mysql-option)
(:mssql 'mysql-option)) (:mssql 'mysql-option))
option)))) option))))

View File

@ -0,0 +1,159 @@
;;;
;;; Parse the pgloader commands grammar
;;;
(in-package :pgloader.parser)
;;;
;;; PostgreSQL options
;;;
(defrule pgsql-option (or option-on-error-stop
option-on-error-resume-next
option-workers
option-concurrency
option-batch-rows
option-batch-size
option-prefetch-rows
option-max-parallel-create-index
option-reindex
option-truncate
option-disable-triggers
option-data-only
option-schema-only
option-include-drop
option-drop-schema
option-create-tables
option-create-indexes
option-index-names
option-reset-sequences
option-foreign-keys
option-identifiers-case))
(defrule pgsql-options (and kw-with
(and pgsql-option (* (and comma pgsql-option))))
(:function flatten-option-list))
;;;
;;; Including only some tables or excluding some others
;;;
(defrule including-matching-in-schema-filter
(and kw-including kw-only kw-table kw-names kw-matching filter-list-matching
kw-in kw-schema quoted-namestring)
(:lambda (source)
(bind (((_ _ _ _ _ filter-list _ _ schema) source))
(cons schema filter-list))))
(defrule including-matching-in-schema
(and including-in-schema (* including-in-schema))
(:lambda (source)
(destructuring-bind (inc1 incs) source
(cons :including (list* inc1 incs)))))
(defrule excluding-matching-in-schema-filter
(and kw-excluding kw-table kw-names kw-matching filter-list-matching
kw-in kw-schema quoted-namestring)
(:lambda (source)
(bind (((_ _ _ _ filter-list _ _ schema) source))
(cons schema filter-list))))
(defrule excluding-matching-in-schema
(and excluding-in-schema (* excluding-in-schema))
(:lambda (source)
(destructuring-bind (excl1 excls) source
(cons :excluding (list* excl1 excls)))))
;;;
;;; Allow clauses to appear in any order
;;;
(defrule load-pgsql-optional-clauses (* (or pgsql-options
gucs
casts
alter-table
alter-schema
materialize-views
including-matching-in-schema
excluding-matching-in-schema
decoding-tables-as
before-load
after-load))
(:lambda (clauses-list)
(alexandria:alist-plist clauses-list)))
(defrule pgsql-source (and kw-load kw-database kw-from pgsql-uri)
(:lambda (source) (bind (((_ _ _ uri) source)) uri)))
(defrule load-pgsql-command (and pgsql-source target
load-pgsql-optional-clauses)
(:lambda (command)
(destructuring-bind (source target clauses) command
`(,source ,target ,@clauses))))
;;; LOAD DATABASE FROM pgsql://
(defun lisp-code-for-pgsql-dry-run (pg-src-db-conn pg-dst-db-conn)
`(lambda ()
(log-message :log "DRY RUN, only checking connections.")
(check-connection ,pg-src-db-conn)
(check-connection ,pg-dst-db-conn)))
(defun lisp-code-for-loading-from-pgsql (pg-src-db-conn pg-dst-db-conn
&key
gucs
casts before after options
alter-table alter-schema
((:including incl))
((:excluding excl))
((:decoding decoding-as))
&allow-other-keys)
`(lambda ()
(let* ((*default-cast-rules* ',*pgsql-default-cast-rules*)
(*cast-rules* ',casts)
(*identifier-case* :quote)
(on-error-stop (getf ',options :on-error-stop t))
,@(pgsql-connection-bindings pg-dst-db-conn gucs)
,@(batch-control-bindings options)
(source
(make-instance 'copy-pgsql
:target-db ,pg-dst-db-conn
:source-db ,pg-src-db-conn)))
,(sql-code-block pg-dst-db-conn :pre before "before load")
(copy-database source
:including ',incl
:excluding ',excl
:alter-table ',alter-table
:alter-schema ',alter-schema
:index-names :preserve
:set-table-oids t
:on-error-stop on-error-stop
,@(remove-batch-control-option options))
,(sql-code-block pg-dst-db-conn :post after "after load"))))
(defrule load-pgsql-database load-pgsql-command
(:lambda (source)
(destructuring-bind (pg-src-db-uri
pg-dst-db-uri
&key
gucs casts before after options
alter-table alter-schema
including excluding decoding)
source
(cond (*dry-run*
(lisp-code-for-pgsql-dry-run pg-src-db-uri pg-dst-db-uri))
(t
(lisp-code-for-loading-from-pgsql pg-src-db-uri pg-dst-db-uri
:gucs gucs
:casts casts
:before before
:after after
:options options
:alter-table alter-table
:alter-schema alter-schema
:including including
:excluding excluding
:decoding decoding))))))

View File

@ -13,17 +13,7 @@
include-drop include-drop
(client-min-messages :notice)) (client-min-messages :notice))
"Create the needed data types for given CATALOG." "Create the needed data types for given CATALOG."
(let ((sqltype-list)) (let ((sqltype-list (sqltype-list catalog)))
;; build the sqltype list
(loop :for table :in (append (table-list catalog)
(view-list catalog))
:do (loop :for column :in (table-column-list table)
:do (when (typep (column-type-name column) 'sqltype)
(pushnew (column-type-name column) sqltype-list
:test #'string-equal
:key #'sqltype-name))))
;; now create the types
(loop :for sqltype :in sqltype-list (loop :for sqltype :in sqltype-list
:when include-drop :when include-drop
:count t :count t
@ -114,6 +104,19 @@
:log-level log-level :log-level log-level
:client-min-messages client-min-messages))))) :client-min-messages client-min-messages)))))
(defun create-extensions (catalog
&key
if-not-exists
include-drop
(client-min-messages :notice))
"Create all extensions from the given database CATALOG."
(let ((sql
(loop :for extension :in (extension-list catalog)
:when include-drop
:collect (format-drop-sql extension :if-exists t :cascade t)
:collect (format-create-sql extension :if-not-exists if-not-exists))))
(pgsql-execute sql :client-min-messages client-min-messages)))
(defun create-tables (catalog (defun create-tables (catalog
&key &key
if-not-exists if-not-exists

View File

@ -38,6 +38,25 @@
(sqltype-name sqltype) (sqltype-name sqltype)
cascade)) cascade))
;;;
;;; Extensions
;;;
(defmethod format-create-sql ((extension extension)
&key (stream nil) if-not-exists)
(format stream "CREATE EXTENSION~:[~; IF NOT EXISTS~] ~a WITH SCHEMA ~a;"
if-not-exists
(extension-name extension)
(schema-name (extension-schema extension))))
(defmethod format-drop-sql ((extension extension)
&key (stream nil) cascade if-exists)
(format stream "DROP EXTENSION~:[~; IF EXISTS~] ~a~@[ CASCADE~];"
if-exists
(extension-name extension)
cascade))
;;; ;;;
;;; Tables ;;; Tables
@ -126,26 +145,30 @@
"Common normalized default values and their PostgreSQL spelling.") "Common normalized default values and their PostgreSQL spelling.")
(defmethod format-default-value ((column column) &key (stream nil)) (defmethod format-default-value ((column column) &key (stream nil))
(let* ((default (column-default column)) (if (column-transform-default column)
(clean-default (cdr (assoc default *pgsql-default-values*))) (let* ((default (column-default column))
(transform (column-transform column))) (clean-default (cdr (assoc default *pgsql-default-values*)))
(or clean-default (transform (column-transform column)))
(if transform (or clean-default
(let* ((transformed-default (if transform
(handler-case (let* ((transformed-default
(funcall transform default) (handler-case
(condition (c) (funcall transform default)
(log-message :warning (condition (c)
"Failed to transform default value ~s: ~a" (log-message :warning
default c) "Failed to transform default value ~s: ~a"
;; can't transform: return nil default c)
nil))) ;; can't transform: return nil
(transformed-column nil)))
(make-column :default transformed-default))) (transformed-column
(format-default-value transformed-column)) (make-column :default transformed-default)))
(if default (format-default-value transformed-column))
(ensure-quoted default #\') (if default
(format stream "NULL")))))) (ensure-quoted default #\')
(format stream "NULL")))))
;; else, when column-transform-default is nil:
(column-default column)))
;;; ;;;

View File

@ -19,6 +19,10 @@
(t (t
including)))) including))))
(list-all-sqltypes catalog
:including including
:excluding excluding)
(list-all-columns catalog (list-all-columns catalog
:table-type :table :table-type :table
:including including :including including
@ -116,18 +120,34 @@
"Associate internal table type symbol with what's found in PostgreSQL "Associate internal table type symbol with what's found in PostgreSQL
pg_class.relkind column.") pg_class.relkind column.")
(defun filter-list-to-where-clause (filter-list (defun filter-list-to-where-clause (schema-filter-list
&optional &optional
not not
(schema-col "table_schema") (schema-col "table_schema")
(table-col "table_name")) (table-col "table_name"))
"Given an INCLUDING or EXCLUDING clause, turn it into a PostgreSQL WHERE "Given an INCLUDING or EXCLUDING clause, turn it into a PostgreSQL WHERE
clause." clause."
(loop :for (schema . table-name-list) :in filter-list (loop :for (schema . filter-list) :in schema-filter-list
:append (mapcar (lambda (table-name) :append (mapcar (lambda (filter)
(format nil "(~a = '~a' and ~a ~:[~;NOT ~]~~ '~a')" (typecase filter
schema-col schema table-col not table-name)) (string-match-rule
table-name-list))) (format nil "(~a = '~a' and ~a ~:[~;!~]= '~a')"
schema-col
schema
table-col
not
(string-match-rule-target filter)))
(regex-match-rule
(format nil "(~a = '~a' and ~a ~:[~;NOT ~]~~ '~a')"
schema-col
schema
table-col
not
(regex-match-rule-target filter)))))
filter-list)))
(defun normalize-extra (extra)
(cond ((string= "auto_increment" extra) :auto-increment)))
(defun list-all-columns (catalog (defun list-all-columns (catalog
&key &key
@ -137,7 +157,8 @@
&aux &aux
(table-type-name (cdr (assoc table-type *table-type*)))) (table-type-name (cdr (assoc table-type *table-type*))))
"Get the list of PostgreSQL column names per table." "Get the list of PostgreSQL column names per table."
(loop :for (schema-name table-name table-oid name type typmod notnull default) (loop :for (schema-name table-name table-oid
name type typmod notnull default extra)
:in :in
(query nil (query nil
(format nil (format nil
@ -160,7 +181,9 @@
:type-name type :type-name type
:type-mod typmod :type-mod typmod
:nullable (not notnull) :nullable (not notnull)
:default default))) :default default
:transform-default nil
:extra (normalize-extra extra))))
(add-field table field)) (add-field table field))
:finally (return catalog))) :finally (return catalog)))
@ -187,7 +210,7 @@
(tschema (find-schema catalog table-schema)) (tschema (find-schema catalog table-schema))
(table (find-table tschema table-name)) (table (find-table tschema table-name))
(pg-index (pg-index
(make-index :name name (make-index :name (ensure-quoted name)
:oid oid :oid oid
:schema schema :schema schema
:table table :table table
@ -195,8 +218,10 @@
:unique unique :unique unique
:columns nil :columns nil
:sql sql :sql sql
:conname (unless (eq :null conname) conname) :conname (unless (eq :null conname)
:condef (unless (eq :null condef) condef)))) (ensure-quoted conname))
:condef (unless (eq :null condef)
condef))))
(maybe-add-index table name pg-index :key #'index-name)) (maybe-add-index table name pg-index :key #'index-name))
:finally (return catalog))) :finally (return catalog)))
@ -247,7 +272,7 @@
(fschema (find-schema catalog fschema-name)) (fschema (find-schema catalog fschema-name))
(ftable (find-table fschema ftable-name)) (ftable (find-table fschema ftable-name))
(fk (fk
(make-fkey :name conname (make-fkey :name (ensure-quoted conname)
:oid conoid :oid conoid
:condef condef :condef condef
:table table :table table
@ -355,3 +380,44 @@
(sql "/pgsql/list-table-oids-from-temp-table.sql")))) (sql "/pgsql/list-table-oids-from-temp-table.sql"))))
:do (setf (gethash name oidmap) oid))) :do (setf (gethash name oidmap) oid)))
oidmap)) oidmap))
;;;
;;; PostgreSQL specific support for extensions and user defined data types.
;;;
(defun list-all-sqltypes (catalog &key including excluding)
"Set the catalog's schema extension list and sqltype list"
(loop :for (schema-name extension-name type-name enum-values)
:in (query nil
(format nil
(sql "/pgsql/list-all-sqltypes.sql")
including ; do we print the clause?
(filter-list-to-where-clause including
nil
"n.nspname"
"c.relname")
excluding ; do we print the clause?
(filter-list-to-where-clause excluding
nil
"n.nspname"
"c.relname")))
:do
(let* ((schema (maybe-add-schema catalog schema-name))
(sqltype
(make-sqltype :name (ensure-quoted type-name)
:schema schema
:type (when enum-values :enum)
:extra (when (and enum-values
(not (eq enum-values :null)))
(coerce enum-values 'list)))))
(if (and extension-name (not (eq :null extension-name)))
;; then create extension will create the type
(maybe-add-extension schema extension-name)
;; only create a specific entry for types that we need to create
;; ourselves, when extension is not null "create extension" is
;; going to take care of creating the type.
(add-sqltype schema sqltype)))
:finally (return catalog)))

View File

@ -3,17 +3,37 @@
-- filter-list-to-where-clause for including -- filter-list-to-where-clause for including
-- excluding -- excluding
-- filter-list-to-where-clause for excluding -- filter-list-to-where-clause for excluding
with seqattr as
(
select adrelid,
adnum,
adsrc,
case when adsrc ~~ 'nextval'
then (regexp_match(pg_get_expr(d.adbin, d.adrelid),
'''([^'']+)''')
)[1]::regclass::oid
else null::oid
end as seqoid
from pg_attrdef d
)
select nspname, relname, c.oid, attname, select nspname, relname, c.oid, attname,
t.oid::regtype as type, t.oid::regtype as type,
case when atttypmod > 0 then atttypmod - 4 else null end as typmod, case when atttypmod > 0
then substring(format_type(t.oid, atttypmod) from '\d+(?:,\d+)?')
else null
end as typmod,
attnotnull, attnotnull,
case when atthasdef then def.adsrc end as default case when atthasdef then def.adsrc end as default,
case when s.seqoid is not null then 'auto_increment' end as extra
from pg_class c from pg_class c
join pg_namespace n on n.oid = c.relnamespace join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid left join pg_attribute a on c.oid = a.attrelid
join pg_type t on t.oid = a.atttypid and attnum > 0 join pg_type t on t.oid = a.atttypid and attnum > 0
left join pg_attrdef def on a.attrelid = def.adrelid left join pg_attrdef def on a.attrelid = def.adrelid
and a.attnum = def.adnum and a.attnum = def.adnum
and a.atthasdef
left join seqattr s on def.adrelid = s.adrelid
and def.adnum = s.adnum
where nspname !~~ '^pg_' and n.nspname <> 'information_schema' where nspname !~~ '^pg_' and n.nspname <> 'information_schema'
and relkind in (~{'~a'~^, ~}) and relkind in (~{'~a'~^, ~})

View File

@ -0,0 +1,4 @@
select nspname, extname
from pg_extension e
join pg_namespace n on n.oid = e.extnamespace
where nspname !~ '^pg_';

View File

@ -0,0 +1,43 @@
--
-- get user defined SQL types
--
select nt.nspname,
extname,
typname,
case when enum.enumtypid is not null
then array_agg(enum.enumlabel order by enumsortorder)
end as enumvalues
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid and a.attnum > 0
join pg_type t on t.oid = a.atttypid
left join pg_namespace nt on nt.oid = t.typnamespace
left join pg_depend d on d.classid = 'pg_type'::regclass
and d.refclassid = 'pg_extension'::regclass
and d.objid = t.oid
left join pg_extension e on refobjid = e.oid
left join pg_enum enum on enum.enumtypid = t.oid
where nt.nspname !~~ '^pg_' and nt.nspname <> 'information_schema'
and n.nspname !~~ '^pg_' and n.nspname <> 'information_schema'
and c.relkind in ('r', 'f', 'p')
~:[~*~;and (~{~a~^~&~10t or ~})~]
~:[~*~;and (~{~a~^~&~10t and ~})~]
and
( t.typrelid = 0
or
(select c.relkind = 'c'
from pg_class c
where c.oid = t.typrelid)
)
and not exists
(
select 1
from pg_type el
where el.oid = t.typelem
and el.typarray = t.oid
)
group by nt.nspname, extname, typname, enumtypid
order by nt.nspname, extname, typname, enumtypid;

View File

@ -0,0 +1,48 @@
;;;
;;; Tools to handle PostgreSQL data type casting rules
;;;
(in-package :pgloader.source.pgsql)
(defparameter *pgsql-default-cast-rules*
'((:source (:type "integer" :auto-increment t)
:target (:type "serial" :drop-default t))
(:source (:type "bigint" :auto-increment t)
:target (:type "bigserial" :drop-default t)))
"Data Type Casting to migrate from PostgtreSQL to PostgreSQL")
(defmethod pgsql-column-ctype ((column column))
"Build the ctype definition from the PostgreSQL column information."
(let ((type-name (column-type-name column))
(type-mod (unless (or (null (column-type-mod column))
(eq :null (column-type-mod column)))
(column-type-mod column))))
(format nil "~a~@[(~a)~]" type-name type-mod)))
(defmethod cast ((field column) &key &allow-other-keys)
"Return the PostgreSQL type definition from the given PostgreSQL column
definition"
(with-slots (pgloader.catalog::name
pgloader.catalog::type-name
pgloader.catalog::type-mod
pgloader.catalog::nullable
pgloader.catalog::default
pgloader.catalog::comment
pgloader.catalog::transform
pgloader.catalog::extra)
field
(let* ((ctype (pgsql-column-ctype field))
(pgcol (apply-casting-rules nil
pgloader.catalog::name
pgloader.catalog::type-name
ctype
pgloader.catalog::default
pgloader.catalog::nullable
pgloader.catalog::extra)))
;; re-install our instruction not to transform default value: it comes
;; from PostgreSQL, and we trust it.
(setf (column-transform-default pgcol)
(column-transform-default field))
pgcol)))

View File

@ -0,0 +1,90 @@
;;;
;;; Read from a PostgreSQL database.
;;;
(in-package :pgloader.source.pgsql)
(defclass copy-pgsql (db-copy) ()
(:documentation "pgloader PostgreSQL Data Source"))
(defmethod initialize-instance :after ((source copy-pgsql) &key)
"Add a default value for transforms in case it's not been provided."
(let* ((transforms (when (slot-boundp source 'transforms)
(slot-value source 'transforms))))
(when (and (slot-boundp source 'fields) (slot-value source 'fields))
;; cast typically happens in copy-database in the schema structure,
;; and the result is then copied into the copy-mysql instance.
(unless (and (slot-boundp source 'columns) (slot-value source 'columns))
(setf (slot-value source 'columns)
(mapcar #'cast (slot-value source 'fields))))
(unless transforms
(setf (slot-value source 'transforms)
(mapcar #'column-transform (slot-value source 'columns)))))))
(defmethod map-rows ((pgsql copy-pgsql) &key process-row-fn)
"Extract PostgreSQL data and call PROCESS-ROW-FN function with a single
argument (a list of column values) for each row"
(let ((map-reader
;;
;; Build a Postmodern row reader that prepares a vector of strings
;; and call PROCESS-ROW-FN with the vector as single argument.
;;
(cl-postgres:row-reader (fields)
(let ((nb-cols (length fields)))
(loop :while (cl-postgres:next-row)
:do (let ((row (make-array nb-cols)))
(loop :for i :from 0
:for field :across fields
:do (setf (aref row i)
(cl-postgres:next-field field)))
(funcall process-row-fn row)))))))
(with-pgsql-connection ((source-db pgsql))
(let* ((cols (mapcar #'column-name (fields pgsql)))
(sql
(format nil "SELECT ~{~s::text~^, ~} FROM ~s.~s" cols
(schema-source-name (table-schema (source pgsql)))
(table-source-name (source pgsql)))))
(cl-postgres:exec-query pomo:*database* sql map-reader)))))
(defmethod fetch-metadata ((pgsql copy-pgsql)
(catalog catalog)
&key
materialize-views
only-tables
create-indexes
foreign-keys
including
excluding)
"PostgreSQL introspection to prepare the migration."
(declare (ignore materialize-views only-tables))
(with-stats-collection ("fetch meta data"
:use-result-as-rows t
:use-result-as-read t
:section :pre)
(with-pgsql-transaction (:pgconn (source-db pgsql))
(list-all-sqltypes catalog
:including including
:excluding excluding)
(list-all-columns catalog
:including including
:excluding excluding)
(when create-indexes
(list-all-indexes catalog
:including including
:excluding excluding))
(when foreign-keys
(list-all-fkeys catalog
:including including
:excluding excluding))
;; return how many objects we're going to deal with in total
;; for stats collection
(+ (count-tables catalog) (count-indexes catalog))))
;; be sure to return the catalog itself
catalog)

View File

@ -96,7 +96,7 @@
"Send the data in the SQLite column ordering." "Send the data in the SQLite column ordering."
(mapcar #'apply-identifier-case (mapcar #'coldef-name (fields sqlite)))) (mapcar #'apply-identifier-case (mapcar #'coldef-name (fields sqlite))))
(defmethod fetch-metadata (sqlite catalog (defmethod fetch-metadata ((sqlite copy-sqlite) (catalog catalog)
&key &key
materialize-views materialize-views
only-tables only-tables

View File

@ -43,25 +43,35 @@
;;; implemented in each source separately. ;;; implemented in each source separately.
;;; ;;;
(defstruct catalog name schema-list types-without-btree) (defstruct catalog name schema-list types-without-btree)
(defstruct schema source-name name catalog table-list view-list in-search-path)
(defstruct schema source-name name catalog in-search-path
table-list view-list extension-list sqltype-list)
(defstruct table source-name name schema oid comment storage-parameter-list (defstruct table source-name name schema oid comment storage-parameter-list
;; field is for SOURCE ;; field is for SOURCE
;; column is for TARGET ;; column is for TARGET
field-list column-list index-list fkey-list trigger-list) field-list column-list index-list fkey-list trigger-list)
;;;
;;; When migrating from PostgreSQL to PostgreSQL we might have to install
;;; extensions to have data type coverage.
;;;
(defstruct extension name schema)
;;; ;;;
;;; When migrating from another database to PostgreSQL some data types might ;;; When migrating from another database to PostgreSQL some data types might
;;; need to be tranformed dynamically into User Defined Types: ENUMs, SET, ;;; need to be tranformed dynamically into User Defined Types: ENUMs, SET,
;;; etc. ;;; etc.
;;; ;;;
(defstruct sqltype name schema type source-def extra) (defstruct sqltype name schema type source-def extra extension)
;;; ;;;
;;; The generic PostgreSQL column that the CAST generic function is asked to ;;; The generic PostgreSQL column that the CAST generic function is asked to
;;; produce, so that we know how to CREATE TABLEs in PostgreSQL whatever the ;;; produce, so that we know how to CREATE TABLEs in PostgreSQL whatever the
;;; source is. ;;; source is.
;;; ;;;
(defstruct column name type-name type-mod nullable default comment transform extra) (defstruct column name type-name type-mod nullable default comment
transform extra (transform-default t))
;;; ;;;
;;; Index and Foreign Keys ;;; Index and Foreign Keys
@ -94,13 +104,18 @@
;;; ;;;
;;; Main data collection API ;;; Main data collection API
;;; ;;;
(defgeneric add-schema (object schema-name &key)) (defgeneric add-schema (object schema-name &key))
(defgeneric add-table (object table-name &key)) (defgeneric add-extension (object extension-name &key))
(defgeneric add-view (object view-name &key)) (defgeneric add-table (object table-name &key))
(defgeneric add-column (object column &key)) (defgeneric add-view (object view-name &key))
(defgeneric add-index (object index &key)) (defgeneric add-sqltype (object column &key))
(defgeneric add-fkey (object fkey &key)) (defgeneric add-column (object column &key))
(defgeneric add-comment (object comment &key)) (defgeneric add-index (object index &key))
(defgeneric add-fkey (object fkey &key))
(defgeneric add-comment (object comment &key))
(defgeneric extension-list (object &key)
(:documentation "Return the list of extensions found in OBJECT."))
(defgeneric table-list (object &key) (defgeneric table-list (object &key)
(:documentation "Return the list of tables found in OBJECT.")) (:documentation "Return the list of tables found in OBJECT."))
@ -112,6 +127,10 @@
(:documentation (:documentation
"Find a schema by SCHEMA-NAME in a catalog OBJECT and return the schema")) "Find a schema by SCHEMA-NAME in a catalog OBJECT and return the schema"))
(defgeneric find-extension (object extension-name &key)
(:documentation
"Find an extension by EXTENSION-NAME in a schema OBJECT and return the table"))
(defgeneric find-table (object table-name &key) (defgeneric find-table (object table-name &key)
(:documentation (:documentation
"Find a table by TABLE-NAME in a schema OBJECT and return the table")) "Find a table by TABLE-NAME in a schema OBJECT and return the table"))
@ -131,6 +150,9 @@
(defgeneric maybe-add-schema (object schema-name &key) (defgeneric maybe-add-schema (object schema-name &key)
(:documentation "Add a new schema or return existing one.")) (:documentation "Add a new schema or return existing one."))
(defgeneric maybe-add-extension (object extension-name &key)
(:documentation "Add a new extension or return existing one."))
(defgeneric maybe-add-table (object table-name &key) (defgeneric maybe-add-table (object table-name &key)
(:documentation "Add a new table or return existing one.")) (:documentation "Add a new table or return existing one."))
@ -167,6 +189,35 @@
;;; ;;;
;;; Implementation of the methods ;;; Implementation of the methods
;;; ;;;
(defmethod extension-list ((schema schema) &key)
"Return the list of extensions for SCHEMA."
(schema-extension-list schema))
(defmethod extension-list ((catalog catalog) &key)
"Return the list of extensions for CATALOG."
(apply #'append (mapcar #'extension-list (catalog-schema-list catalog))))
(defmethod sqltype-list ((column column) &key)
"Return the list of sqltypes for SCHEMA."
(when (typep (column-type-name column) 'sqltype)
(column-type-name column)))
(defmethod sqltype-list ((table table) &key)
"Return the list of sqltypes for SCHEMA."
(apply #'append (mapcar #'sqltype-list (table-column-list table))))
(defmethod sqltype-list ((schema schema) &key)
"Return the list of sqltypes for SCHEMA."
(append (schema-sqltype-list schema)
(apply #'append
(mapcar #'sqltype-list (schema-table-list schema)))))
(defmethod sqltype-list ((catalog catalog) &key)
"Return the list of sqltypes for CATALOG."
(remove-duplicates
(apply #'append (mapcar #'sqltype-list (catalog-schema-list catalog)))
:test #'string-equal :key #'sqltype-name))
(defmethod table-list ((schema schema) &key) (defmethod table-list ((schema schema) &key)
"Return the list of tables for SCHEMA." "Return the list of tables for SCHEMA."
(schema-table-list schema)) (schema-table-list schema))
@ -212,6 +263,17 @@
:in-search-path in-search-path))) :in-search-path in-search-path)))
(push-to-end schema (catalog-schema-list catalog)))) (push-to-end schema (catalog-schema-list catalog))))
(defmethod add-extension ((schema schema) extension-name &key)
"Add EXTENSION-NAME to SCHEMA and return the new extension instance."
(let ((extension
(make-extension :name extension-name
:schema schema)))
(push-to-end extension (schema-extension-list schema))))
(defmethod add-sqltype ((schema schema) sqltype &key)
"Add SQLTYPE instance to SCHEMA and return SQLTYPE."
(push-to-end sqltype (schema-sqltype-list schema)))
(defmethod add-table ((schema schema) table-name &key comment oid) (defmethod add-table ((schema schema) table-name &key comment oid)
"Add TABLE-NAME to SCHEMA and return the new table instance." "Add TABLE-NAME to SCHEMA and return the new table instance."
(let ((table (let ((table
@ -238,6 +300,11 @@
(find schema-name (catalog-schema-list catalog) (find schema-name (catalog-schema-list catalog)
:key #'schema-source-name :test 'string=)) :key #'schema-source-name :test 'string=))
(defmethod find-extension ((schema schema) extension-name &key)
"Find EXTENSION-NAME in SCHEMA and return the EXTENSION object of this name."
(find extension-name (schema-extension-list schema)
:key #'extension-name :test 'string=))
(defmethod find-table ((schema schema) table-name &key) (defmethod find-table ((schema schema) table-name &key)
"Find TABLE-NAME in SCHEMA and return the TABLE object of this name." "Find TABLE-NAME in SCHEMA and return the TABLE object of this name."
(find table-name (schema-table-list schema) (find table-name (schema-table-list schema)
@ -254,6 +321,12 @@
(let ((schema (find-schema catalog schema-name))) (let ((schema (find-schema catalog schema-name)))
(or schema (add-schema catalog schema-name)))) (or schema (add-schema catalog schema-name))))
(defmethod maybe-add-extension ((schema schema) extension-name &key)
"Add TABLE-NAME to the table-list for SCHEMA, or return the existing table
of the same name if it already exists in the schema table-list."
(let ((extension (find-extension schema extension-name)))
(or extension (add-extension schema extension-name))))
(defmethod maybe-add-table ((schema schema) table-name &key comment oid) (defmethod maybe-add-table ((schema schema) table-name &key comment oid)
"Add TABLE-NAME to the table-list for SCHEMA, or return the existing table "Add TABLE-NAME to the table-list for SCHEMA, or return the existing table
of the same name if it already exists in the schema table-list." of the same name if it already exists in the schema table-list."

View File

@ -8,7 +8,8 @@
*/ */
LOAD ARCHIVE LOAD ARCHIVE
FROM http://pgsql.tapoueh.org/temp/foo.zip -- FROM http://pgsql.tapoueh.org/temp/foo.zip
FROM http://geolite.maxmind.com/download/geoip/database/GeoLiteCity_CSV/GeoLiteCity-latest.zip
INTO postgresql:///ip4r INTO postgresql:///ip4r
BEFORE LOAD BEFORE LOAD

6
test/pgsql-source.load Normal file
View File

@ -0,0 +1,6 @@
load database
from pgsql://localhost/pgloader
into pgsql://localhost/copy
-- including only table names matching 'bits', ~/utilisateur/ in schema 'mysql'
;