Improve our internal catalog representation.

First, add indexes and foreign keys to the list of objects supported by
the shared catalog facility, where they were previously only found in the
pgsql schema specific package for historical reasons.

Then also add to our catalog internal structures the notion of a trigger
and a stored procedure, allowing for cleaner advanced default values
support in the MySQL cast functions.

Now that we have a proper and complete catalog, review the pgsql module
DDL output function in terms of the catalog and rewrite the schema
creation support so that it takes direct benefit of our internal
catalogs representation.

In passing, clean-up the code organisation of the pgsql target support
module to be easier to work with.

Next step consists of getting rid of src/pgsql/queries.lisp: this
facility should be replaced by the usage of a target catalog that we
fetch the usual way, thanks to the new src/pgsql/pgsql-schema.lisp file
and list-all-* functions.

That will in turn allow for an explicit step of merging the pre-existing
PostgreSQL catalog when it's been created by other tools than pgloader,
that is when migrating with the help of an ORM. See #400 for details.
This commit is contained in:
Dimitri Fontaine 2016-08-01 23:14:58 +02:00
parent 87f6d3a0a0
commit 2aedac7037
24 changed files with 1535 additions and 1122 deletions

View File

@ -64,23 +64,23 @@
;; PostgreSQL related utils
(:file "read-sql-files")
(:file "quoting")
(:file "schema-structs" :depends-on ("quoting"))
(:file "alter-table" :depends-on ("schema-structs"))
(:file "catalog" :depends-on ("quoting"))
(:file "alter-table" :depends-on ("catalog"))
;; State, monitoring, reporting
(:file "reject" :depends-on ("state"))
(:file "report" :depends-on ("state"
"utils"
"schema-structs"))
"catalog"))
(:file "monitor" :depends-on ("logs"
"state"
"reject"
"report"))
(:file "archive" :depends-on ("logs"))
(:file "pg-format-column" :depends-on ("schema-structs"
"monitor"
"state"))
;; (:file "pg-format-column" :depends-on ("catalog"
;; "monitor"
;; "state"))
;; generic connection api
(:file "connection" :depends-on ("archive"))))
@ -88,12 +88,20 @@
;; package pgloader.pgsql
(:module pgsql
:depends-on ("package" "params" "utils")
:serial t
:components
((:file "copy-format")
(:file "connection")
(:file "pgsql-ddl")
(:file "pgsql-schema")
(:file "pgsql-trigger")
(:file "pgsql-index-filter")
(:file "queries")
(:file "schema")
(:file "pgsql"
(:file "schema" :depends-on ("pgsql-trigger"))
(:file "retry-batch")
(:file "copy-from-queue"
:depends-on ("copy-format"
"retry-batch"
"queries"
"schema"))))

View File

@ -2,6 +2,13 @@
;;;
;;; To avoid circular files dependencies, define all the packages here
;;;
(eval-when (:compile-toplevel :load-toplevel :execute)
  ;; Re-export from TARGET every external symbol of SOURCE, so that packages
  ;; :use-ing TARGET transparently see SOURCE's public API.  Defined in
  ;; CL-USER (and at compile time) so it can be called between the
  ;; defpackage forms below, before any pgloader package exists.
  (defun cl-user::export-inherited-symbols (source target)
    ;; SOURCE and TARGET are package name designators (strings), upcased to
    ;; match the standard reader's package naming.
    (let ((pkg-source (find-package (string-upcase source)))
          (pkg-target (find-package (string-upcase target))))
      (do-external-symbols (s pkg-source)
        (export s pkg-target)))))
(defpackage #:pgloader.transforms
(:use #:cl)
(:export #:precision
@ -27,15 +34,21 @@
(:use #:cl #:pgloader.params)
(:export #:apply-identifier-case))
(defpackage #:pgloader.schema
(defpackage #:pgloader.catalog
(:use #:cl #:pgloader.params #:pgloader.quoting)
(:export #:catalog
(:export #:format-create-sql
#:format-drop-sql
#:format-default-value
#:catalog
#:schema
#:table
#:sqltype
#:column
#:index
#:index
#:fkey
#:trigger
#:procedure
#:cast ; generic function for sources
@ -46,17 +59,22 @@
#:make-table
#:create-table
#:make-view
#:make-sqltype
#:make-column
#:make-index
#:make-index
#:make-fkey
#:make-trigger
#:make-procedure
#:catalog-name
#:catalog-schema-list
#:schema-name
#:schema-catalog
#:schema-source-name
#:schema-table-list
#:schema-view-list
#:table-name
#:table-source-name
#:table-schema
@ -66,6 +84,13 @@
#:table-column-list
#:table-index-list
#:table-fkey-list
#:table-trigger-list
#:sqltype-name
#:sqltype-type
#:sqltype-source-def
#:sqltype-extra
#:column-name
#:column-type-name
#:column-type-mod
@ -73,6 +98,41 @@
#:column-default
#:column-comment
#:column-transform
#:column-extra
#:index-name
#:index-schema
#:index-table
#:index-primary
#:index-unique
#:index-columns
#:index-sql
#:index-conname
#:index-condef
#:index-filter
#:fkey-name
#:fkey-foreign-table
#:fkey-foreign-columns
#:fkey-table
#:fkey-columns
#:fkey-condef
#:fkey-update-rule
#:fkey-delete-rule
#:fkey-match-rule
#:fkey-deferrable
#:fkey-initially-deferred
#:trigger-name
#:trigger-table
#:trigger-action
#:trigger-procedure-name
#:trigger-procedure
#:procedure-name
#:procedure-returns
#:procedure-language
#:procedure-body
#:table-list
#:view-list
@ -107,7 +167,7 @@
#:format-table-name))
(defpackage #:pgloader.state
(:use #:cl #:pgloader.params #:pgloader.schema)
(:use #:cl #:pgloader.params #:pgloader.catalog)
(:export #:make-pgstate
#:pgstate-tabnames
#:pgstate-tables
@ -153,32 +213,15 @@
(defpackage #:pgloader.utils
(:use #:cl
#:pgloader.params #:pgloader.schema
#:pgloader.monitor #:pgloader.state)
#:pgloader.params #:pgloader.catalog #:pgloader.monitor #:pgloader.state)
(:import-from #:alexandria
#:appendf
#:read-file-into-string)
(:export #:with-monitor ; monitor
#:*monitoring-queue*
#:with-stats-collection
#:elapsed-time-since
#:timing
;; bits from alexandria
(:export ;; bits from alexandria
#:appendf
#:read-file-into-string
;; state
#:make-pgstate
#:pgstate-tabnames
;; events
#:log-message
#:new-label
#:update-stats
#:process-bad-row
;; utils
#:format-interval
#:camelCase-to-colname
@ -194,85 +237,11 @@
#:make-external-format
;; quoting
#:apply-identifier-case
#:apply-identifier-case))
;; schema
#:catalog
#:schema
#:table
#:column
#:index
#:index
#:fkey
#:cast ; generic function for sources
#:make-catalog
#:make-schema
#:make-table
#:create-table
#:make-view
#:make-column
#:make-index
#:make-index
#:make-fkey
#:catalog-name
#:catalog-schema-list
#:schema-name
#:schema-source-name
#:schema-table-list
#:schema-view-list
#:table-name
#:table-source-name
#:table-schema
#:table-oid
#:table-comment
#:table-field-list
#:table-column-list
#:table-index-list
#:table-fkey-list
#:column-name
#:column-type-name
#:column-type-mod
#:column-type-nullable
#:column-default
#:column-comment
#:column-transform
#:table-list
#:view-list
#:add-schema
#:find-schema
#:maybe-add-schema
#:add-table
#:find-table
#:maybe-add-table
#:add-view
#:find-view
#:maybe-add-view
#:add-field
#:add-column
#:add-index
#:find-index
#:maybe-add-index
#:add-fkey
#:find-fkey
#:maybe-add-fkey
#:count-tables
#:count-views
#:count-indexes
#:count-fkeys
#:max-indexes-per-table
#:push-to-end
#:with-schema
#:alter-table
#:alter-schema
#:format-table-name
#:format-default-value
#:format-column))
(cl-user::export-inherited-symbols "pgloader.catalog" "pgloader.utils")
(cl-user::export-inherited-symbols "pgloader.monitor" "pgloader.utils")
(cl-user::export-inherited-symbols "pgloader.state" "pgloader.utils")
(defpackage #:pgloader.batch
(:use #:cl #:pgloader.params #:pgloader.monitor)
@ -346,7 +315,7 @@
(defpackage #:pgloader.pgsql
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.schema)
#:pgloader.catalog)
(:export #:pgsql-connection
#:pgconn-use-ssl
#:pgconn-table-name
@ -356,40 +325,36 @@
#:pgsql-execute-with-timing
#:pgsql-connect-and-execute-with-timing
;; PostgreSQL schema facilities
;; postgresql schema facilities
#:truncate-tables
#:copy-from-file
#:copy-from-queue
#:reset-all-sequences
#:create-sqltypes
#:create-schemas
#:create-tables
#:create-views
#:format-pgsql-column
#:format-extra-type
#:format-extra-triggers
#:make-pgsql-fkey
#:pgsql-fkey-columns
#:pgsql-fkey-foreign-columns
#:format-pgsql-create-fkey
#:format-pgsql-drop-fkey
#:drop-pgsql-fkeys
#:create-pgsql-fkeys
#:pgsql-index
#:pgsql-index-filter
#:make-pgsql-index
#:index-table-name
#:create-triggers
#:translate-index-filter
#:process-index-definitions
#:format-pgsql-create-index
#:fetch-pgsql-catalog
#:create-indexes-in-kernel
#:set-table-oids
#:drop-indexes
#:maybe-drop-indexes
#:create-indexes-again
#:reset-sequences
#:comment-on-tables-and-columns
;; PostgreSQL introspection queries
;; index filter rewriting support
#:translate-index-filter
#:process-index-definitions
;; postgresql introspection queries
#:list-databases
#:list-tables
#:list-columns-query
@ -399,13 +364,13 @@
#:list-tables-and-fkeys
#:list-table-oids
;; PostgreSQL Identifiers
;; postgresql identifiers
#:list-reserved-keywords
;; PostgreSQL user provided GUCs
;; postgresql user provided gucs
#:sanitize-user-gucs
;; PostgreSQL data format
;; postgresql data format
#:get-date-columns
#:format-vector-row))
@ -425,7 +390,7 @@
#:md-copy
#:db-copy
;; Accessors
;; accessors
#:source-db
#:target-db
#:source
@ -437,7 +402,7 @@
#:skip-lines
#:header
;; Main protocol/API
;; main protocol/api
#:map-rows
#:copy-column-list
#:queue-raw-data
@ -446,7 +411,7 @@
#:copy-to
#:copy-database
;; md-copy protocol/API
;; md-copy protocol/api
#:parse-header
#:process-rows
@ -465,7 +430,7 @@
#:complete-pgsql-database
#:end-kernels
;; file based utils for CSV, fixed etc
;; file based utils for csv, fixed etc
#:with-open-file-or-stream
#:get-pathname
#:project-fields
@ -479,7 +444,7 @@
;;;
;;; Other utilities
;;; other utilities
;;;
(defpackage #:pgloader.ini
(:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection)
@ -500,7 +465,7 @@
;;
;; Specific source handling
;; specific source handling
;;
(defpackage #:pgloader.csv
(:use #:cl
@ -558,7 +523,6 @@
#:pgsql-execute
#:pgsql-execute-with-timing
#:create-tables
#:format-pgsql-column
#:format-vector-row)
(:export #:ixf-connection
#:copy-ixf
@ -574,7 +538,6 @@
#:pgsql-execute
#:pgsql-execute-with-timing
#:create-tables
#:format-pgsql-column
#:format-vector-row)
(:export #:dbf-connection
#:copy-db3
@ -597,18 +560,9 @@
#:create-tables
#:create-views
#:truncate-tables
#:format-pgsql-column
#:format-extra-type
#:format-extra-triggers
#:make-pgsql-fkey
#:format-pgsql-create-fkey
#:format-pgsql-drop-fkey
#:drop-pgsql-fkeys
#:create-pgsql-fkeys
#:make-pgsql-index
#:format-pgsql-create-index
#:create-indexes-in-kernel
#:set-table-oids
#:format-vector-row
#:reset-sequences
#:comment-on-tables-and-columns)
@ -637,15 +591,7 @@
#:pgsql-execute-with-timing
#:create-tables
#:truncate-tables
#:format-pgsql-column
#:make-pgsql-index
#:index-table-name
#:format-pgsql-create-index
#:create-indexes-in-kernel
#:make-pgsql-fkey
#:pgsql-fkey-columns
#:pgsql-fkey-foreign-columns
#:set-table-oids
#:reset-sequences
#:comment-on-tables-and-columns)
(:export #:sqlite-connection
@ -672,18 +618,9 @@
#:list-table-oids
#:create-tables
#:create-views
#:truncate-tables
#:format-pgsql-column
#:format-extra-type
#:make-pgsql-fkey
#:format-pgsql-create-fkey
#:format-pgsql-drop-fkey
#:drop-pgsql-fkeys
#:create-pgsql-fkeys
#:make-pgsql-index
#:format-pgsql-create-index
#:create-indexes-in-kernel
#:set-table-oids
#:format-vector-row
#:reset-sequences)
(:export #:mssql-connection
@ -698,8 +635,6 @@
(defpackage #:pgloader.mssql.index-filter
(:use #:cl #:esrap #:pgloader.utils #:pgloader.mssql)
(:import-from #:pgloader.pgsql
#:pgsql-index
#:pgsql-index-filter
#:translate-index-filter))
(defpackage #:pgloader.syslog
@ -713,7 +648,7 @@
;;;
;;; The Command Parser
;;; the command parser
;;;
(defpackage #:pgloader.parser
(:use #:cl #:esrap #:metabang.bind
@ -755,7 +690,7 @@
(:export #:parse-commands
#:parse-commands-from-file
;; tools to enable complete CLI parsing in main.lisp
;; tools to enable complete cli parsing in main.lisp
#:process-relative-pathnames
#:parse-source-string
#:parse-source-string-for-type
@ -793,7 +728,7 @@
;;
;; Main package
;; main package
;;
(defpackage #:pgloader
(:use #:cl

View File

@ -34,12 +34,12 @@
(defrule rename-to (and kw-rename kw-to quoted-namestring)
(:lambda (stmt)
(bind (((_ _ new-name) stmt))
(list #'pgloader.schema::alter-table-rename new-name))))
(list #'pgloader.catalog::alter-table-rename new-name))))
(defrule set-schema (and kw-set kw-schema quoted-namestring)
(:lambda (stmt)
(bind (((_ _ schema) stmt))
(list #'pgloader.schema::alter-table-set-schema schema))))
(list #'pgloader.catalog::alter-table-set-schema schema))))
(defrule alter-table-action (or rename-to
set-schema))
@ -51,7 +51,7 @@
(destructuring-bind (match-rule-target-list schema action)
alter-table-command
(loop :for match-rule-target :in match-rule-target-list
:collect (pgloader.schema::make-match-rule
:collect (pgloader.catalog::make-match-rule
:type (first match-rule-target)
:target (second match-rule-target)
:schema schema
@ -73,10 +73,10 @@
kw-rename kw-to quoted-namestring)
(:lambda (alter-schema-command)
(bind (((_ _ current-name _ _ new-name) alter-schema-command))
(pgloader.schema::make-match-rule
(pgloader.catalog::make-match-rule
:type :string
:target current-name
:action #'pgloader.schema::alter-schema-rename
:action #'pgloader.catalog::alter-schema-rename
:args (list new-name)))))
;;; currently we only support a single ALTER SCHEMA variant

379
src/pgsql/connection.lisp Normal file
View File

@ -0,0 +1,379 @@
;;;
;;; Tools to handle PostgreSQL queries
;;;
(in-package :pgloader.pgsql)
;;;
;;; PostgreSQL Tools connecting to a database
;;;
(defclass pgsql-connection (db-connection)
  ;; use-ssl is one of :try, :yes or :no (see ssl-enable-p and open-connection)
  ((use-ssl    :initarg :use-ssl    :accessor pgconn-use-ssl)
   ;; target table name, when one was given with the connection — TODO confirm
   (table-name :initarg :table-name :accessor pgconn-table-name))
  (:documentation "PostgreSQL connection for pgloader"))
(defmethod initialize-instance :after ((pgconn pgsql-connection) &key)
  "Assign the type slot to pgsql."
  ;; the TYPE slot is presumably inherited from db-connection — confirm there
  (setf (slot-value pgconn 'type) "pgsql"))
(defmethod clone-connection ((c pgsql-connection))
  "Copy connection C, including the pgsql specific slots."
  ;; call-next-method clones the generic db-connection part; carry the
  ;; PostgreSQL specific slots over by hand afterwards.
  (let ((copy (change-class (call-next-method c) 'pgsql-connection)))
    (setf (pgconn-use-ssl copy) (pgconn-use-ssl c))
    (setf (pgconn-table-name copy) (pgconn-table-name c))
    copy))
(defmethod ssl-enable-p ((pgconn pgsql-connection))
  "Return non-nil when the connection uses SSL"
  ;; use-ssl is expected to be one of :try, :yes or :no (see open-connection)
  (member (pgconn-use-ssl pgconn) '(:try :yes)))
(defun new-pgsql-connection (pgconn)
  "Prepare a new connection object with all the same properties as PGCONN,
   so as to avoid stepping on its handle."
  (make-instance 'pgsql-connection
                 :user (db-user pgconn)
                 :pass (db-pass pgconn)
                 :host (db-host pgconn)
                 :port (db-port pgconn)
                 :name (db-name pgconn)
                 :use-ssl (pgconn-use-ssl pgconn)
                 :table-name (pgconn-table-name pgconn)))
;;;
;;; Implement SSL Client Side certificates
;;; http://www.postgresql.org/docs/current/static/libpq-ssl.html#LIBPQ-SSL-FILE-USAGE
;;;
;; default locations follow the libpq convention (see the link above)
(defvar *pgsql-client-certificate* "~/.postgresql/postgresql.crt"
  "File where to read the PostgreSQL Client Side SSL Certificate.")

(defvar *pgsql-client-key* "~/.postgresql/postgresql.key"
  "File where to read the PostgreSQL Client Side SSL Private Key.")
;;;
;;; We need to distinguish some special cases of PostgreSQL errors within
;;; Class 53 — Insufficient Resources: in case of "too many connections" we
;;; typically want to leave room for another worker to finish and free one
;;; connection, then try again.
;;;
;;; http://www.postgresql.org/docs/9.4/interactive/errcodes-appendix.html
;;;
;;; The "leave room to finish and try again" heuristic is currently quite
;;; simplistic, but at least it works in my test cases.
;;;
;; Give names to the two Class 53 conditions discussed above so that
;; open-connection can handle them specifically and retry.
(cl-postgres-error::deferror "53300"
    too-many-connections cl-postgres-error:insufficient-resources)

(cl-postgres-error::deferror "53400"
    configuration-limit-exceeded cl-postgres-error:insufficient-resources)

(defvar *retry-connect-times* 5
  "How many times do we try to connect again.")

(defvar *retry-connect-delay* 0.5
  "How many seconds to wait before trying to connect again.")
(defmethod open-connection ((pgconn pgsql-connection) &key username)
  "Open a PostgreSQL connection, retrying up to *retry-connect-times* times
   when the server reports insufficient resources, and store the handle in
   the conn-handle slot of PGCONN.  USERNAME overrides the connection user."
  (let* (#+unix
         ;; support host given as (:unix . path) on unix platforms
         (cl-postgres::*unix-socket-dir* (get-unix-socket-dir pgconn))
         (crt-file (expand-user-homedir-pathname *pgsql-client-certificate*))
         (key-file (expand-user-homedir-pathname *pgsql-client-key*))
         ;; only hand the client certificate/key to Postmodern when SSL is
         ;; wanted and the files actually exist
         (pomo::*ssl-certificate-file* (when (and (ssl-enable-p pgconn)
                                                  (probe-file crt-file))
                                         (uiop:native-namestring crt-file)))
         (pomo::*ssl-key-file* (when (and (ssl-enable-p pgconn)
                                          (probe-file key-file))
                                 (uiop:native-namestring key-file))))
    (flet ((connect (pgconn username)
             (handler-case
                 ;; in some cases (client_min_messages set to debug5
                 ;; for example), PostgreSQL might send us some
                 ;; WARNINGs already when opening a new connection
                 (handler-bind ((cl-postgres:postgresql-warning
                                 #'(lambda (w)
                                     (log-message :warning "~a" w)
                                     (muffle-warning))))
                   (pomo:connect (db-name pgconn)
                                 (or username (db-user pgconn))
                                 (db-pass pgconn)
                                 (let ((host (db-host pgconn)))
                                   (if (and (consp host) (eq :unix (car host)))
                                       :unix
                                       host))
                                 :port (db-port pgconn)
                                 :use-ssl (or (pgconn-use-ssl pgconn) :no)))
               ;; on "insufficient resources" errors, log, wait, and let the
               ;; retry loop below call us again (we return nil here)
               ((or too-many-connections configuration-limit-exceeded) (e)
                 (log-message :error
                              "Failed to connect to ~a: ~a; will try again in ~fs"
                              pgconn e *retry-connect-delay*)
                 (sleep *retry-connect-delay*)))))
      (loop :while (null (conn-handle pgconn))
            :repeat *retry-connect-times*
            :do (setf (conn-handle pgconn) (connect pgconn username))))
    (unless (conn-handle pgconn)
      (error "Failed ~d times to connect to ~a" *retry-connect-times* pgconn))
    (log-message :debug "CONNECTED TO ~s" pgconn)
    ;; apply user-provided GUC settings to the fresh session
    (set-session-gucs *pg-settings* :database (conn-handle pgconn))
    pgconn))
(defmethod close-connection ((pgconn pgsql-connection))
  "Close a PostgreSQL connection."
  (assert (not (null (conn-handle pgconn))))
  (pomo:disconnect (conn-handle pgconn))
  ;; reset the handle so that open-connection may be used again
  (setf (conn-handle pgconn) nil)
  pgconn)
(defmethod query ((pgconn pgsql-connection) sql &key)
  "Log SQL at debug level then run it against the already opened PGCONN,
   returning whatever pomo:query returns."
  (let ((pomo:*database* (conn-handle pgconn)))
    (log-message :debug "~a" sql)
    (pomo:query sql)))
(defmacro handling-pgsql-notices (&body forms)
  "Run FORMS with cl-postgres database errors logged at the :error level
   (then allowed to propagate) and PostgreSQL warnings logged at the
   :warning level then muffled."
  `(handler-bind
       ;; handler-bind does not unwind: the error handler only logs, the
       ;; condition then keeps propagating to outer handlers
       ((cl-postgres:database-error
         #'(lambda (e)
             (log-message :error "~a" e)))
        (cl-postgres:postgresql-warning
         #'(lambda (w)
             (log-message :warning "~a" w)
             (muffle-warning))))
     (progn ,@forms)))
(defmacro with-pgsql-transaction ((&key pgconn database) &body forms)
  "Run FORMS within a PostgreSQL transaction, reusing the open DATABASE
   handle when given, otherwise opening a new connection to PGCONN."
  (if database
      `(let ((pomo:*database* ,database))
         (handling-pgsql-notices
          (pomo:with-transaction ()
            (log-message :debug "BEGIN")
            ,@forms)))
      ;; no database given, create a new database connection
      ;; (with-pgsql-connection already installs handling-pgsql-notices)
      `(with-pgsql-connection (,pgconn)
         (pomo:with-transaction ()
           (log-message :debug "BEGIN")
           ,@forms))))
(defmacro with-pgsql-connection ((pgconn) &body forms)
  "Run FORMS within a PostgreSQL connection to PGCONN, with notice and
   warning handling installed around them."
  (let ((conn (gensym "pgsql-conn")))
    `(with-connection (,conn ,pgconn)
       (let ((pomo:*database* (conn-handle ,conn)))
         (handling-pgsql-notices
          ,@forms)))))
(defun get-unix-socket-dir (pgconn)
  "When PGCONN's host is a (cons :unix path) value, return the right value
   for cl-postgres::*unix-socket-dir*; otherwise keep the current default."
  (let ((host (db-host pgconn)))
    (cond ((and (consp host) (eq :unix (car host)))
           ;; unix domain socket: the directory comes from the host spec
           (directory-namestring (fad:pathname-as-directory (cdr host))))
          (t
           ;; TCP host: leave the cl-postgres default untouched
           cl-postgres::*unix-socket-dir*))))
(defun set-session-gucs (alist &key transaction database)
  "Set given GUCs to given values for the current session.  When
   TRANSACTION is non-nil, use SET LOCAL so the settings only last for the
   current transaction; when DATABASE is given, use that handle."
  (let ((pomo:*database* (or database pomo:*database*)))
    (loop
       :for (name . value) :in alist
       :for set := (cond
                     ((string-equal "search_path" name)
                      ;; for search_path, don't quote the value
                      (format nil "SET~:[~; LOCAL~] ~a TO ~a"
                              transaction name value))
                     (t
                      ;; general case: quote the value
                      (format nil "SET~:[~; LOCAL~] ~a TO '~a'"
                              transaction name value)))
       :do (progn                       ; indent helper
             (log-message :debug set)
             (pomo:execute set)))))
;;;
;;; The parser is still hard-coded to support only PostgreSQL targets
;;;
(defun sanitize-user-gucs (gucs)
  "Forbid certain actions such as setting a client_encoding different from utf8."
  ;; force client_encoding to utf8, dropping (with a warning) any user
  ;; setting for it that isn't a utf-8 spelling
  (let* ((filtered
          (loop :for (name . value) :in gucs
                :if (and (string-equal name "client_encoding")
                         (not (member value '("utf-8" "utf8")
                                      :test #'string-equal)))
                :do (log-message :warning
                                 "pgloader always talk to PostgreSQL in utf-8, client_encoding has been forced to 'utf8'.")
                :else
                :collect (cons name value)))
         (clean (cons (cons "client_encoding" "utf8") filtered)))
    ;; provide a default application_name unless the user already set one
    (if (assoc "application_name" clean :test #'string-equal)
        clean
        (append clean (list (cons "application_name" "pgloader"))))))
;;;
;;; DDL support with stats (timing, object count)
;;;
(defun pgsql-connect-and-execute-with-timing (pgconn section label sql &key (count 1))
  "Run pgsql-execute-with-timing within a newly established connection."
  (with-pgsql-connection (pgconn)
    (pomo:with-transaction ()
      (pgsql-execute-with-timing section label sql :count count))))
(defun pgsql-execute-with-timing (section label sql &key (count 1))
  "Execute given SQL and register its timing against SECTION and LABEL,
   counting COUNT objects."
  (multiple-value-bind (res secs)
      (timing
       (handler-case
           (pgsql-execute sql)
         (cl-postgres:database-error (e)
           (log-message :error "~a" e)
           ;; count the error and pre-subtract COUNT rows here: the
           ;; unconditional update-stats below adds COUNT back, netting a
           ;; zero row count for a failed statement
           (update-stats section label :errs 1 :rows (- count)))))
    (declare (ignore res))
    (update-stats section label :read count :rows count :secs secs)))
(defun pgsql-execute (sql &key client-min-messages)
  "Execute given SQL in the current transaction, logging it at the :notice
   level.  When CLIENT-MIN-MESSAGES is given (a keyword naming a PostgreSQL
   message level), set client_min_messages locally around the statement."
  (when client-min-messages
    ;; SET LOCAL scopes the verbosity change to the current transaction
    (pomo:execute
     (format nil "SET LOCAL client_min_messages TO ~a;"
             (symbol-name client-min-messages))))
  (log-message :notice "~a" sql)
  (pomo:execute sql)
  (when client-min-messages
    ;; the previous code wrapped this constant string in a no-op
    ;; (format nil ...) call with no directives; pass it directly
    (pomo:execute "RESET client_min_messages;")))
;;;
;;; PostgreSQL version specific support, that we get once connected
;;;
(defun list-reserved-keywords (pgconn)
  "Connect to PGCONN and fetch the list of reserved keywords from the
   server, falling back to a static list when pg_get_keywords() is missing."
  (handler-case
      (with-pgsql-connection (pgconn)
        ;; catcode 'R' is reserved, 'T' is reserved for type/function names
        (pomo:query "select word
                   from pg_get_keywords()
                  where catcode IN ('R', 'T')" :column))
    ;; support for Amazon Redshift
    (cl-postgres-error::syntax-error-or-access-violation (e)
      ;; 42883	undefined_function
      ;;    Database error 42883: function pg_get_keywords() does not exist
      ;;
      ;; the following list comes from a manual query against a local
      ;; PostgreSQL server (version 9.5devel), it's better to have this list
      ;; than nothing at all.
      (declare (ignore e))
      (list "all"
            "analyse"
            "analyze"
            "and"
            "any"
            "array"
            "as"
            "asc"
            "asymmetric"
            "authorization"
            "binary"
            "both"
            "case"
            "cast"
            "check"
            "collate"
            "collation"
            "column"
            "concurrently"
            "constraint"
            "create"
            "cross"
            "current_catalog"
            "current_date"
            "current_role"
            "current_schema"
            "current_time"
            "current_timestamp"
            "current_user"
            "default"
            "deferrable"
            "desc"
            "distinct"
            "do"
            "else"
            "end"
            "except"
            "false"
            "fetch"
            "for"
            "foreign"
            "freeze"
            "from"
            "full"
            "grant"
            "group"
            "having"
            "ilike"
            "in"
            "initially"
            "inner"
            "intersect"
            "into"
            "is"
            "isnull"
            "join"
            "lateral"
            "leading"
            "left"
            "like"
            "limit"
            "localtime"
            "localtimestamp"
            "natural"
            "not"
            "notnull"
            "null"
            "offset"
            "on"
            "only"
            "or"
            "order"
            "outer"
            "overlaps"
            "placing"
            "primary"
            "references"
            "returning"
            "right"
            "select"
            "session_user"
            "similar"
            "some"
            "symmetric"
            "table"
            "then"
            "to"
            "trailing"
            "true"
            "union"
            "unique"
            "user"
            "using"
            "variadic"
            "verbose"
            "when"
            "where"
            "window"
            "with"))))

View File

@ -144,124 +144,3 @@
(lp:kernel-worker-index)
(format-table-name table) seconds)
(list :writer table seconds)))
;;;
;;; Compute how many rows we're going to try loading next, depending on
;;; where we are in the batch currently and where is the next-error to be
;;; seen, if that's between current position and the end of the batch.
;;;
(defun next-batch-rows (batch-rows current-batch-pos next-error)
  "How many rows should we process in next iteration?"
  (if (= current-batch-pos next-error)
      ;; we are sitting on the row we know to be in error: retry it alone
      1
      (if (< current-batch-pos next-error)
          ;; everything up to (and excluding) the first known error is
          ;; presumed safe: push all of it in one go
          (- next-error current-batch-pos)
          ;; we're past the known bad row; the rest of the batch may or may
          ;; not contain more errors, so optimistically try all of it
          (- batch-rows current-batch-pos))))
;;;
;;; In case of COPY error, PostgreSQL gives us the line where the error was
;;; found as a CONTEXT message. Let's parse that information to optimize our
;;; batching splitting in case of errors.
;;;
;;; CONTEXT: COPY errors, line 1, column b: "2006-13-11"
;;; CONTEXT: COPY byte, line 1: "hello\0world"
;;;
;;; Those error messages are a translation target, tho, so we can only
;;; assume to recognize the command tag (COPY), the comma, and a numer after
;;; a world that might be Zeile (de), línea (es), ligne (fr), riga (it),
;;; linia (pl), linha (pt), строка (ru), 行 (zh), or something else
;;; entirely.
;;;
(defun parse-copy-error-context (context)
  "Given a COPY command CONTEXT error message, return the batch position
   where the error comes from."
  ;; see the comment block above: the line-word is localized, so only match
  ;; the COPY tag, the comma, and the first number after the next word
  (cl-ppcre:register-groups-bind ((#'parse-integer n))
      ("COPY [^,]+, [^ ]+ (\\d+)" context :sharedp t)
    ;; PostgreSQL lines are 1-based, our batch array is 0-based
    (1- n)))
;;;
;;; The main retry batch function.
;;;
(defun retry-batch (table columns batch batch-rows condition
                    &optional (current-batch-pos 0)
                    &aux (nb-errors 0))
  "BATCH is an array of rows containing at least one bad row; the position
   of the first such row is derived from CONDITION's COPY error context.
   Re-load the batch around the bad rows and return how many rows made it."
  (log-message :info "Entering error recovery.")
  (loop
     :with next-error = (parse-copy-error-context
                         (cl-postgres::database-error-context condition))
     :while (< current-batch-pos batch-rows)
     :do
     (progn                             ; indenting helper
       ;; when sitting on the known-bad row, reject it and move past it
       (when (= current-batch-pos next-error)
         (log-message :info "error recovery at ~d/~d, processing bad row"
                      (+ 1 next-error) batch-rows)
         (process-bad-row table condition (aref batch current-batch-pos))
         (incf current-batch-pos)
         (incf nb-errors))
       ;; then try to load as many rows as next-batch-rows deems safe
       (let* ((current-batch-rows
               (next-batch-rows batch-rows current-batch-pos next-error)))
         (when (< 0 current-batch-rows)
           (handler-case
               (with-pgsql-transaction (:database pomo:*database*)
                 (let* ((table-name (format-table-name table))
                        (stream
                         (cl-postgres:open-db-writer pomo:*database*
                                                     table-name columns)))
                   (if (< current-batch-pos next-error)
                       (log-message :info
                                    "error recovery at ~d/~d, next error at ~d, loading ~d row~:p"
                                    current-batch-pos batch-rows (+ 1 next-error) current-batch-rows)
                       (log-message :info
                                    "error recovery at ~d/~d, trying ~d row~:p"
                                    current-batch-pos batch-rows current-batch-rows))
                   (unwind-protect
                        (loop :repeat current-batch-rows
                           :for pos :from current-batch-pos
                           :do (db-write-row stream (aref batch pos)))
                     ;; close-db-writer is the one signaling cl-postgres-errors
                     (cl-postgres:close-db-writer stream)
                     (incf current-batch-pos current-batch-rows))))
             ;; the batch didn't make it, prepare error handling for next turn
             ((or
               cl-postgres-error::data-exception
               cl-postgres-error::integrity-violation
               cl-postgres-error:internal-error
               cl-postgres-error::insufficient-resources
               cl-postgres-error::program-limit-exceeded) (next-error-in-batch)
               ;; the reported error position is relative to the rows we just
               ;; tried to send, hence the current-batch-pos offset
               (setf condition next-error-in-batch
                     next-error
                     (+ current-batch-pos
                        (parse-copy-error-context
                         (cl-postgres::database-error-context condition))))))))))
  (log-message :info "Recovery found ~d errors in ~d row~:p" nb-errors batch-rows)
  ;; Return how many rows we did load, for statistics purposes
  (- batch-rows nb-errors))

269
src/pgsql/pgsql-ddl.lisp Normal file
View File

@ -0,0 +1,269 @@
;;;
;;; PostgreSQL fkey support implementation as a Target Database
;;;
(in-package :pgloader.pgsql)
;;;
;;; Schemas
;;;
(defmethod format-create-sql ((schema schema) &key (stream nil) if-not-exists)
  "Return (or print to STREAM) the CREATE SCHEMA statement for SCHEMA."
  ;; bug fix: the directive was ~@[~IF NOT EXISTS~], which contains a stray
  ;; ~I (pretty-printer indent) and, because ~@[ leaves its non-nil argument
  ;; to be consumed by the clause, would have fed IF-NOT-EXISTS to the ~s
  ;; directive instead of the schema name.  ~:[~;...~] consumes the boolean
  ;; and prints the clause only when it is true, matching the directive used
  ;; by the table method in this file.
  (format stream "CREATE SCHEMA~:[~; IF NOT EXISTS~] ~s;"
          if-not-exists
          (schema-name schema)))
(defmethod format-drop-sql ((schema schema) &key (stream nil) cascade)
  "Return (or print to STREAM) the DROP SCHEMA statement for SCHEMA,
   appending CASCADE when asked to."
  ;; bug fix: removed (declare (ignore pgsql)) — no variable named PGSQL is
  ;; bound in this method, so the declaration referenced an unknown variable.
  (format stream "DROP SCHEMA ~s~@[ CASCADE~];" (schema-name schema) cascade))
;;;
;;; Types
;;;
(defmethod format-create-sql ((sqltype sqltype) &key (stream nil) if-not-exists)
  "Return (or print to STREAM) the CREATE TYPE statement for SQLTYPE."
  (declare (ignore if-not-exists))
  (ecase (sqltype-type sqltype)
    ((:enum :set)
     ;; both ENUM and SET source types become a PostgreSQL ENUM whose labels
     ;; are found in the sqltype-extra slot
     (format stream "CREATE TYPE ~a AS ENUM (~{'~a'~^, ~});"
             (sqltype-name sqltype)
             (sqltype-extra sqltype)))))
(defmethod format-drop-sql ((sqltype sqltype) &key (stream nil) cascade)
  "Return (or print to STREAM) the DROP TYPE statement for SQLTYPE."
  (format stream "DROP TYPE ~s~@[ CASCADE~];" (sqltype-name sqltype) cascade))
;;;
;;; Tables
;;;
(defmethod format-create-sql ((table table) &key (stream nil) if-not-exists)
  "Return (or print to STREAM) the CREATE TABLE statement for TABLE, with
   one line per column."
  ;;
  ;; In case stream would be nil, which means return a string, we use this
  ;; with-output-to-string form and format its output in stream...
  ;;
  (format stream "~a"
          (with-output-to-string (s)
            (format s "CREATE TABLE~:[~; IF NOT EXISTS~] ~a ~%(~%"
                    if-not-exists
                    (format-table-name table))
            ;; widest column name, used to align the type declarations
            ;; NOTE(review): assumes at least one column — (reduce #'max '())
            ;; would signal an error on a table with an empty column list
            (let ((max (reduce #'max
                               (mapcar #'length
                                       (mapcar #'column-name
                                               (table-column-list table))))))
              (loop
                 :for (col . last?) :on (table-column-list table)
                 :do (progn
                       (format s " ")
                       (format-create-sql col
                                          :stream s
                                          :pretty-print t
                                          :max-column-name-length max)
                       ;; LAST? is the rest of the list: emit a comma after
                       ;; every column except the final one
                       (format s "~:[~;,~]~%" last?))))
            (format s ");~%"))))
(defmethod format-drop-sql ((table table) &key (stream nil) cascade)
  "Return the PostgreSQL DROP TABLE IF EXISTS statement for TABLE."
  (format stream
          "DROP TABLE IF EXISTS ~a~@[ CASCADE~];"
          (format-table-name table) cascade))
;;;
;;; Columns
;;;
(defun get-column-type-name-from-sqltype (column)
  "Return the column type name.  When column-type is a sqltype, the sqltype
   might be either an ENUM or a SET.  In the case of a SET, we want an array
   type to be defined here."
  (let ((type (column-type-name column)))
    (cond ((typep type 'sqltype)
           ;; custom type: an ENUM is used as-is, a SET becomes an array of
           ;; the ENUM type
           (ecase (sqltype-type type)
             (:enum (sqltype-name type))
             (:set  (format nil "~a[]" (sqltype-name type)))))
          ;; plain string type names are passed through unchanged
          ((stringp type) type))))
;; Format one column definition line of a CREATE TABLE statement:
;; name, type (with optional type modifier), nullability and default.
(defmethod format-create-sql ((column column)
                              &key
                                (stream nil)
                                if-not-exists
                                pretty-print
                                ((:max-column-name-length max)))
  ;; IF-NOT-EXISTS makes no sense for a single column definition
  (declare (ignore if-not-exists))
  ;; Directive map: ~a column name; ~vt tabulate to the computed
  ;; column; ~a type name; ~:[~*~;~a~] prints the type modifier only
  ;; when non-nil (hence the modifier passed twice: once as the test,
  ;; once as the value); ~:[ not null~;~] prints " not null" when the
  ;; column is NOT nullable; ~:[~; default ~a~] prints the default
  ;; clause when a default exists.
  (format stream
          "~a~vt~a~:[~*~;~a~]~:[ not null~;~]~:[~; default ~a~]"
          (column-name column)
          ;; pretty printing aligns types 3 chars past the longest
          ;; column name, 22 being the fallback alignment column
          (if pretty-print (if max (+ 3 max) 22) 1)
          (get-column-type-name-from-sqltype column)
          (column-type-mod column)
          (column-type-mod column)
          (column-nullable column)
          (column-default column)
          (format-default-value column)))
;; Normalized default values are keywords placed in the catalogs at
;; CAST time; map them to their PostgreSQL spelling here.
(defvar *pgsql-default-values*
  '((:null              . "NULL")
    (:current-date      . "CURRENT_DATE")
    (:current-timestamp . "CURRENT_TIMESTAMP")
    (:generate-uuid     . "uuid_generate_v1()"))
  "Common normalized default values and their PostgreSQL spelling.")
(defmethod format-default-value ((column column) &key (stream nil))
  "Return the DEFAULT expression for COLUMN as a string, or print it to
   STREAM when given.  Normalized defaults (see *pgsql-default-values*)
   are looked up first; otherwise the column transform, when present, is
   applied to the raw default before quoting."
  (let* ((default       (column-default column))
         (clean-default (cdr (assoc default *pgsql-default-values*)))
         (transform     (column-transform column)))
    (cond
      (clean-default
       ;; honor the STREAM argument here too: the previous code returned
       ;; the looked-up string and silently ignored a non-nil stream
       (format stream "~a" clean-default))
      (transform
       (let* ((transformed-default
               (handler-case
                   (funcall transform default)
                 (condition (c)
                   (log-message :warning
                                "Failed to transform default value ~s: ~a"
                                default c)
                   ;; can't transform: fall back to a nil default
                   nil)))
              (transformed-column
               (make-column :default transformed-default)))
         ;; recurse on a column carrying the transformed default,
         ;; forwarding STREAM so the output goes to the right place
         (format-default-value transformed-column :stream stream)))
      (default
       (format stream "'~a'" default))
      (t
       (format stream "NULL")))))
;;;
;;; Indexes
;;;
(defmethod format-create-sql ((index index) &key (stream nil) if-not-exists)
  "Return the statement(s) needed to create INDEX: either a plain
   CREATE [UNIQUE] INDEX, or — for primary keys and unique constraint
   backed indexes — a CREATE UNIQUE INDEX plus, as a second value, the
   ALTER TABLE that attaches the constraint to the pre-built index."
  (declare (ignore if-not-exists))
  (let* ((table (index-table index))
         ;; either preserve the source index name (never "primary",
         ;; and only when the table has an oid) or derive our own
         ;; idx_<table-oid>_<name> naming
         (index-name (if (and *preserve-index-names*
                              (not (string-equal "primary" (index-name index)))
                              (table-oid (index-table index)))
                         (index-name index)
                         ;; in the general case, we build our own index name.
                         (format nil "idx_~a_~a"
                                 (table-oid (index-table index))
                                 (index-name index))))
         (index-name (apply-identifier-case index-name)))
    (cond
      ((or (index-primary index)
           (and (index-condef index) (index-unique index)))
       (values
        ;; ensure good concurrency here, don't take the ACCESS EXCLUSIVE
        ;; LOCK on the table before we have the index done already
        (or (index-sql index)
            (format stream
                    "CREATE UNIQUE INDEX ~@[~a.~]~a ON ~a (~{~a~^, ~})~@[ WHERE ~a~];"
                    (when (index-schema index) (schema-name (index-schema index)))
                    index-name
                    (format-table-name table)
                    (index-columns index)
                    (index-filter index)))
        (format stream
                ;; don't use the index schema name here, PostgreSQL doesn't
                ;; like it, might be implicit from the table's schema
                ;; itself...
                "ALTER TABLE ~a ADD ~a USING INDEX ~a;"
                (format-table-name table)
                (cond ((index-primary index) "PRIMARY KEY")
                      ((index-unique index) "UNIQUE"))
                index-name)))
      ((index-condef index)
       ;; a constraint definition fetched from the source catalogs:
       ;; replay it verbatim through ALTER TABLE
       (format stream "ALTER TABLE ~a ADD ~a;"
               (format-table-name table)
               (index-condef index)))
      (t
       (or (index-sql index)
           (format stream
                   "CREATE~:[~; UNIQUE~] INDEX ~@[~a.~]~a ON ~a (~{~a~^, ~})~@[ WHERE ~a~];"
                   (index-unique index)
                   (when (index-schema index) (schema-name (index-schema index)))
                   index-name
                   (format-table-name table)
                   (index-columns index)
                   (index-filter index)))))))
(defmethod format-drop-sql ((index index) &key (stream nil) cascade)
  "Return the statement to drop INDEX: an ALTER TABLE DROP CONSTRAINT
   when the index backs a constraint, a plain DROP INDEX otherwise."
  (declare (ignore cascade))
  (let ((schema-name (schema-name (index-schema index)))
        (index-name  (index-name index)))
    (if (index-conname index)
        ;; here always quote the constraint name, currently the name
        ;; comes from one source only, the PostgreSQL database catalogs,
        ;; so don't question it, quote it.
        (format stream "ALTER TABLE ~a DROP CONSTRAINT ~s;"
                (format-table-name (index-table index))
                (index-conname index))
        (format stream "DROP INDEX ~@[~a.~]~a;" schema-name index-name))))
;;;
;;; Foreign Keys
;;;
(defmethod format-create-sql ((fk fkey) &key (stream nil) if-not-exists)
  "Return the ALTER TABLE statement adding foreign key FK to its table."
  (declare (ignore if-not-exists))
  (let ((table-name  (format-table-name (fkey-table fk)))
        (ftable-name (format-table-name (fkey-foreign-table fk)))
        (update-rule (fkey-update-rule fk))
        (delete-rule (fkey-delete-rule fk)))
    ;; each rule is passed twice: once as the ~:[ test, once as the
    ;; value printed by the inner ~a
    (format stream
            "ALTER TABLE ~a ADD ~@[CONSTRAINT ~a ~]FOREIGN KEY(~{~a~^,~}) REFERENCES ~a(~{~a~^,~})~:[~*~; ON UPDATE ~a~]~:[~*~; ON DELETE ~a~]"
            table-name
            (fkey-name fk)              ; constraint name
            (fkey-columns fk)
            ftable-name
            (fkey-foreign-columns fk)
            update-rule update-rule
            delete-rule delete-rule)))
(defmethod format-drop-sql ((fk fkey) &key (stream nil) cascade)
  "Return the ALTER TABLE statement dropping foreign key FK."
  (declare (ignore cascade))
  (format stream "ALTER TABLE ~a DROP CONSTRAINT ~a"
          (format-table-name (fkey-table fk))
          (apply-identifier-case (fkey-name fk))))
;;;
;;; Triggers
;;;
(defmethod format-create-sql ((trigger trigger) &key (stream nil) if-not-exists)
  "Return the CREATE TRIGGER statement for TRIGGER."
  (declare (ignore if-not-exists))
  (let ((table-name (format-table-name (trigger-table trigger))))
    (format stream
            "CREATE TRIGGER ~a ~a ON ~a FOR EACH ROW EXECUTE PROCEDURE ~a()"
            (trigger-name trigger)
            (trigger-action trigger)
            table-name
            (trigger-procedure-name trigger))))
(defmethod format-drop-sql ((trigger trigger) &key (stream nil) cascade)
  "Return the DROP TRIGGER statement for TRIGGER, adding CASCADE when asked."
  ;; removed (declare (ignore pgsql)): PGSQL is not a variable here,
  ;; and CASCADE is actually used by the format arguments below.
  (format stream
          "DROP TRIGGER ~a ON ~a~@[ CASCADE~];"
          (trigger-name trigger)
          (format-table-name (trigger-table trigger))
          cascade))
;;;
;;; Procedures
;;;
(defmethod format-create-sql ((procedure procedure) &key (stream nil) if-not-exists)
  "Return the CREATE OR REPLACE FUNCTION statement for PROCEDURE."
  (declare (ignore if-not-exists))
  (let ((name (procedure-name procedure)))
    (format stream
            "CREATE OR REPLACE FUNCTION ~a() RETURNS ~a LANGUAGE ~a AS $$~%~a~%$$;"
            name
            (procedure-returns procedure)
            (procedure-language procedure)
            (procedure-body procedure))))
(defmethod format-drop-sql ((procedure procedure) &key (stream nil) cascade)
  "Return the DROP FUNCTION statement for PROCEDURE, adding CASCADE when asked."
  ;; removed (declare (ignore pgsql)): PGSQL is not a variable here,
  ;; and CASCADE is actually used by the format arguments below.
  (format stream
          "DROP FUNCTION ~a()~@[ CASCADE~];" (procedure-name procedure) cascade))
;;;
;;; Comments
;;;

View File

@ -0,0 +1,51 @@
;;;
;;; API to rewrite index WHERE clauses (filter)
;;;
(in-package #:pgloader.pgsql)
;; Source drivers specialize this generic on their own sql-dialect to
;; rewrite their index filter syntax into PostgreSQL's WHERE clause.
(defgeneric translate-index-filter (table index sql-dialect)
  (:documentation
   "Translate the filter clause of INDEX in PostgreSQL slang."))
(defmethod translate-index-filter ((table table)
                                   (index index)
                                   (sql-dialect t))
  "Default method: no known translation, return nil."
  nil)
;;;
;;; Generic code that drives all index filter clauses rewriting.
;;;
;; Drives all index filter rewriting, walking catalog -> schema -> table.
(defgeneric process-index-definitions (object &key sql-dialect)
  (:documentation "Rewrite all indexes filters in given catalog OBJECT."))
(defmethod process-index-definitions ((catalog catalog) &key sql-dialect)
  "Rewrite all index filters in CATALOG."
  (dolist (schema (catalog-schema-list catalog))
    (process-index-definitions schema :sql-dialect sql-dialect)))
(defmethod process-index-definitions ((schema schema) &key sql-dialect)
  "Rewrite all index filters in SCHEMA."
  (dolist (table (schema-table-list schema))
    (process-index-definitions table :sql-dialect sql-dialect)))
(defmethod process-index-definitions ((table table) &key sql-dialect)
  "Rewrite all index filters in TABLE.

   When the translation fails, the filter is reset to nil so that we
   still try to create the index, without its WHERE clause."
  (loop :for index :in (table-index-list table)
     :when (index-filter index)
     :do (let ((pg-filter
                (handler-case
                    (translate-index-filter table index sql-dialect)
                  (condition (c)
                    (log-message :error
                                 "Failed to translate index ~s on table ~s because of filter clause ~s"
                                 (index-name index)
                                 (format-table-name table)
                                 (index-filter index))
                    (log-message :debug "filter translation error: ~a" c)
                    ;; try to create the index without the WHERE clause...
                    (setf (index-filter index) nil)))))
           ;; fixed log message typo: "tranlate" -> "translate"
           (log-message :info "translate-index-filter: ~s" pg-filter)
           (setf (index-filter index) pg-filter))))

255
src/pgsql/pgsql-schema.lisp Normal file
View File

@ -0,0 +1,255 @@
;;;
;;; Tools to query the PostgreSQL Schema, either source or target
;;;
(in-package :pgloader.pgsql)
(defun fetch-pgsql-catalog (target &key table including excluding)
  "Fetch PostgreSQL catalogs for the target database.

   TARGET is a pgsql connection.  When TABLE is given and INCLUDING is
   not, restrict the catalog to that single table.  INCLUDING and
   EXCLUDING are alists of (schema-name . table-name-list)."
  (let ((catalog (make-catalog :name (db-name target))))
    (with-pgsql-connection (target)
      (when (and table (not including))
        ;; rewrite the table constraint as an including expression
        (let ((schema
               (or (table-schema table)
                   ;; no schema given: ask PostgreSQL where the current
                   ;; search_path would locate TABLE
                   (make-schema :name (query-table-schema (table-name table))))))
          (setf including
                (list (cons (schema-name schema)
                            (list (table-name table)))))))
      ;; fill in the catalog: columns first (creating schema and table
      ;; entries on the fly), then indexes, then foreign keys
      (list-all-columns catalog
                        :table-type :table
                        :including including
                        :excluding excluding)
      (list-all-indexes catalog
                        :including including
                        :excluding excluding)
      (list-all-fkeys catalog
                      :including including
                      :excluding excluding))
    catalog))
(defun query-table-schema (table-name)
  "Get PostgreSQL schema name where to locate TABLE-NAME by following the
   current search_path rules. A PostgreSQL connection must be opened."
  (let ((sql (format nil "
select nspname
  from pg_namespace n
       join pg_class c on n.oid = c.relnamespace
 where c.oid = '~a'::regclass;"
                     table-name)))
    (pomo:query sql :single)))
;; Map our internal table-type keywords onto pg_class.relkind codes,
;; used to filter relations by kind in list-all-columns.
(defvar *table-type* '((:table    . "r")
                       (:view     . "v")
                       (:index    . "i")
                       (:sequence . "S"))
  "Associate internal table type symbol with what's found in PostgreSQL
  pg_class.relkind column.")
(defun filter-list-to-where-clause (filter-list
                                    &optional
                                      not
                                      (schema-col "table_schema")
                                      (table-col "table_name"))
  "Given an INCLUDING or EXCLUDING clause, turn it into a PostgreSQL WHERE
   clause."
  ;; FILTER-LIST is an alist of (schema-name . table-name-list); each
  ;; table name becomes one SQL condition string.  Note the ~~ format
  ;; directive: it emits a literal ~, PostgreSQL's regexp match
  ;; operator, so the table names in the filters are matched as regular
  ;; expressions, not compared as plain strings.
  (loop :for (schema . table-name-list) :in filter-list
     :append (mapcar (lambda (table-name)
                       (format nil "(~a = '~a' and ~a ~:[~;NOT ~]~~ '~a')"
                               schema-col schema table-col not table-name))
                     table-name-list)))
(defun list-all-columns (catalog
                         &key
                           (table-type :table)
                           including
                           excluding
                         &aux
                           (table-type-name (cdr (assoc table-type *table-type*))))
  "Get the list of PostgreSQL column names per table.

   One query row per attribute; schemas and tables are added to CATALOG
   on the fly as their first column shows up."
  (loop :for (schema-name table-name table-oid name type typmod notnull default)
     :in
     ;; atttypmod carries the declared length + 4 when set, hence the
     ;; - 4 adjustment; non-positive values mean no modifier
     (pomo:query (format nil "
select nspname, relname, c.oid, attname,
       t.oid::regtype as type,
       case when atttypmod > 0 then atttypmod - 4 else null end as typmod,
       attnotnull,
       case when atthasdef then def.adsrc end as default
  from pg_class c
       join pg_namespace n on n.oid = c.relnamespace
       left join pg_attribute a on c.oid = a.attrelid
       join pg_type t on t.oid = a.atttypid and attnum > 0
       left join pg_attrdef def on a.attrelid = def.adrelid
                               and a.attnum = def.adnum
 where nspname !~~ '^pg_' and n.nspname <> 'information_schema'
       and relkind = '~a'
       ~:[~*~;and (~{~a~^~&~10t or ~})~]
       ~:[~*~;and (~{~a~^~&~10t and ~})~]
order by nspname, relname, attnum"
                         table-type-name
                         including      ; do we print the clause?
                         (filter-list-to-where-clause including
                                                      nil
                                                      "n.nspname"
                                                      "c.relname")
                         excluding      ; do we print the clause?
                         (filter-list-to-where-clause excluding
                                                      nil
                                                      "n.nspname"
                                                      "c.relname")))
     :do
     (let* ((schema (maybe-add-schema catalog schema-name))
            (table  (maybe-add-table schema table-name :oid table-oid))
            (field  (make-column :name name
                                 :type-name type
                                 :type-mod typmod
                                 :nullable (not notnull)
                                 :default default)))
       (add-field table field))
     :finally (return catalog)))
(defun list-all-indexes (catalog &key including excluding)
  "Get the list of PostgreSQL index definitions per table.

   Tables must already be present in CATALOG (see list-all-columns)."
  (loop
     :for (schema-name name table-schema table-name primary unique sql conname condef)
     :in (pomo:query (format nil "
select n.nspname,
       i.relname,
       rn.nspname,
       r.relname,
       indisprimary,
       indisunique,
       pg_get_indexdef(indexrelid),
       c.conname,
       pg_get_constraintdef(c.oid)
  from pg_index x
       join pg_class i ON i.oid = x.indexrelid
       join pg_class r ON r.oid = x.indrelid
       join pg_namespace n ON n.oid = i.relnamespace
       join pg_namespace rn ON rn.oid = r.relnamespace
       left join pg_constraint c ON c.conindid = i.oid
 where n.nspname !~~ '^pg_' and n.nspname <> 'information_schema'
       ~:[~*~;and (~{~a~^~&~10t or ~})~]
       ~:[~*~;and (~{~a~^~&~10t and ~})~]
order by n.nspname, r.relname"
                             including  ; do we print the clause?
                             (filter-list-to-where-clause including
                                                          nil
                                                          "n.nspname"
                                                          "i.relname")
                             excluding  ; do we print the clause?
                             (filter-list-to-where-clause excluding
                                                          nil
                                                          "n.nspname"
                                                          "i.relname")))
     :do (let* ((schema  (find-schema catalog schema-name))
                (tschema (find-schema catalog table-schema))
                (table   (find-table tschema table-name))
                (pg-index
                 ;; the left join above returns :null for indexes that
                 ;; don't back a constraint, normalize that to nil
                 (make-index :name name
                             :schema schema
                             :table table
                             :primary primary
                             :unique unique
                             :columns nil
                             :sql sql
                             :conname (unless (eq :null conname) conname)
                             :condef (unless (eq :null condef) condef))))
           (maybe-add-index table name pg-index :key #'index-name))
     :finally (return catalog)))
(defun list-all-fkeys (catalog &key including excluding)
  "Get the list of PostgreSQL foreign key definitions per table.

   Both ends of each foreign key must already be present in CATALOG;
   foreign keys with a missing end are logged and skipped."
  (loop
     :for (schema-name table-name fschema-name ftable-name conname cols fcols
                       updrule delrule mrule deferrable deferred condef)
     :in
     ;; bug fix: the join on the referenced table used to read
     ;; "cf on r.confrelid = c.oid", joining the foreign side back onto
     ;; the referencing table; it must join on cf.oid.
     (pomo:query (format nil "
select n.nspname, c.relname, nf.nspname, cf.relname as frelname,
       conname,
       (select string_agg(attname, ',')
          from pg_attribute
         where attrelid = r.conrelid and array[attnum] <@ conkey
       ) as conkey,
       (select string_agg(attname, ',')
          from pg_attribute
         where attrelid = r.confrelid and array[attnum] <@ confkey
       ) as confkey,
       confupdtype, confdeltype, confmatchtype,
       condeferrable, condeferred,
       pg_catalog.pg_get_constraintdef(r.oid, true) as condef
  from pg_catalog.pg_constraint r
       JOIN pg_class c on r.conrelid = c.oid
       JOIN pg_namespace n on c.relnamespace = n.oid
       JOIN pg_class cf on r.confrelid = cf.oid
       JOIN pg_namespace nf on cf.relnamespace = nf.oid
 where r.contype = 'f'
       AND c.relkind = 'r' and cf.relkind = 'r'
       AND n.nspname !~~ '^pg_' and n.nspname <> 'information_schema'
       AND nf.nspname !~~ '^pg_' and nf.nspname <> 'information_schema'
       ~:[~*~;and (~{~a~^~&~10t or ~})~]
       ~:[~*~;and (~{~a~^~&~10t and ~})~]
       ~:[~*~;and (~{~a~^~&~10t or ~})~]
       ~:[~*~;and (~{~a~^~&~10t and ~})~]"
                         including      ; do we print the clause (table)?
                         (filter-list-to-where-clause including
                                                      nil
                                                      "n.nspname"
                                                      "c.relname")
                         excluding      ; do we print the clause (table)?
                         (filter-list-to-where-clause excluding
                                                      nil
                                                      "n.nspname"
                                                      "c.relname")
                         including      ; do we print the clause (ftable)?
                         (filter-list-to-where-clause including
                                                      nil
                                                      "nf.nspname"
                                                      "cf.relname")
                         excluding      ; do we print the clause (ftable)?
                         (filter-list-to-where-clause excluding
                                                      nil
                                                      "nf.nspname"
                                                      "cf.relname")))
     ;; map pg_constraint single-letter codes to SQL clauses
     ;; NOTE(review): assumes the driver returns \"char\" columns as CL
     ;; characters — confirm against the postmodern row reader
     :do (flet ((pg-fk-rule-to-action (rule)
                  (case rule
                    (#\a "NO ACTION")
                    (#\r "RESTRICT")
                    (#\c "CASCADE")
                    (#\n "SET NULL")
                    (#\d "SET DEFAULT")))
                (pg-fk-match-rule-to-match-clause (rule)
                  (case rule
                    (#\f "FULL")
                    (#\p "PARTIAL")
                    (#\s "SIMPLE"))))
           (let* ((schema  (find-schema catalog schema-name))
                  (table   (find-table schema table-name))
                  (fschema (find-schema catalog fschema-name))
                  (ftable  (find-table fschema ftable-name))
                  (fk
                   (make-fkey :name (apply-identifier-case conname)
                              :condef condef
                              :table table
                              :columns (split-sequence:split-sequence #\, cols)
                              :foreign-table ftable
                              :foreign-columns (split-sequence:split-sequence #\, fcols)
                              :update-rule (pg-fk-rule-to-action updrule)
                              :delete-rule (pg-fk-rule-to-action delrule)
                              :match-rule (pg-fk-match-rule-to-match-clause mrule)
                              :deferrable deferrable
                              :initially-deferred deferred)))
             (if (and table ftable)
                 (add-fkey table fk)
                 (log-message :notice "Foreign Key ~a is ignored, one of its table is missing from pgloader table selection"
                              conname))))
     :finally (return catalog)))

View File

@ -0,0 +1,65 @@
;;;
;;; Create the PostgreSQL schema from our internal Catalog representation.
;;; Here, triggers and their stored procedures.
;;;
(in-package #:pgloader.pgsql)
(defvar *pgsql-triggers-procedures*
  `((:on-update-current-timestamp .
     ,(lambda (trigger column table)
        ;; procedure body: set COLUMN to now() and return the NEW row
        (let ((body (format nil
                            "BEGIN~% NEW.~a = now();~% RETURN NEW;~%END;"
                            (column-name column))))
          (make-procedure :name (trigger-procedure-name trigger)
                          :returns "trigger"
                          :language "plpgsql"
                          :body body)))))
  "List of lambdas to generate procedure definitions from pgloader internal
  trigger names as positioned in the internal catalogs at CAST time.")
(defun rename-trigger (trigger)
  "Turn a common lisp symbol into a proper PostgreSQL trigger name."
  (let* ((symbol-string (symbol-name (trigger-name trigger)))
         (pg-name       (string-downcase
                         (cl-ppcre:regex-replace-all "-" symbol-string "_"))))
    (setf (trigger-name trigger) pg-name)))
(defun process-triggers (table)
  "Finish the trigger CAST for TABLE: columns whose extra slot holds a
   trigger struct get their trigger completed (procedure attached, name
   rewritten) and moved from the column onto the table's trigger list."
  (loop :for column :in (table-column-list table)
     :when (column-extra column)
     :do (etypecase (column-extra column)
           (trigger
            ;; finish the trigger CAST and attach it to the table now
            (let* ((trigger (column-extra column))
                   (proc (or (trigger-procedure trigger)
                             ;;
                             ;; We have a trigger with no attached
                             ;; procedure, so we search for the trigger
                             ;; procedure-name in
                             ;; *pgsql-triggers-procedures* to find a
                             ;; lambda form to call to produce our PLpgSQL
                             ;; procedure
                             ;;
                             (let ((generate-proc
                                    (cdr
                                     (assoc (trigger-name trigger)
                                            *pgsql-triggers-procedures*))))
                               (assert (functionp generate-proc))
                               (funcall generate-proc
                                        trigger column table)))))
              ;;
              ;; Properly attach the procedure to the trigger and the
              ;; trigger to the table.
              ;;
              (unless (trigger-procedure trigger)
                (setf (trigger-procedure trigger) proc))
              ;; rewrite the internal symbol into a PostgreSQL name
              (rename-trigger trigger)
              ;; the column no longer owns the trigger
              (setf (column-extra column) nil)
              (setf (trigger-table trigger) table)
              (push-to-end trigger (table-trigger-list table)))))))

View File

@ -3,252 +3,6 @@
;;;
(in-package :pgloader.pgsql)
;;;
;;; PostgreSQL Tools connecting to a database
;;;
;; PostgreSQL specific attributes on top of the generic db-connection:
;; the SSL mode (see ssl-enable-p: :try or :yes enable SSL) and an
;; optional target table name.
(defclass pgsql-connection (db-connection)
  ((use-ssl :initarg :use-ssl :accessor pgconn-use-ssl)
   (table-name :initarg :table-name :accessor pgconn-table-name))
  (:documentation "PostgreSQL connection for pgloader"))
(defmethod initialize-instance :after ((pgconn pgsql-connection) &key)
  "Assign the type slot to pgsql."
  ;; tag the inherited db-connection type slot for this backend
  (setf (slot-value pgconn 'type) "pgsql"))
(defmethod clone-connection ((c pgsql-connection))
  "Clone connection C, then copy over the pgsql-specific slots."
  (let ((clone (change-class (call-next-method c) 'pgsql-connection)))
    (setf (pgconn-use-ssl clone) (pgconn-use-ssl c))
    (setf (pgconn-table-name clone) (pgconn-table-name c))
    clone))
(defmethod ssl-enable-p ((pgconn pgsql-connection))
  "Return non-nil when the connection uses SSL"
  (let ((use-ssl (pgconn-use-ssl pgconn)))
    (member use-ssl '(:try :yes))))
(defun new-pgsql-connection (pgconn)
  "Prepare a new connection object with all the same properties as PGCONN,
   so as to avoid stepping on its handle."
  (make-instance 'pgsql-connection
                 :user       (db-user pgconn)
                 :pass       (db-pass pgconn)
                 :host       (db-host pgconn)
                 :port       (db-port pgconn)
                 :name       (db-name pgconn)
                 :use-ssl    (pgconn-use-ssl pgconn)
                 :table-name (pgconn-table-name pgconn)))
;;;
;;; Implement SSL Client Side certificates
;;; http://www.postgresql.org/docs/current/static/libpq-ssl.html#LIBPQ-SSL-FILE-USAGE
;;;
;; Client side SSL files; paths are expanded with
;; expand-user-homedir-pathname at connection time (see open-connection)
;; and only used when they actually exist.
(defvar *pgsql-client-certificate* "~/.postgresql/postgresql.crt"
  "File where to read the PostgreSQL Client Side SSL Certificate.")

(defvar *pgsql-client-key* "~/.postgresql/postgresql.key"
  "File where to read the PostgreSQL Client Side SSL Private Key.")
;;;
;;; We need to distinguish some special cases of PostgreSQL errors within
;;; Class 53 — Insufficient Resources: in case of "too many connections" we
;;; typically want to leave room for another worker to finish and free one
;;; connection, then try again.
;;;
;;; http://www.postgresql.org/docs/9.4/interactive/errcodes-appendix.html
;;;
;;; The "leave room to finish and try again" heuristic is currently quite
;;; simplistic, but at least it work in my test cases.
;;;
;; Specific conditions for the class 53 error codes we retry on
;; (see the "Insufficient Resources" comment above).
(cl-postgres-error::deferror "53300"
    too-many-connections cl-postgres-error:insufficient-resources)

(cl-postgres-error::deferror "53400"
    configuration-limit-exceeded cl-postgres-error:insufficient-resources)

(defvar *retry-connect-times* 5
  "How many times do we try to connect again.")

(defvar *retry-connect-delay* 0.5
  "How many seconds to wait before trying to connect again.")
(defmethod open-connection ((pgconn pgsql-connection) &key username)
  "Open a PostgreSQL connection, retrying up to *retry-connect-times*
   when the server reports resource exhaustion."
  (let* ((crt-file (expand-user-homedir-pathname *pgsql-client-certificate*))
         (key-file (expand-user-homedir-pathname *pgsql-client-key*))
         ;; only hand the SSL files to the driver when SSL is wanted
         ;; and the files actually exist on disk
         (pomo::*ssl-certificate-file* (when (and (ssl-enable-p pgconn)
                                                  (probe-file crt-file))
                                         (uiop:native-namestring crt-file)))
         (pomo::*ssl-key-file* (when (and (ssl-enable-p pgconn)
                                          (probe-file key-file))
                                 (uiop:native-namestring key-file))))
    (flet ((connect (pgconn username)
             ;; a single connection attempt: returns nil (after a
             ;; delay) on retryable resource errors so the loop below
             ;; tries again
             (handler-case
                 (pomo:connect (db-name pgconn)
                               (or username (db-user pgconn))
                               (db-pass pgconn)
                               ;; a (:unix . path) host means a unix
                               ;; domain socket connection
                               (let ((host (db-host pgconn)))
                                 (if (and (consp host) (eq :unix (car host)))
                                     :unix
                                     host))
                               :port (db-port pgconn)
                               :use-ssl (or (pgconn-use-ssl pgconn) :no))
               ((or too-many-connections configuration-limit-exceeded) (e)
                 (log-message :error
                              "Failed to connect to ~a: ~a; will try again in ~fs"
                              pgconn e *retry-connect-delay*)
                 (sleep *retry-connect-delay*)))))
      (loop :while (null (conn-handle pgconn))
         :repeat *retry-connect-times*
         :do (setf (conn-handle pgconn) (connect pgconn username)))))
  (unless (conn-handle pgconn)
    (error "Failed ~d times to connect to ~a" *retry-connect-times* pgconn))
  pgconn)
(defmethod close-connection ((pgconn pgsql-connection))
  "Close a PostgreSQL connection."
  (let ((handle (conn-handle pgconn)))
    ;; closing a connection that was never opened is a programming error
    (assert (not (null handle)))
    (pomo:disconnect handle)
    (setf (conn-handle pgconn) nil)
    pgconn))
(defmethod query ((pgconn pgsql-connection) sql &key)
  "Log SQL at debug level then run it against PGCONN's open handle."
  (log-message :debug "~a" sql)
  (let ((pomo:*database* (conn-handle pgconn)))
    (pomo:query sql)))
(defmacro handling-pgsql-notices (&body forms)
  "The BODY is run within a PostgreSQL transaction where *pg-settings* have
   been applied. PostgreSQL warnings and errors are logged at the
   appropriate log level."
  ;; errors are logged then re-signaled (handler-bind declines to
  ;; handle them); warnings are logged and muffled
  `(handler-bind
       ((cl-postgres:database-error
         #'(lambda (e)
             (log-message :error "~a" e)))
        (cl-postgres:postgresql-warning
         #'(lambda (w)
             (log-message :warning "~a" w)
             (muffle-warning))))
     (progn ,@forms)))
(defmacro with-pgsql-transaction ((&key pgconn database) &body forms)
  "Run FORMS within a PostgreSQL transaction to DBNAME, reusing DATABASE if
   given."
  (if database
      ;; reuse the given database handle
      `(let ((pomo:*database* ,database))
         (handling-pgsql-notices
          (pomo:with-transaction ()
            (log-message :debug "BEGIN")
            ,@forms)))
      ;; no database given, create a new database connection
      `(with-pgsql-connection (,pgconn)
         (pomo:with-transaction ()
           (log-message :debug "BEGIN")
           ,@forms))))
(defmacro with-pgsql-connection ((pgconn) &body forms)
  "Run FORMS within a PostgreSQL connection to DBNAME. To get the connection
   spec from the DBNAME, use `get-connection-spec'."
  (let ((conn (gensym "pgsql-conn")))
    ;; on unix platforms, point cl-postgres at the right socket
    ;; directory before opening the connection
    `(let (#+unix (cl-postgres::*unix-socket-dir* (get-unix-socket-dir ,pgconn)))
       (with-connection (,conn ,pgconn)
         (let ((pomo:*database* (conn-handle ,conn)))
           (log-message :debug "CONNECTED TO ~s" ,conn)
           ;; apply the session GUCs before running user forms
           (set-session-gucs *pg-settings*)
           (handling-pgsql-notices
            ,@forms))))))
(defun get-unix-socket-dir (pgconn)
  "When *pgconn* host is a (cons :unix path) value, return the right value
   for cl-postgres::*unix-socket-dir*."
  (let ((host (db-host pgconn)))
    (cond ((and (consp host) (eq :unix (first host)))
           ;; unix domain socket: derive the directory from the path
           (directory-namestring (fad:pathname-as-directory (rest host))))
          (t
           ;; TCP host: keep the current default untouched
           cl-postgres::*unix-socket-dir*))))
(defun set-session-gucs (alist &key transaction database)
  "Set given GUCs to given values for the current session.

   When TRANSACTION is true, use SET LOCAL so the settings only last
   for the current transaction."
  (let ((pomo:*database* (or database pomo:*database*)))
    (loop
       :for (name . value) :in alist
       :for set := (cond
                     ((string-equal "search_path" name)
                      ;; for search_path, don't quote the value
                      (format nil "SET~:[~; LOCAL~] ~a TO ~a"
                              transaction name value))
                     (t
                      ;; general case: quote the value
                      (format nil "SET~:[~; LOCAL~] ~a TO '~a'"
                              transaction name value)))
       :do (progn                       ; indent helper
             (log-message :debug set)
             (pomo:execute set)))))
(defun pgsql-connect-and-execute-with-timing (pgconn section label sql &key (count 1))
  "Run pgsql-execute-with-timing within a newly established connection."
  (with-pgsql-connection (pgconn)
    (pomo:with-transaction ()
      (pgsql-execute-with-timing section label sql :count count))))
(defun pgsql-execute-with-timing (section label sql &key (count 1))
  "Execute given SQL and register its timing into STATE."
  ;; On error we record COUNT negative rows, so that the unconditional
  ;; stats update below (which always adds COUNT rows) nets out to
  ;; zero rows loaded for a failed statement.
  (multiple-value-bind (res secs)
      (timing
       (handler-case
           (pgsql-execute sql)
         (cl-postgres:database-error (e)
           (log-message :error "~a" e)
           (update-stats section label :errs 1 :rows (- count)))))
    (declare (ignore res))
    (update-stats section label :read count :rows count :secs secs)))
(defun pgsql-execute (sql &key client-min-messages)
  "Execute given SQL in current transaction.

   When CLIENT-MIN-MESSAGES is given, set that GUC locally around the
   statement and reset it afterwards."
  (when client-min-messages
    (pomo:execute
     (format nil "SET LOCAL client_min_messages TO ~a;"
             (symbol-name client-min-messages))))
  (log-message :notice "~a" sql)
  (pomo:execute sql)
  (when client-min-messages
    ;; constant statement: no need to build it through format
    (pomo:execute "RESET client_min_messages;")))
;;;
;;; PostgreSQL Utility Queries
;;;
;; (defun list-databases (&optional (username "postgres"))
;; "Connect to a local database and get the database list"
;; (with-pgsql-transaction (:dbname "postgres" :username username)
;; (loop for (dbname) in (pomo:query
;; "select datname
;; from pg_database
;; where datname !~ 'postgres|template'")
;; collect dbname)))
;; (defun list-tables (&optional dbname)
;; "Return an alist of tables names and list of columns to pay attention to."
;; (with-pgsql-transaction (:dbname dbname)
;; (loop for (relname colarray) in (pomo:query "
;; select relname, array_agg(case when typname in ('date', 'timestamptz')
;; then attnum end
;; order by attnum)
;; from pg_class c
;; join pg_namespace n on n.oid = c.relnamespace
;; left join pg_attribute a on c.oid = a.attrelid
;; join pg_type t on t.oid = a.atttypid
;; where c.relkind = 'r'
;; and attnum > 0
;; and n.nspname = 'public'
;; group by relname
;; ")
;; collect (cons relname (loop
;; for attnum across colarray
;; unless (eq attnum :NULL)
;; collect attnum)))))
(defun list-schemas ()
"Return the list of PostgreSQL schemas in the already established
PostgreSQL connection."
@ -322,144 +76,6 @@ select i.relname,
:conname (unless (eq :null conname) conname)
:condef (unless (eq :null condef) condef))))
(defun sanitize-user-gucs (gucs)
  "Forbid certain actions such as setting a client_encoding different from utf8."
  (let ((gucs
         (append
          ;; always force utf8 first...
          (list (cons "client_encoding" "utf8"))
          ;; ...then keep the user settings, dropping (with a warning)
          ;; any attempt to set a different client_encoding
          (loop :for (name . value) :in gucs
             :when (and (string-equal name "client_encoding")
                        (not (member value '("utf-8" "utf8") :test #'string-equal)))
             :do (log-message :warning
                              "pgloader always talk to PostgreSQL in utf-8, client_encoding has been forced to 'utf8'.")
             :else
             :collect (cons name value)))))
    ;;
    ;; Now see about the application_name, provide "pgloader" if it's not
    ;; been overloaded already.
    ;;
    (cond ((not (assoc "application_name" gucs :test #'string-equal))
           (append gucs (list (cons "application_name" "pgloader"))))
          (t
           gucs))))
(defun list-reserved-keywords (pgconn)
  "Connect to PostgreSQL DBNAME and fetch reserved keywords."
  (handler-case
      (with-pgsql-connection (pgconn)
        ;; catcode R: reserved, T: reserved but allowed as type names
        (pomo:query "select word
                   from pg_get_keywords()
                  where catcode IN ('R', 'T')" :column))
    ;; support for Amazon Redshift
    (cl-postgres-error::syntax-error-or-access-violation (e)
      ;; 42883 undefined_function
      ;; Database error 42883: function pg_get_keywords() does not exist
      ;;
      ;; the following list comes from a manual query against a local
      ;; PostgreSQL server (version 9.5devel), it's better to have this list
      ;; than nothing at all.
      (declare (ignore e))
      (list "all" "analyse" "analyze" "and" "any" "array" "as" "asc"
            "asymmetric" "authorization" "binary" "both" "case" "cast"
            "check" "collate" "collation" "column" "concurrently"
            "constraint" "create" "cross" "current_catalog"
            "current_date" "current_role" "current_schema"
            "current_time" "current_timestamp" "current_user" "default"
            "deferrable" "desc" "distinct" "do" "else" "end" "except"
            "false" "fetch" "for" "foreign" "freeze" "from" "full"
            "grant" "group" "having" "ilike" "in" "initially" "inner"
            "intersect" "into" "is" "isnull" "join" "lateral" "leading"
            "left" "like" "limit" "localtime" "localtimestamp" "natural"
            "not" "notnull" "null" "offset" "on" "only" "or" "order"
            "outer" "overlaps" "placing" "primary" "references"
            "returning" "right" "select" "session_user" "similar" "some"
            "symmetric" "table" "then" "to" "trailing" "true" "union"
            "unique" "user" "using" "variadic" "verbose" "when" "where"
            "window" "with"))))
(defun reset-all-sequences (pgconn &key tables)
"Reset all sequences to the max value of the column they are attached to."
(let ((newconn (clone-connection pgconn)))

125
src/pgsql/retry-batch.lisp Normal file
View File

@ -0,0 +1,125 @@
;;;
;;; The PostgreSQL COPY TO implementation, with batches and retries.
;;;
(in-package #:pgloader.pgsql)
;;;
;;; Compute how many rows we're going to try loading next, depending on
;;; where we are in the batch currently and where is the next-error to be
;;; seen, if that's between current position and the end of the batch.
;;;
(defun next-batch-rows (batch-rows current-batch-pos next-error)
  "How many rows should we process in next iteration?

   BATCH-ROWS is the total row count, CURRENT-BATCH-POS the current
   position, NEXT-ERROR the position of the next known-bad row."
  (cond
    ;; before the known error: safely load everything up to, and
    ;; excluding, the bad row
    ((< current-batch-pos next-error) (- next-error current-batch-pos))
    ;; sitting on the known-bad row: try it alone
    ((= current-batch-pos next-error) 1)
    ;; past the known error: try all the remaining rows at once, any
    ;; further error will be discovered then
    (t                                (- batch-rows current-batch-pos))))
;;;
;;; In case of COPY error, PostgreSQL gives us the line where the error was
;;; found as a CONTEXT message. Let's parse that information to optimize our
;;; batching splitting in case of errors.
;;;
;;; CONTEXT: COPY errors, line 1, column b: "2006-13-11"
;;; CONTEXT: COPY byte, line 1: "hello\0world"
;;;
;;; Those error messages are a translation target, tho, so we can only
;;; assume to recognize the command tag (COPY), the comma, and a numer after
;;; a world that might be Zeile (de), línea (es), ligne (fr), riga (it),
;;; linia (pl), linha (pt), строка (ru), 行 (zh), or something else
;;; entirely.
;;;
(defun parse-copy-error-context (context)
  "Given a COPY command CONTEXT error message, return the batch position
   where the error comes from."
  ;; the line number in the message is 1-based, our batch positions
  ;; are 0-based, hence the 1-
  (cl-ppcre:register-groups-bind ((#'parse-integer line-number))
      ("COPY [^,]+, [^ ]+ (\\d+)" context :sharedp t)
    (1- line-number)))
;;;
;;; The main retry batch function.
;;;
(defun retry-batch (table columns batch batch-rows condition
                    &optional (current-batch-pos 0)
                    &aux (nb-errors 0))
  "BATCH is an array of rows containing at least one bad row; the
   position of the first bad row is parsed out of CONDITION's COPY
   error context.  Replay the batch around the bad rows, handing each
   bad row to process-bad-row, and return the number of rows loaded."
  (log-message :info "Entering error recovery.")
  (loop
     :with next-error = (parse-copy-error-context
                         (cl-postgres::database-error-context condition))
     :while (< current-batch-pos batch-rows)
     :do
     (progn                             ; indenting helper
       ;; when sitting on the known-bad row, reject it and move on
       (when (= current-batch-pos next-error)
         (log-message :info "error recovery at ~d/~d, processing bad row"
                      (+ 1 next-error) batch-rows)
         (process-bad-row table condition (aref batch current-batch-pos))
         (incf current-batch-pos)
         (incf nb-errors))
       ;; then try to COPY as many following rows as possible in one go
       (let* ((current-batch-rows
               (next-batch-rows batch-rows current-batch-pos next-error)))
         (when (< 0 current-batch-rows)
           (handler-case
               (with-pgsql-transaction (:database pomo:*database*)
                 (let* ((table-name (format-table-name table))
                        (stream
                         (cl-postgres:open-db-writer pomo:*database*
                                                     table-name columns)))
                   (if (< current-batch-pos next-error)
                       (log-message :info
                                    "error recovery at ~d/~d, next error at ~d, loading ~d row~:p"
                                    current-batch-pos batch-rows (+ 1 next-error) current-batch-rows)
                       (log-message :info
                                    "error recovery at ~d/~d, trying ~d row~:p"
                                    current-batch-pos batch-rows current-batch-rows))
                   (unwind-protect
                        (loop :repeat current-batch-rows
                           :for pos :from current-batch-pos
                           :do (db-write-row stream (aref batch pos)))
                     ;; close-db-writer is the one signaling cl-postgres-errors
                     (cl-postgres:close-db-writer stream)
                     (incf current-batch-pos current-batch-rows))))
             ;; the batch didn't make it, prepare error handling for next turn
             ((or
               cl-postgres-error::data-exception
               cl-postgres-error::integrity-violation
               cl-postgres-error:internal-error
               cl-postgres-error::insufficient-resources
               cl-postgres-error::program-limit-exceeded) (next-error-in-batch)
               ;; the new error position is relative to the rows we
               ;; just tried, hence the current-batch-pos offset
               (setf condition next-error-in-batch
                     next-error
                     (+ current-batch-pos
                        (parse-copy-error-context
                         (cl-postgres::database-error-context condition))))))))))
  (log-message :info "Recovery found ~d errors in ~d row~:p" nb-errors batch-rows)
  ;; Return how many rows we did load, for statistics purposes
  (- batch-rows nb-errors))

View File

@ -1,92 +1,19 @@
;;;
;;; Tools to handle PostgreSQL tables and indexes creations
;;;
(in-package pgloader.pgsql)
(in-package #:pgloader.pgsql)
;;;
;;; Some parts of the logic here needs to be specialized depending on the
;;; source type, such as SQLite or MySQL. To do so, sources must define
;;; their own column struct and may implement the methods
;;; `format-pgsql-column' and `format-extra-type' on those.
;;;
(defstruct pgsql-column name type-name type-mod nullable default)
(defgeneric format-pgsql-column (col)
(:documentation
"Return the PostgreSQL column definition (type, default, not null, ...)"))
(defgeneric format-extra-type (col &key include-drop)
  (:documentation
   "Return a list of PostgreSQL commands to create an extra type for given
    column, or nil if none is required. If no special extra type is ever
    needed, it's allowed not to specialize this generic into a method."))
(defgeneric format-extra-triggers (table col &key drop)
(:documentation
"Return a list of string representing the extra SQL commands needed to
implement PostgreSQL triggers."))
(defmethod format-extra-type ((col T) &key include-drop)
  "Fallback method: no extra data type is ever needed for COL, so return an
   empty list of SQL commands."
  (declare (ignore include-drop))
  nil)
(defmethod format-extra-triggers ((table T) (col T) &key drop)
  "Fallback method: no extra trigger SQL is needed, so return an empty list."
  (declare (ignore table col drop))
  nil)
;;;
;;; API for Foreign Keys
;;;
;; A Foreign Key constraint between two PostgreSQL tables.
;; TABLE and FOREIGN-TABLE hold table objects (see format-table-name call
;; sites); COLUMNS and FOREIGN-COLUMNS are lists of column name strings;
;; UPDATE-RULE and DELETE-RULE carry the ON UPDATE / ON DELETE actions, or
;; nil when none apply.
(defstruct pgsql-fkey
  name table columns foreign-table foreign-columns update-rule delete-rule)
(defgeneric format-pgsql-create-fkey (fkey)
(:documentation
"Return the PostgreSQL command to define a Foreign Key Constraint."))
(defgeneric format-pgsql-drop-fkey (fkey &key)
(:documentation
"Return the PostgreSQL command to DROP a Foreign Key Constraint."))
(defmethod format-pgsql-create-fkey ((fk pgsql-fkey))
  "Generate the PostgreSQL statement to rebuild a MySQL Foreign Key"
  (format nil
          "ALTER TABLE ~a ADD ~@[CONSTRAINT ~a ~]FOREIGN KEY(~{~a~^,~}) REFERENCES ~a(~{~a~^,~})~:[~*~; ON UPDATE ~a~]~:[~*~; ON DELETE ~a~]"
          (format-table-name (pgsql-fkey-table fk))
          (pgsql-fkey-name fk)          ; constraint name
          (pgsql-fkey-columns fk)
          (format-table-name (pgsql-fkey-foreign-table fk))
          (pgsql-fkey-foreign-columns fk)
          ;; each ~:[~*~; ... ~a~] directive consumes TWO arguments: the
          ;; first is tested against nil, the second is printed by ~a (or
          ;; skipped by ~*), hence each rule is passed twice on purpose.
          (pgsql-fkey-update-rule fk)
          (pgsql-fkey-update-rule fk)
          (pgsql-fkey-delete-rule fk)
          (pgsql-fkey-delete-rule fk)))
(defmethod format-pgsql-drop-fkey ((fk pgsql-fkey) &key all-pgsql-fkeys)
  "Generate the PostgreSQL statement to DROP the given Foreign Key, or nil
   when the constraint is unnamed or not found in ALL-PGSQL-FKEYS (an alist
   mapping table names to their existing constraint names)."
  (when (pgsql-fkey-name fk)
    (let* ((constraint-name (apply-identifier-case (pgsql-fkey-name fk)))
           (table-name (format-table-name (pgsql-fkey-table fk)))
           ;; constraint names already present in PostgreSQL for this table
           (fkeys (cdr (assoc table-name all-pgsql-fkeys :test #'string=)))
           (fkey-exists (member constraint-name fkeys :test #'string=)))
      (when fkey-exists
        ;; we could do that without all-pgsql-fkeys in 9.2 and following with:
        ;; alter table if exists ... drop constraint if exists ...
        (format nil "ALTER TABLE ~a DROP CONSTRAINT ~a" table-name constraint-name)))))
(defun drop-pgsql-fkeys (catalog)
  "Drop all Foreign Key Definitions given, to prepare for a clean run."
  ;; list-tables-and-fkeys presumably queries the live PostgreSQL catalogs
  ;; for existing constraints -- defined elsewhere, TODO confirm
  (let ((all-pgsql-fkeys (list-tables-and-fkeys)))
    (loop :for table :in (table-list catalog)
       :do
       ;; format-pgsql-drop-fkey returns nil when the constraint does not
       ;; exist in PostgreSQL, in which case we execute nothing
       (loop :for fkey :in (table-fkey-list table)
          :for sql := (format-pgsql-drop-fkey fkey
                                              :all-pgsql-fkeys all-pgsql-fkeys)
          :when sql
          :do (pgsql-execute sql)))))
(loop :for table :in (table-list catalog)
:do
(loop :for fkey :in (table-fkey-list table)
:for sql := (format-drop-sql fkey)
:when sql
:do (pgsql-execute sql))))
(defun create-pgsql-fkeys (catalog
&key
@ -97,69 +24,42 @@
(with-stats-collection (label :section section :use-result-as-rows t)
(loop :for table :in (table-list catalog)
:sum (loop :for fkey :in (table-fkey-list table)
:for sql := (format-pgsql-create-fkey fkey)
:for sql := (format-create-sql fkey)
:do (pgsql-execute-with-timing section label sql)
:count t))))
;;;
;;; Table schema rewriting support
;;; Table schema support
;;;
(defun create-table-sql (table &key if-not-exists)
"Return a PostgreSQL CREATE TABLE statement from given COLS.
(defun create-sqltypes (catalog &key if-not-exists include-drop)
"Create the needed data types for given CATALOG."
(let ((sqltype-list))
;; build the sqltype list
(loop :for table :in (table-list catalog)
:do (loop :for column :in (table-column-list table)
:do (when (typep (column-type-name column) 'sqltype)
(pushnew (column-type-name column) sqltype-list
:test #'string-equal
:key #'sqltype-name))))
Each element of the COLS list is expected to be of a type handled by the
`format-pgsql-column' generic function."
(with-output-to-string (s)
(format s "CREATE TABLE~:[~; IF NOT EXISTS~] ~a ~%(~%"
if-not-exists
(format-table-name table))
(let ((max (reduce #'max
(mapcar #'length
(mapcar #'column-name (table-column-list table))))))
(loop
:for (col . last?) :on (table-column-list table)
:for pg-coldef := (format-column col
:pretty-print t
:max-column-name-length max )
:do (format s " ~a~:[~;,~]~%" pg-coldef last?)))
(format s ");~%")))
(defun drop-table-if-exists-sql (table)
  "Build the DROP TABLE IF EXISTS ... CASCADE statement for TABLE."
  (let ((qualified-name (format-table-name table)))
    (format nil "DROP TABLE IF EXISTS ~a CASCADE;" qualified-name)))
;; now create the types
(loop :for sqltype :in sqltype-list
:when include-drop
:do (pgsql-execute (format-drop-sql sqltype :cascade t))
:do (pgsql-execute
(format-create-sql sqltype :if-not-exists if-not-exists)))))
(defun create-table-sql-list (table-list
&key
if-not-exists
include-drop)
&key
if-not-exists
include-drop)
"Return the list of CREATE TABLE statements to run against PostgreSQL."
(loop
:for table :in table-list
:for cols := (table-column-list table)
:for fields := (table-field-list table)
:for extra-types := (loop :for field :in fields
:append (format-extra-type
field :include-drop include-drop))
:for pre-extra-triggers
:= (when include-drop
(loop :for field :in fields
:append (format-extra-triggers table field :drop t)))
:for post-extra-triggers
:= (loop :for field :in fields
:append (format-extra-triggers table field))
(loop :for table :in table-list
:when include-drop
:collect (drop-table-if-exists-sql table)
:collect (format-drop-sql table :cascade t)
:when extra-types :append extra-types
:when pre-extra-triggers :append pre-extra-triggers
:collect (create-table-sql table :if-not-exists if-not-exists)
:when post-extra-triggers :append post-extra-triggers))
:collect (format-create-sql table :if-not-exists if-not-exists)))
(defun create-table-list (table-list
&key
@ -220,6 +120,23 @@
:include-drop include-drop
:client-min-messages client-min-messages))
(defun create-triggers (catalog &key (client-min-messages :notice))
  "Create the catalog objects that come after the data has been loaded."
  (let ((sql-list
         (loop :for table :in (table-list catalog)
            ;; process-triggers prepares the table's trigger list --
            ;; defined elsewhere, TODO confirm what it derives them from
            :do (process-triggers table)
            :when (table-trigger-list table)
            ;; collect the procedure's DDL before the trigger's DDL, since
            ;; the CREATE TRIGGER statement references the procedure
            :append (loop :for trigger :in (table-trigger-list table)
                       :collect (format-create-sql (trigger-procedure trigger))
                       :collect (format-create-sql trigger)))))
    ;; run every statement in order in the current connection
    (loop :for sql :in sql-list
       :do (pgsql-execute sql :client-min-messages client-min-messages))))
;;;
;;; DDL Utilities: TRUNCATE, ENABLE/DISABLE triggers
;;;
(defun truncate-tables (pgconn catalog-or-table)
"Truncate given TABLE-NAME in database DBNAME"
(with-pgsql-transaction (:pgconn pgconn)
@ -260,136 +177,6 @@
(progn ,@forms)))
;;;
;;; Index support
;;;
(defstruct pgsql-index
  ;; the struct is used both for supporting new index creation from non
  ;; PostgreSQL system and for drop/create indexes when using the 'drop
  ;; indexes' option (in CSV mode and the like)
  ;;
  ;; PRIMARY and UNIQUE are booleans; COLUMNS is a list of column names;
  ;; SQL holds a complete CREATE INDEX statement when one was captured
  ;; as-is; CONNAME/CONDEF describe a constraint-backed index; FILTER is
  ;; a partial-index WHERE clause.
  name schema table-oid primary unique columns sql conname condef filter)
(defgeneric format-pgsql-create-index (table index)
(:documentation
"Return the PostgreSQL command to define an Index."))
(defgeneric format-pgsql-drop-index (table index)
(:documentation
"Return the PostgreSQL command to drop an Index."))
(defgeneric translate-index-filter (table index sql-dialect)
(:documentation
"Translate the filter clause of INDEX in PostgreSQL slang."))
(defmethod translate-index-filter ((table table) (index pgsql-index) (sql-dialect t))
  "Fallback method used when no dialect-specific translation exists: return
   no translated filter."
  (declare (ignorable table index sql-dialect))
  nil)
(defmethod format-pgsql-create-index ((table table) (index pgsql-index))
  "Generate the PostgreSQL statement list to rebuild an Index.

   Returns either a single CREATE INDEX / ALTER TABLE statement, or two
   values: the CREATE UNIQUE INDEX statement and a second ALTER TABLE
   statement that attaches the index as a PRIMARY KEY/UNIQUE constraint,
   to be run after the index exists."
  (let* ((index-name (if (and *preserve-index-names*
                              (not (string-equal "primary" (pgsql-index-name index)))
                              (pgsql-index-table-oid index))
                         (pgsql-index-name index)
                         ;; in the general case, we build our own index name.
                         (format nil "idx_~a_~a"
                                 (pgsql-index-table-oid index)
                                 (pgsql-index-name index))))
         (index-name (apply-identifier-case index-name)))
    (cond
      ((or (pgsql-index-primary index)
           (and (pgsql-index-condef index) (pgsql-index-unique index)))
       (values
        ;; ensure good concurrency here, don't take the ACCESS EXCLUSIVE
        ;; LOCK on the table before we have the index done already
        (or (pgsql-index-sql index)
            (format nil
                    "CREATE UNIQUE INDEX ~@[~a.~]~a ON ~a (~{~a~^, ~})~@[ WHERE ~a~];"
                    (pgsql-index-schema index)
                    index-name
                    (format-table-name table)
                    (pgsql-index-columns index)
                    (pgsql-index-filter index)))
        (format nil
                ;; don't use the index schema name here, PostgreSQL doesn't
                ;; like it, might be implicit from the table's schema
                ;; itself...
                "ALTER TABLE ~a ADD ~a USING INDEX ~a;"
                (format-table-name table)
                (cond ((pgsql-index-primary index) "PRIMARY KEY")
                      ((pgsql-index-unique index) "UNIQUE"))
                index-name)))
      ;; constraint definition given verbatim (e.g. from PG catalogs)
      ((pgsql-index-condef index)
       (format nil "ALTER TABLE ~a ADD ~a;"
               (format-table-name table)
               (pgsql-index-condef index)))
      ;; plain (possibly unique, possibly partial) index
      (t
       (or (pgsql-index-sql index)
           (format nil
                   "CREATE~:[~; UNIQUE~] INDEX ~@[~a.~]~a ON ~a (~{~a~^, ~})~@[ WHERE ~a~];"
                   (pgsql-index-unique index)
                   (pgsql-index-schema index)
                   index-name
                   (format-table-name table)
                   (pgsql-index-columns index)
                   (pgsql-index-filter index)))))))
(defmethod format-pgsql-drop-index ((table table) (index pgsql-index))
  "Generate the PostgreSQL statement to DROP the index.

   A constraint-backed index (conname set) is removed with ALTER TABLE DROP
   CONSTRAINT; any other index with a plain DROP INDEX."
  (let* ((schema-name (apply-identifier-case (pgsql-index-schema index)))
         (index-name (apply-identifier-case (pgsql-index-name index))))
    (cond ((pgsql-index-conname index)
           ;; here always quote the constraint name, currently the name
           ;; comes from one source only, the PostgreSQL database catalogs,
           ;; so don't question it, quote it.
           (format nil "ALTER TABLE ~a DROP CONSTRAINT ~s;"
                   (format-table-name table)
                   (pgsql-index-conname index)))
          (t
           ;; ~@[~a.~] only prints the "schema." prefix when schema-name
           ;; is non-nil
           (format nil "DROP INDEX ~@[~a.~]~a;" schema-name index-name)))))
;;;
;;; API to rewrite index WHERE clauses (filter)
;;;
(defgeneric process-index-definitions (object &key sql-dialect)
(:documentation "Rewrite all indexes filters in given catalog OBJECT."))
(defmethod process-index-definitions ((catalog catalog) &key sql-dialect)
  "Rewrite the index filters of every schema found in CATALOG."
  (dolist (schema (catalog-schema-list catalog))
    (process-index-definitions schema :sql-dialect sql-dialect)))
(defmethod process-index-definitions ((schema schema) &key sql-dialect)
  "Rewrite all index filters in SCHEMA."
  (loop :for table :in (schema-table-list schema)
     :do (process-index-definitions table :sql-dialect sql-dialect)))
(defmethod process-index-definitions ((table table) &key sql-dialect)
  "Rewrite all index filters in TABLE.

   Each partial-index WHERE clause is translated into PostgreSQL syntax via
   `translate-index-filter'. On translation error the filter is dropped
   (set to nil) so that the index still gets created, just without its
   WHERE clause."
  (loop :for index :in (table-index-list table)
     :when (pgsql-index-filter index)
     :do (let ((pg-filter
                (handler-case
                    (translate-index-filter table index sql-dialect)
                  (condition (c)
                    (log-message :error
                                 "Failed to translate index ~s on table ~s because of filter clause ~s"
                                 (pgsql-index-name index)
                                 (format-table-name table)
                                 (pgsql-index-filter index))
                    (log-message :debug "filter translation error: ~a" c)
                    ;; try to create the index without the WHERE clause...
                    (setf (pgsql-index-filter index) nil)))))
           ;; NOTE(review): the default translate-index-filter method
           ;; returns nil, which also clears the filter -- confirm intended
           (log-message :info "translate-index-filter: ~s" pg-filter)
           (setf (pgsql-index-filter index) pg-filter))))
;;;
;;; Parallel index building.
;;;
@ -403,7 +190,7 @@
:for index :in (table-index-list table)
:collect (multiple-value-bind (sql pkey)
;; we postpone the pkey upgrade of the index for later.
(format-pgsql-create-index table index)
(format-create-sql index)
(lp:submit-task channel
#'pgsql-connect-and-execute-with-timing
@ -430,9 +217,7 @@
:for table-name := (format-table-name table)
:for table-oid := (cdr (assoc table-name table-oids :test #'string=))
:unless table-oid :do (error "OID not found for ~s." table-name)
:do (setf (table-oid table) table-oid)
(loop :for index :in (table-index-list table)
:do (setf (pgsql-index-table-oid index) table-oid)))))
:do (setf (table-oid table) table-oid))))
;;;
;;; Drop indexes before loading
@ -441,7 +226,7 @@
"Drop indexes in PGSQL-INDEX-LIST. A PostgreSQL connection must already be
active when calling that function."
(loop :for index :in (table-index-list table)
:do (let ((sql (format-pgsql-drop-index table index)))
:do (let ((sql (format-drop-sql index)))
(pgsql-execute-with-timing section "drop indexes" sql))))
;;;
@ -451,14 +236,11 @@
"Drop the indexes for TABLE-NAME on TARGET PostgreSQL connection, and
returns a list of indexes to create again."
(with-pgsql-connection (target)
(let ((indexes (list-indexes table))
(let ((indexes (table-index-list table))
;; we get the list of indexes from PostgreSQL catalogs, so don't
;; question their spelling, just quote them.
(*identifier-case* :quote))
;; set the indexes list in the table structure
(setf (table-index-list table) indexes)
(cond ((and indexes (not drop-indexes))
(log-message :warning
"Target table ~s has ~d indexes defined against it."

View File

@ -111,7 +111,7 @@
pg-typemod)
:nullable (and (not set-not-null) (or (not source-not-null) drop-not-null))
:default (when (and source-default (not drop-default))
(format-default-value source-default using))
source-default)
:transform using)))
;; NO MATCH
@ -121,7 +121,7 @@
(make-column :name (apply-identifier-case source-column-name)
:type-name source-ctype
:nullable (not source-not-null)
:default (format-default-value source-default using)
:default source-default
:transform using))))
(defun apply-casting-rules (table-name column-name
@ -155,7 +155,7 @@
table-name column-name ctype default
(string= "NO" nullable)
(string/= "" extra) extra
(format-column coldef)
(format-create-sql coldef)
using)
(return coldef))))))

View File

@ -27,13 +27,19 @@
(with-pgsql-transaction (:pgconn (target-db copy))
;; we need to first drop the Foreign Key Constraints, so that we
;; can DROP TABLE when asked
(when (and foreign-keys include-drop)
(drop-pgsql-fkeys catalog))
;; (when (and foreign-keys include-drop)
;; (drop-pgsql-fkeys catalog))
(when create-schemas
(log-message :debug "Create schemas")
(create-schemas catalog :include-drop include-drop))
;; create new SQL types (ENUMs, SETs) if needed and before we get to
;; the table definitions that will use them
(log-message :debug "Create SQL types (enums, sets)")
(create-sqltypes catalog :include-drop include-drop)
;; now the tables
(log-message :debug "Create tables")
(create-tables catalog :include-drop include-drop)
@ -42,11 +48,11 @@
index name, to differentiate. Set the table oids now.
(when set-table-oids
(log-message :debug "Set table OIDs")
(set-table-oids catalog))
(pgloader.pgsql::set-table-oids catalog))
;; We might have to MATERIALIZE VIEWS
(when materialize-views
(log-message :debug "Create views for matview support")
(log-message :debug "Create tables for matview support")
(create-views catalog :include-drop include-drop)))))
(defmethod cleanup ((copy db-copy) (catalog catalog) &key materialize-views)
@ -92,6 +98,12 @@
(when (and foreign-keys (not data-only))
(create-pgsql-fkeys catalog))
;;
;; Triggers and stored procedures -- includes special default values
;;
(with-pgsql-transaction (:pgconn (target-db copy))
(create-triggers catalog))
;;
;; And now, comments on tables and columns.
;;

View File

@ -99,10 +99,20 @@
create-indexes reset-sequences materialize-views
set-table-oids including excluding))
;; this sets (table-index-list (target copy))
(maybe-drop-indexes (target-db copy)
(target copy)
:drop-indexes drop-indexes)
(let* ((pgsql-catalog
(fetch-pgsql-catalog (target-db copy) :table (target copy))))
(when (= 1 (count-tables pgsql-catalog))
;; we found the target table, grab its definition
(setf (target copy)
(first (schema-table-list
(first (catalog-schema-list pgsql-catalog))))))
;; this sets (table-index-list (target copy))
(maybe-drop-indexes (target-db copy)
(target copy)
:drop-indexes drop-indexes))
;; ensure we truncate only one
(when truncate

View File

@ -120,5 +120,24 @@
(unless (column-transform pgcol)
(setf (column-transform pgcol)
(lambda (val) (if val (format nil "~a" val) :null))))
;; normalize default values
;; see *pgsql-default-values*
(setf (column-default pgcol)
(cond ((null default) :null)
((and (stringp default) (string= "NULL" default)) :null)
((and (stringp default)
;; address CURRENT_TIMESTAMP(6) and other spellings
(or (uiop:string-prefix-p "CURRENT_TIMESTAMP" default)
(string= "CURRENT TIMESTAMP" default)))
:current-timestamp)
((and (stringp default)
(or (string= "newid()" default)
(string= "newsequentialid()" default)))
:generate-uuid)
(t default)))
pgcol)))

View File

@ -7,7 +7,7 @@
(in-package #:pgloader.mssql.index-filter)
(defmethod translate-index-filter ((table table)
(index pgsql-index)
(index index)
(sql-dialect (eql 'copy-mssql)))
"Transform given MS SQL index filter to PostgreSQL slang."
(labels ((process-expr (expression)
@ -27,8 +27,8 @@
(mapcar (column-transform col) (rest argument)))
(t
(funcall (column-transform col) argument))))))))
(when (pgsql-index-filter index)
(let* ((raw-expr (parse-index-filter-clause (pgsql-index-filter index)))
(when (index-filter index)
(let* ((raw-expr (parse-index-filter-clause (index-filter index)))
(pg-expr
(loop :for node :in raw-expr
:collect (typecase node

View File

@ -197,14 +197,16 @@ order by SchemaName,
:do
(let* ((schema (find-schema catalog schema-name))
(table (find-table schema table-name))
(pg-index (make-pgsql-index :name index-name
:primary (= pkey 1)
:unique (= unique 1)
:columns nil
:filter filter))
(pg-index (make-index :name index-name
:schema schema
:table table
:primary (= pkey 1)
:unique (= unique 1)
:columns nil
:filter filter))
(index (maybe-add-index table index-name pg-index
:key #'pgloader.pgsql::pgsql-index-name)))
(push-to-end col (pgloader.pgsql::pgsql-index-columns index)))
(push-to-end col (index-columns index)))
:finally (return catalog)))
(defun list-all-fkeys (catalog &key including excluding)
@ -259,15 +261,15 @@ ORDER BY KCU1.CONSTRAINT_NAME, KCU1.ORDINAL_POSITION"
(fschema (find-schema catalog fschema-name))
(ftable (find-table fschema ftable-name))
(pg-fkey
(make-pgsql-fkey :name fkey-name
:table table
:columns nil
:foreign-table ftable
:foreign-columns nil))
(make-fkey :name fkey-name
:table table
:columns nil
:foreign-table ftable
:foreign-columns nil))
(fkey (maybe-add-fkey table fkey-name pg-fkey
:key #'pgloader.pgsql::pgsql-fkey-name)))
(push-to-end col (pgloader.pgsql::pgsql-fkey-columns fkey))
(push-to-end fcol (pgloader.pgsql::pgsql-fkey-foreign-columns fkey)))
(push-to-end col (fkey-columns fkey))
(push-to-end fcol (fkey-foreign-columns fkey)))
:finally (return catalog)))

View File

@ -4,39 +4,9 @@
(in-package :pgloader.mysql)
;;;
;;; Some functions to deal with ENUM and SET types
;;;
(defun explode-mysql-enum (ctype)
  "Convert MySQL ENUM expression into a list of labels.

   CTYPE is the raw MySQL column type, e.g. \"enum('a','b')\" or
   \"set('x','y')\"."
  ;; capture the parenthesized label list, case-insensitively
  (cl-ppcre:register-groups-bind (list)
      ("(?i)(?:ENUM|SET)\\s*\\((.*)\\)" ctype)
    ;; reuse the CSV reader to split on commas while honouring the
    ;; single-quoted labels and backslash escapes; read-csv returns a
    ;; list of rows, we only have one
    (first (cl-csv:read-csv list :separator #\, :quote #\' :escape "\\"))))
(defun get-enum-type-name (table-name column-name)
  "Build the name of the PostgreSQL ENUM type backing TABLE-NAME.COLUMN-NAME."
  (let ((raw-name (format nil "~a_~a" table-name column-name)))
    (apply-identifier-case raw-name)))
(defun get-create-enum (table-name column-name ctype)
  "Build the PostgreSQL CREATE TYPE ... AS ENUM statement matching the MySQL
   enum column CTYPE of TABLE-NAME.COLUMN-NAME."
  (format nil "CREATE TYPE ~a AS ENUM (~{'~a'~^, ~});"
          (get-enum-type-name table-name column-name)
          (explode-mysql-enum ctype)))
(defun cast-enum (table-name column-name type ctype typemod)
"Cast MySQL inline ENUM type to using a PostgreSQL named type.
The type naming is hardcoded to be table-name_column-name"
(defun enum-or-set-name (table-name column-name type ctype typemod)
(declare (ignore type ctype typemod))
(get-enum-type-name table-name column-name))
(defun cast-set (table-name column-name type ctype typemod)
"Cast MySQL inline SET type to using a PostgreSQL ENUM Array.
The ENUM data type name is hardcoded to be table-name_column-name"
(declare (ignore type ctype typemod))
(format nil "\"~a_~a\"[]" table-name column-name))
(string-downcase (format nil "~a_~a" table-name column-name)))
;;;
;;; The default MySQL Type Casting Rules
@ -157,10 +127,10 @@
;; Inline MySQL "interesting" datatype
(:source (:type "enum")
:target (:type ,#'cast-enum))
:target (:type ,#'enum-or-set-name))
(:source (:type "set")
:target (:type ,#'cast-set)
:target (:type ,#'enum-or-set-name)
:using pgloader.transforms::set-to-enum-array)
;; geometric data types, just POINT for now
@ -178,72 +148,62 @@
(table-name name comment dtype ctype default nullable extra)))
table-name name dtype ctype default nullable extra comment)
(defmethod format-extra-type ((col mysql-column) &key include-drop)
  "Return a list of PostgreSQL statements creating the user defined data
   type needed by COL, when its MySQL type is ENUM or SET; nil otherwise.

   When INCLUDE-DROP, the list starts with a DROP TYPE IF EXISTS statement."
  (let ((dtype (mysql-column-dtype col)))
    (when (or (string-equal "enum" dtype)
              (string-equal "set" dtype))
      (list
       ;; NOTE(review): when INCLUDE-DROP is nil this (when ...) form
       ;; contributes a NIL element at the head of the list -- confirm
       ;; that callers filter out null statements before executing
       (when include-drop
         (let* ((type-name
                 (get-enum-type-name (mysql-column-table-name col)
                                     (mysql-column-name col))))
           (format nil "DROP TYPE IF EXISTS ~a CASCADE;" type-name)))
       (get-create-enum (mysql-column-table-name col)
                        (mysql-column-name col)
                        (mysql-column-ctype col))))))
(defmethod format-extra-triggers ((table table) (col mysql-column) &key drop)
  "Return a list of string representing the extra SQL commands needed to
   implement some MySQL features as PostgreSQL triggers, such as on update
   CURRENT_TIMESTAMP.

   When drop is t, only output the DROP sql statements."
  (when (string= (mysql-column-extra col) "on update CURRENT_TIMESTAMP")
    ;; find the target PostgreSQL column name by locating this source
    ;; field's position: field-list and column-list are parallel lists
    (let* ((field-pos (position (mysql-column-name col)
                                (table-field-list table)
                                :key #'mysql-column-name
                                :test #'string=))
           (col-name (funcall #'column-name
                              (nth field-pos (table-column-list table))))
           (fun-name (format nil "on_update_current_timestamp_~a" col-name))
           (update-fun-sql
            (format nil "
CREATE OR REPLACE FUNCTION ~a()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
NEW.~a = now();
RETURN NEW;
END;
$$;"
                    fun-name col-name))
           (trigger-sql
            (format nil "
CREATE TRIGGER on_update_current_timestamp
BEFORE UPDATE ON ~a
FOR EACH ROW EXECUTE PROCEDURE ~a();"
                    (format-table-name table) fun-name)))
      (if drop
          (list
           ;; don't DROP the function here, because it might be shared by
           ;; several tables with "on update" rules on a field sharing the
           ;; same name (such as "last_update").
           ;; the CREATE OR REPLACE FUNCTION will then be inoffensive.
           ;;
           ;; NOTE(review): the DROP uses the raw MySQL table name while
           ;; the CREATE uses format-table-name -- confirm both resolve to
           ;; the same identifier
           (format nil "DROP TRIGGER IF EXISTS on_update_current_timestamp ON ~a;"
                   (mysql-column-table-name col)))
          (list update-fun-sql trigger-sql)))))
(defun explode-mysql-enum (ctype)
"Convert MySQL ENUM expression into a list of labels."
(cl-ppcre:register-groups-bind (list)
("(?i)(?:ENUM|SET)\\s*\\((.*)\\)" ctype)
(first (cl-csv:read-csv list :separator #\, :quote #\' :escape "\\"))))
(defmethod cast ((col mysql-column))
"Return the PostgreSQL type definition from given MySQL column definition."
(with-slots (table-name name dtype ctype default nullable extra comment)
col
(let ((pgcol
(apply-casting-rules table-name name dtype ctype default nullable extra)))
(let* ((pgcol
(apply-casting-rules table-name name dtype ctype default nullable extra)))
(setf (column-comment pgcol) comment)
;; normalize default values
(setf (column-default pgcol)
(cond ((and (stringp default) (string= "NULL" default)) :null)
((and (stringp default)
;; address CURRENT_TIMESTAMP(6) and other spellings
(or (uiop:string-prefix-p "CURRENT_TIMESTAMP" default)
(string= "CURRENT TIMESTAMP" default)))
:current-timestamp)
(t default)))
;; extra user-defined data types
(when (or (string-equal "set" dtype)
(string-equal "enum" dtype))
;;
;; SET and ENUM both need more care, if the target is PostgreSQL we
;; need to create per-schema user defined data types that match the
;; column local definition here.
;;
(let ((sqltype-name (enum-or-set-name table-name
(column-name pgcol)
dtype
ctype
nil)))
(setf (column-type-name pgcol)
(make-sqltype :name sqltype-name
:type (intern (string-upcase dtype)
(find-package "KEYWORD"))
:source-def ctype
:extra (explode-mysql-enum ctype)))))
;; extra triggers
;;
;; See the generic function `post-process-catalog' for the next step.
;;
(when (string= extra "on update CURRENT_TIMESTAMP")
(let* ((pro-name (format nil
"on_update_current_timestamp_~a"
(column-name pgcol))))
(setf (column-extra pgcol)
(make-trigger :name :on-update-current-timestamp
:action "BEFORE UPDATE"
:procedure-name pro-name))))
pgcol)))

View File

@ -221,24 +221,26 @@ order by table_name, ordinal_position"
~:[~*~;and (~{table_name ~a~^ or ~})~]
~:[~*~;and (~{table_name ~a~^ and ~})~]
GROUP BY table_name, index_name;"
(db-name *connection*)
only-tables ; do we print the clause?
only-tables
including ; do we print the clause?
(filter-list-to-where-clause including)
excluding ; do we print the clause?
(filter-list-to-where-clause excluding t)))
:do (let ((table (find-table schema table-name))
(index
(make-pgsql-index :name name ; further processing is needed
:primary (string= name "PRIMARY")
:unique (string= "0" non-unique)
:columns (mapcar
#'apply-identifier-case
(sq:split-sequence #\, cols)))))
(db-name *connection*)
only-tables ; do we print the clause?
only-tables
including ; do we print the clause?
(filter-list-to-where-clause including)
excluding ; do we print the clause?
(filter-list-to-where-clause excluding t)))
:do (let* ((table (find-table schema table-name))
(index
(make-index :name name ; further processing is needed
:schema schema
:table table
:primary (string= name "PRIMARY")
:unique (string= "0" non-unique)
:columns (mapcar
#'apply-identifier-case
(sq:split-sequence #\, cols)))))
(add-index table index))
:finally
(return schema)))
(return schema)))
;;;
;;; MySQL Foreign Keys
@ -296,16 +298,16 @@ FROM
:do (let* ((table (find-table schema table-name))
(ftable (find-table schema ftable-name))
(fk
(make-pgsql-fkey :name (apply-identifier-case name)
:table table
:columns (mapcar #'apply-identifier-case
(sq:split-sequence #\, cols))
:foreign-table ftable
:foreign-columns (mapcar
#'apply-identifier-case
(sq:split-sequence #\, fcols))
:update-rule update-rule
:delete-rule delete-rule)))
(make-fkey :name (apply-identifier-case name)
:table table
:columns (mapcar #'apply-identifier-case
(sq:split-sequence #\, cols))
:foreign-table ftable
:foreign-columns (mapcar
#'apply-identifier-case
(sq:split-sequence #\, fcols))
:update-rule update-rule
:delete-rule delete-rule)))
(if (and name table ftable)
(add-fkey table fk)
(log-message :error

View File

@ -91,5 +91,14 @@
(setf (column-transform pgcol)
(lambda (val) (if val (format nil "~a" val) :null))))
(setf (column-default pgcol)
(cond ((and (stringp default) (string= "NULL" default)) :null)
((and (stringp default)
;; address CURRENT_TIMESTAMP(6) and other spellings
(or (uiop:string-prefix-p "CURRENT_TIMESTAMP" default)
(string= "CURRENT TIMESTAMP" default)))
:current-timestamp)
(t default)))
pgcol)))

View File

@ -120,10 +120,11 @@
(loop
:for (seq index-name unique origin partial) :in (sqlite:execute-to-list db sql)
:do (let* ((cols (list-index-cols index-name db))
(index (make-pgsql-index :name index-name
:primary (is-index-pk table cols)
:unique (= unique 1)
:columns cols)))
(index (make-index :name index-name
:table table
:primary (is-index-pk table cols)
:unique (= unique 1)
:columns cols)))
(add-index table index)))))
(defun list-all-indexes (schema &key (db *sqlite-db*))
@ -147,17 +148,17 @@
:do (let* ((ftable (find-table (table-schema table) ftable-name))
(fkey (or (gethash id fkey-table)
(let ((pg-fkey
(make-pgsql-fkey :table table
:columns nil
:foreign-table ftable
:foreign-columns nil
:update-rule on-update
:delete-rule on-delete)))
(make-fkey :table table
:columns nil
:foreign-table ftable
:foreign-columns nil
:update-rule on-update
:delete-rule on-delete)))
(setf (gethash id fkey-table) pg-fkey)
(add-fkey table pg-fkey)
pg-fkey))))
(push-to-end from (pgsql-fkey-columns fkey))
(push-to-end to (pgsql-fkey-foreign-columns fkey))))))
(push-to-end from (fkey-columns fkey))
(push-to-end to (fkey-foreign-columns fkey))))))
(defun list-all-fkeys (schema &key (db *sqlite-db*))
"Get the list of SQLite foreign keys definitions per table."

View File

@ -2,7 +2,7 @@
;;; ALTER TABLE allows pgloader to apply transformations on the catalog
;;; retrieved before applying it to PostgreSQL: SET SCHEMA, RENAME, etc.
;;;
(in-package :pgloader.schema)
(in-package :pgloader.catalog)
#|
See src/parsers/command-alter-table.lisp

View File

@ -8,8 +8,11 @@
;;; Utility function using those definitions are found in schema.lisp in the
;;; same directory.
;;;
(in-package :pgloader.schema)
(in-package :pgloader.catalog)
;;;
;;; A macro to ease writing the catalog handling API
;;;
(defmacro push-to-end (item place)
`(progn
(setf ,place (nconc ,place (list ,item)))
@ -17,10 +20,20 @@
,item))
;;;
;;; TODO: stop using anonymous data structures for database catalogs,
;;; currently list of alists of lists... the madness has found its way in
;;; lots of places tho.
;;; One interesting thing to do to all those catalog objects is to be able
;;; to print the DDL commands out: CREATE and DROP SQL statements.
;;;
;; DDL output protocol: each catalog object type (table, column, index,
;; fkey, sqltype, trigger, procedure, ...) is expected to specialize these
;; generic functions.
(defgeneric format-create-sql (object &key stream if-not-exists)
  (:documentation "Generate proper SQL command to create OBJECT in
    PostgreSQL. The output is written to STREAM."))

(defgeneric format-drop-sql (object &key stream cascade)
  (:documentation "Generate proper SQL command to drop OBJECT in PostgreSQL.
    The output is written to STREAM."))

(defgeneric format-default-value (column &key stream)
  (:documentation "Generate proper value to be used as a default value for
    given COLUMN in PostgreSQL. The output is written to STREAM."))
;;;
;;; A database catalog is a list of schema each containing a list of tables,
@ -34,19 +47,38 @@
(defstruct table source-name name schema oid comment
;; field is for SOURCE
;; column is for TARGET
field-list column-list index-list fkey-list)
field-list column-list index-list fkey-list trigger-list)
;;;
;;; When migrating from another database to PostgreSQL some data types might
;;; need to be tranformed dynamically into User Defined Types: ENUMs, SET,
;;; etc.
;;;
(defstruct sqltype name type source-def extra)
;;;
;;; The generic PostgreSQL column that the CAST generic function is asked to
;;; produce, so that we know how to CREATE TABLEs in PostgreSQL whatever the
;;; source is.
;;;
(defstruct column name type-name type-mod nullable default comment transform)
(defstruct column name type-name type-mod nullable default comment transform extra)
;;; those are currently defined in ./schema.lisp
;; (defstruct index name primary unique columns sql conname condef)
;; (defstruct fkey
;; name columns foreign-table foreign-columns update-rule delete-rule)
;;;
;;; Index and Foreign Keys
;;;
;; A Foreign Key constraint: TABLE and FOREIGN-TABLE are table instances;
;; COLUMNS and FOREIGN-COLUMNS are lists of column names; CONDEF may hold a
;; verbatim constraint definition; the remaining slots carry the ON
;; UPDATE/ON DELETE/MATCH rules and deferrability flags.
(defstruct fkey
  name table columns foreign-table foreign-columns condef
  update-rule delete-rule match-rule deferrable initially-deferred)

;; An index: PRIMARY and UNIQUE are booleans; COLUMNS is a list of column
;; names; SQL may hold a complete statement to use as-is; CONNAME/CONDEF
;; describe a constraint-backed index; FILTER is a partial-index WHERE
;; clause.
(defstruct index
  name schema table primary unique columns sql conname condef filter)
;;;
;;; Triggers and trigger procedures, no args support (yet?)
;;;
;; A trigger and the stored procedure it executes; PROCEDURE holds a
;; procedure struct instance (see trigger-procedure call sites), ACTION is
;; the firing clause such as "BEFORE UPDATE".
(defstruct trigger name table action procedure-name procedure)
;; A stored procedure; BODY is its source text, LANGUAGE e.g. plpgsql.
(defstruct procedure name returns language body)
;;;
;;; Main data collection API
@ -164,12 +196,13 @@
(apply-identifier-case schema-name)))))
(push-to-end schema (catalog-schema-list catalog))))
(defmethod add-table ((schema schema) table-name &key comment)
(defmethod add-table ((schema schema) table-name &key comment oid)
  "Add TABLE-NAME to SCHEMA and return the new table instance."
  (let ((table
         (make-table :source-name table-name
                     :name (apply-identifier-case table-name)
                     :schema schema
                     :oid oid
                     ;; normalize an empty comment string to no comment
                     :comment (unless (or (null comment) (string= "" comment))
                                comment))))
    ;; push-to-end returns the item it appended, here the new table
    (push-to-end table (schema-table-list schema))))
@ -205,11 +238,11 @@
(let ((schema (find-schema catalog schema-name)))
(or schema (add-schema catalog schema-name))))
(defmethod maybe-add-table ((schema schema) table-name &key comment)
(defmethod maybe-add-table ((schema schema) table-name &key comment oid)
"Add TABLE-NAME to the table-list for SCHEMA, or return the existing table
of the same name if it already exists in the schema table-list."
(let ((table (find-table schema table-name)))
(or table (add-table schema table-name :comment comment))))
(or table (add-table schema table-name :oid oid :comment comment))))
(defmethod maybe-add-view ((schema schema) view-name &key comment)
"Add TABLE-NAME to the table-list for SCHEMA, or return the existing table
@ -360,3 +393,4 @@
(pgloader.pgsql:pgsql-execute sql)))
(table-name ,table-name))))
,@body)))