diff --git a/pgloader.asd b/pgloader.asd index ea56386..62f8b14 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -64,23 +64,23 @@ ;; PostgreSQL related utils (:file "read-sql-files") (:file "quoting") - (:file "schema-structs" :depends-on ("quoting")) - (:file "alter-table" :depends-on ("schema-structs")) + (:file "catalog" :depends-on ("quoting")) + (:file "alter-table" :depends-on ("catalog")) ;; State, monitoring, reporting (:file "reject" :depends-on ("state")) (:file "report" :depends-on ("state" "utils" - "schema-structs")) + "catalog")) (:file "monitor" :depends-on ("logs" "state" "reject" "report")) (:file "archive" :depends-on ("logs")) - (:file "pg-format-column" :depends-on ("schema-structs" - "monitor" - "state")) + ;; (:file "pg-format-column" :depends-on ("catalog" + ;; "monitor" + ;; "state")) ;; generic connection api (:file "connection" :depends-on ("archive")))) @@ -88,12 +88,20 @@ ;; package pgloader.pgsql (:module pgsql :depends-on ("package" "params" "utils") + :serial t :components ((:file "copy-format") + (:file "connection") + (:file "pgsql-ddl") + (:file "pgsql-schema") + (:file "pgsql-trigger") + (:file "pgsql-index-filter") (:file "queries") - (:file "schema") - (:file "pgsql" + (:file "schema" :depends-on ("pgsql-trigger")) + (:file "retry-batch") + (:file "copy-from-queue" :depends-on ("copy-format" + "retry-batch" "queries" "schema")))) diff --git a/src/package.lisp b/src/package.lisp index feddb52..672b6c2 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -2,6 +2,13 @@ ;;; ;;; To avoid circular files dependencies, define all the packages here ;;; +(eval-when (:compile-toplevel :load-toplevel :execute) + (defun cl-user::export-inherited-symbols (source target) + (let ((pkg-source (find-package (string-upcase source))) + (pkg-target (find-package (string-upcase target)))) + (do-external-symbols (s pkg-source) + (export s pkg-target))))) + (defpackage #:pgloader.transforms (:use #:cl) (:export #:precision @@ -27,15 +34,21 @@ (:use #:cl #:pgloader.params) (:export #:apply-identifier-case)) -(defpackage #:pgloader.schema +(defpackage #:pgloader.catalog (:use #:cl #:pgloader.params #:pgloader.quoting) - (:export #:catalog + (:export #:format-create-sql + #:format-drop-sql + #:format-default-value + + #:catalog #:schema #:table + #:sqltype #:column #:index - #:index #:fkey + #:trigger + #:procedure #:cast ; generic function for sources @@ -46,17 +59,22 @@ #:make-table #:create-table #:make-view + #:make-sqltype #:make-column #:make-index - #:make-index #:make-fkey + #:make-trigger + #:make-procedure #:catalog-name #:catalog-schema-list + #:schema-name + #:schema-catalog #:schema-source-name #:schema-table-list #:schema-view-list + #:table-name #:table-source-name #:table-schema @@ -66,6 +84,13 @@ #:table-column-list #:table-index-list #:table-fkey-list + #:table-trigger-list + + #:sqltype-name + #:sqltype-type + #:sqltype-source-def + #:sqltype-extra + #:column-name #:column-type-name #:column-type-mod @@ -73,6 +98,41 @@ #:column-default #:column-comment #:column-transform + #:column-extra + + #:index-name + #:index-schema + #:index-table + #:index-primary + #:index-unique + #:index-columns + #:index-sql + #:index-conname + #:index-condef + #:index-filter + + #:fkey-name + #:fkey-foreign-table + #:fkey-foreign-columns + #:fkey-table + #:fkey-columns + #:fkey-condef + #:fkey-update-rule + #:fkey-delete-rule + #:fkey-match-rule + #:fkey-deferrable + #:fkey-initially-deferred + + #:trigger-name + #:trigger-table + #:trigger-action + #:trigger-procedure-name + #:trigger-procedure + + #:procedure-name + #:procedure-returns + #:procedure-language + #:procedure-body #:table-list #:view-list @@ -107,7 +167,7 @@ #:format-table-name)) (defpackage #:pgloader.state - (:use #:cl #:pgloader.params #:pgloader.schema) + (:use #:cl #:pgloader.params #:pgloader.catalog) (:export #:make-pgstate #:pgstate-tabnames #:pgstate-tables @@ -153,32 +213,15 @@ (defpackage #:pgloader.utils (:use #:cl - #:pgloader.params #:pgloader.schema - #:pgloader.monitor #:pgloader.state) + #:pgloader.params #:pgloader.catalog #:pgloader.monitor #:pgloader.state) (:import-from #:alexandria #:appendf #:read-file-into-string) - (:export #:with-monitor ; monitor - #:*monitoring-queue* - #:with-stats-collection - #:elapsed-time-since - #:timing - - ;; bits from alexandria + (:export ;; bits from alexandria #:appendf #:read-file-into-string - ;; state - #:make-pgstate - #:pgstate-tabnames - - ;; events - #:log-message - #:new-label - #:update-stats - #:process-bad-row - ;; utils #:format-interval #:camelCase-to-colname @@ -194,85 +237,11 @@ #:make-external-format ;; quoting - #:apply-identifier-case + #:apply-identifier-case)) - ;; schema - #:catalog - #:schema - #:table - #:column - #:index - #:index - #:fkey - - #:cast ; generic function for sources - - #:make-catalog - #:make-schema - #:make-table - #:create-table - #:make-view - #:make-column - #:make-index - #:make-index - #:make-fkey - - #:catalog-name - #:catalog-schema-list - #:schema-name - #:schema-source-name - #:schema-table-list - #:schema-view-list - #:table-name - #:table-source-name - #:table-schema - #:table-oid - #:table-comment - #:table-field-list - #:table-column-list - #:table-index-list - #:table-fkey-list - #:column-name - #:column-type-name - #:column-type-mod - #:column-type-nullable - #:column-default - #:column-comment - #:column-transform - - #:table-list - #:view-list - #:add-schema - #:find-schema - #:maybe-add-schema - #:add-table - #:find-table - #:maybe-add-table - #:add-view - #:find-view - #:maybe-add-view - #:add-field - #:add-column - #:add-index - #:find-index - #:maybe-add-index - #:add-fkey - #:find-fkey - #:maybe-add-fkey - #:count-tables - #:count-views - #:count-indexes - #:count-fkeys - #:max-indexes-per-table - - #:push-to-end - #:with-schema - #:alter-table - #:alter-schema - - #:format-table-name - #:format-default-value - #:format-column)) +(cl-user::export-inherited-symbols "pgloader.catalog" "pgloader.utils") +(cl-user::export-inherited-symbols "pgloader.monitor" "pgloader.utils") +(cl-user::export-inherited-symbols "pgloader.state" "pgloader.utils") (defpackage #:pgloader.batch (:use #:cl #:pgloader.params #:pgloader.monitor) @@ -346,7 +315,7 @@ (defpackage #:pgloader.pgsql (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection - #:pgloader.schema) + #:pgloader.catalog) (:export #:pgsql-connection #:pgconn-use-ssl #:pgconn-table-name @@ -356,40 +325,36 @@ #:pgsql-execute-with-timing #:pgsql-connect-and-execute-with-timing - ;; PostgreSQL schema facilities + ;; postgresql schema facilities #:truncate-tables #:copy-from-file #:copy-from-queue #:reset-all-sequences + + #:create-sqltypes #:create-schemas #:create-tables #:create-views - #:format-pgsql-column - #:format-extra-type - #:format-extra-triggers - #:make-pgsql-fkey - #:pgsql-fkey-columns - #:pgsql-fkey-foreign-columns - #:format-pgsql-create-fkey - #:format-pgsql-drop-fkey #:drop-pgsql-fkeys #:create-pgsql-fkeys - #:pgsql-index - #:pgsql-index-filter - #:make-pgsql-index - #:index-table-name + #:create-triggers + #:translate-index-filter #:process-index-definitions - #:format-pgsql-create-index + + #:fetch-pgsql-catalog #:create-indexes-in-kernel - #:set-table-oids #:drop-indexes #:maybe-drop-indexes #:create-indexes-again #:reset-sequences #:comment-on-tables-and-columns - ;; PostgreSQL introspection queries + ;; index filter rewriting support + #:translate-index-filter + #:process-index-definitions + + ;; postgresql introspection queries #:list-databases #:list-tables #:list-columns-query @@ -399,13 +364,13 @@ #:list-tables-and-fkeys #:list-table-oids - ;; PostgreSQL Identifiers + ;; postgresql identifiers #:list-reserved-keywords - ;; PostgreSQL user provided GUCs + ;; postgresql user provided gucs #:sanitize-user-gucs - ;; PostgreSQL data format + ;; postgresql data format #:get-date-columns #:format-vector-row)) @@ -425,7 +390,7 @@ #:md-copy #:db-copy - ;; Accessors + ;; accessors #:source-db #:target-db #:source @@ -437,7 +402,7 @@ #:skip-lines #:header - ;; Main protocol/API + ;; main protocol/api #:map-rows #:copy-column-list #:queue-raw-data @@ -446,7 +411,7 @@ #:copy-to #:copy-database - ;; md-copy protocol/API + ;; md-copy protocol/api #:parse-header #:process-rows @@ -465,7 +430,7 @@ #:complete-pgsql-database #:end-kernels - ;; file based utils for CSV, fixed etc + ;; file based utils for csv, fixed etc #:with-open-file-or-stream #:get-pathname #:project-fields @@ -479,7 +444,7 @@ ;;; -;;; Other utilities +;;; other utilities ;;; (defpackage #:pgloader.ini (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.connection) @@ -500,7 +465,7 @@ ;; -;; Specific source handling +;; specific source handling ;; (defpackage #:pgloader.csv (:use #:cl @@ -558,7 +523,6 @@ #:pgsql-execute #:pgsql-execute-with-timing #:create-tables - #:format-pgsql-column #:format-vector-row) (:export #:ixf-connection #:copy-ixf @@ -574,7 +538,6 @@ #:pgsql-execute #:pgsql-execute-with-timing #:create-tables - #:format-pgsql-column #:format-vector-row) (:export #:dbf-connection #:copy-db3 @@ -597,18 +560,9 @@ #:create-tables #:create-views #:truncate-tables - #:format-pgsql-column - #:format-extra-type - #:format-extra-triggers - #:make-pgsql-fkey - #:format-pgsql-create-fkey - #:format-pgsql-drop-fkey #:drop-pgsql-fkeys #:create-pgsql-fkeys - #:make-pgsql-index - #:format-pgsql-create-index #:create-indexes-in-kernel - #:set-table-oids #:format-vector-row #:reset-sequences #:comment-on-tables-and-columns) @@ -637,15 +591,7 @@ #:pgsql-execute-with-timing #:create-tables #:truncate-tables - #:format-pgsql-column - #:make-pgsql-index - #:index-table-name - #:format-pgsql-create-index #:create-indexes-in-kernel - #:make-pgsql-fkey - #:pgsql-fkey-columns - #:pgsql-fkey-foreign-columns - #:set-table-oids #:reset-sequences #:comment-on-tables-and-columns) (:export #:sqlite-connection @@ -672,18 +618,9 @@ #:list-table-oids #:create-tables #:create-views - #:truncate-tables - #:format-pgsql-column - #:format-extra-type - #:make-pgsql-fkey - #:format-pgsql-create-fkey - #:format-pgsql-drop-fkey #:drop-pgsql-fkeys #:create-pgsql-fkeys - #:make-pgsql-index - #:format-pgsql-create-index #:create-indexes-in-kernel - #:set-table-oids #:format-vector-row #:reset-sequences) (:export #:mssql-connection @@ -698,8 +635,6 @@ (defpackage #:pgloader.mssql.index-filter (:use #:cl #:esrap #:pgloader.utils #:pgloader.mssql) (:import-from #:pgloader.pgsql - #:pgsql-index - #:pgsql-index-filter #:translate-index-filter)) (defpackage #:pgloader.syslog @@ -713,7 +648,7 @@ ;;; -;;; The Command Parser +;;; the command parser ;;; (defpackage #:pgloader.parser (:use #:cl #:esrap #:metabang.bind @@ -755,7 +690,7 @@ (:export #:parse-commands #:parse-commands-from-file - ;; tools to enable complete CLI parsing in main.lisp + ;; tools to enable complete cli parsing in main.lisp #:process-relative-pathnames #:parse-source-string #:parse-source-string-for-type @@ -793,7 +728,7 @@ ;; -;; Main package +;; main package ;; (defpackage #:pgloader (:use #:cl diff --git a/src/parsers/command-alter-table.lisp b/src/parsers/command-alter-table.lisp index 9cea3b0..8da97a7 100644 --- a/src/parsers/command-alter-table.lisp +++ b/src/parsers/command-alter-table.lisp @@ -34,12 +34,12 @@ (defrule rename-to (and kw-rename kw-to quoted-namestring) (:lambda (stmt) (bind (((_ _ new-name) stmt)) - (list #'pgloader.schema::alter-table-rename new-name)))) + (list #'pgloader.catalog::alter-table-rename new-name)))) (defrule set-schema (and kw-set kw-schema quoted-namestring) (:lambda (stmt) (bind (((_ _ schema) stmt)) - (list #'pgloader.schema::alter-table-set-schema schema)))) + (list #'pgloader.catalog::alter-table-set-schema schema)))) (defrule alter-table-action (or rename-to set-schema)) @@ -51,7 +51,7 @@ (destructuring-bind (match-rule-target-list schema action) alter-table-command (loop :for match-rule-target :in match-rule-target-list - :collect (pgloader.schema::make-match-rule + :collect (pgloader.catalog::make-match-rule :type (first match-rule-target) :target (second match-rule-target) :schema schema @@ -73,10 +73,10 @@ kw-rename kw-to quoted-namestring) (:lambda (alter-schema-command) (bind (((_ _ current-name _ _ new-name) alter-schema-command)) - (pgloader.schema::make-match-rule + (pgloader.catalog::make-match-rule :type :string :target current-name - :action #'pgloader.schema::alter-schema-rename + :action #'pgloader.catalog::alter-schema-rename :args (list new-name))))) ;;; currently we only support a single ALTER SCHEMA variant diff --git a/src/pgsql/connection.lisp b/src/pgsql/connection.lisp new file mode 100644 index 0000000..d5c7acc --- /dev/null +++ b/src/pgsql/connection.lisp @@ -0,0 +1,379 @@ +;;; +;;; Tools to handle PostgreSQL queries +;;; +(in-package :pgloader.pgsql) + +;;; +;;; PostgreSQL Tools connecting to a database +;;; +(defclass pgsql-connection (db-connection) + ((use-ssl :initarg :use-ssl :accessor pgconn-use-ssl) + (table-name :initarg :table-name :accessor pgconn-table-name)) + (:documentation "PostgreSQL connection for pgloader")) + +(defmethod initialize-instance :after ((pgconn pgsql-connection) &key) + "Assign the type slot to pgsql." + (setf (slot-value pgconn 'type) "pgsql")) + +(defmethod clone-connection ((c pgsql-connection)) + (let ((clone + (change-class (call-next-method c) 'pgsql-connection))) + (setf (pgconn-use-ssl clone) (pgconn-use-ssl c) + (pgconn-table-name clone) (pgconn-table-name c)) + clone)) + +(defmethod ssl-enable-p ((pgconn pgsql-connection)) + "Return non-nil when the connection uses SSL" + (member (pgconn-use-ssl pgconn) '(:try :yes))) + +(defun new-pgsql-connection (pgconn) + "Prepare a new connection object with all the same properties as pgconn, + so as to avoid stepping on it's handle" + (make-instance 'pgsql-connection + :user (db-user pgconn) + :pass (db-pass pgconn) + :host (db-host pgconn) + :port (db-port pgconn) + :name (db-name pgconn) + :use-ssl (pgconn-use-ssl pgconn) + :table-name (pgconn-table-name pgconn))) + +;;; +;;; Implement SSL Client Side certificates +;;; http://www.postgresql.org/docs/current/static/libpq-ssl.html#LIBPQ-SSL-FILE-USAGE +;;; +(defvar *pgsql-client-certificate* "~/.postgresql/postgresql.crt" + "File where to read the PostgreSQL Client Side SSL Certificate.") + +(defvar *pgsql-client-key* "~/.postgresql/postgresql.key" + "File where to read the PostgreSQL Client Side SSL Private Key.") + +;;; +;;; We need to distinguish some special cases of PostgreSQL errors within +;;; Class 53 — Insufficient Resources: in case of "too many connections" we +;;; typically want to leave room for another worker to finish and free one +;;; connection, then try again. +;;; +;;; http://www.postgresql.org/docs/9.4/interactive/errcodes-appendix.html +;;; +;;; The "leave room to finish and try again" heuristic is currently quite +;;; simplistic, but at least it work in my test cases. +;;; +(cl-postgres-error::deferror "53300" + too-many-connections cl-postgres-error:insufficient-resources) +(cl-postgres-error::deferror "53400" + configuration-limit-exceeded cl-postgres-error:insufficient-resources) + +(defvar *retry-connect-times* 5 + "How many times to we try to connect again.") + +(defvar *retry-connect-delay* 0.5 + "How many seconds to wait before trying to connect again.") + +(defmethod open-connection ((pgconn pgsql-connection) &key username) + "Open a PostgreSQL connection." + (let* (#+unix + (cl-postgres::*unix-socket-dir* (get-unix-socket-dir pgconn)) + (crt-file (expand-user-homedir-pathname *pgsql-client-certificate*)) + (key-file (expand-user-homedir-pathname *pgsql-client-key*)) + (pomo::*ssl-certificate-file* (when (and (ssl-enable-p pgconn) + (probe-file crt-file)) + (uiop:native-namestring crt-file))) + (pomo::*ssl-key-file* (when (and (ssl-enable-p pgconn) + (probe-file key-file)) + (uiop:native-namestring key-file)))) + (flet ((connect (pgconn username) + (handler-case + ;; in some cases (client_min_messages set to debug5 + ;; for example), PostgreSQL might send us some + ;; WARNINGs already when opening a new connection + (handler-bind ((cl-postgres:postgresql-warning + #'(lambda (w) + (log-message :warning "~a" w) + (muffle-warning)))) + (pomo:connect (db-name pgconn) + (or username (db-user pgconn)) + (db-pass pgconn) + (let ((host (db-host pgconn))) + (if (and (consp host) (eq :unix (car host))) + :unix + host)) + :port (db-port pgconn) + :use-ssl (or (pgconn-use-ssl pgconn) :no))) + ((or too-many-connections configuration-limit-exceeded) (e) + (log-message :error + "Failed to connect to ~a: ~a; will try again in ~fs" + pgconn e *retry-connect-delay*) + (sleep *retry-connect-delay*))))) + (loop :while (null (conn-handle pgconn)) + :repeat *retry-connect-times* + :do (setf (conn-handle pgconn) (connect pgconn username)))) + + (unless (conn-handle pgconn) + (error "Failed ~d times to connect to ~a" *retry-connect-times* pgconn)) + + (log-message :debug "CONNECTED TO ~s" pgconn) + (set-session-gucs *pg-settings* :database (conn-handle pgconn)) + + pgconn)) + +(defmethod close-connection ((pgconn pgsql-connection)) + "Close a PostgreSQL connection." + (assert (not (null (conn-handle pgconn)))) + (pomo:disconnect (conn-handle pgconn)) + (setf (conn-handle pgconn) nil) + pgconn) + +(defmethod query ((pgconn pgsql-connection) sql &key) + (let ((pomo:*database* (conn-handle pgconn))) + (log-message :debug "~a" sql) + (pomo:query sql))) + +(defmacro handling-pgsql-notices (&body forms) + "The BODY is run within a PostgreSQL transaction where *pg-settings* have + been applied. PostgreSQL warnings and errors are logged at the + appropriate log level." + `(handler-bind + ((cl-postgres:database-error + #'(lambda (e) + (log-message :error "~a" e))) + (cl-postgres:postgresql-warning + #'(lambda (w) + (log-message :warning "~a" w) + (muffle-warning)))) + (progn ,@forms))) + +(defmacro with-pgsql-transaction ((&key pgconn database) &body forms) + "Run FORMS within a PostgreSQL transaction to DBNAME, reusing DATABASE if + given." + (if database + `(let ((pomo:*database* ,database)) + (handling-pgsql-notices + (pomo:with-transaction () + (log-message :debug "BEGIN") + ,@forms))) + ;; no database given, create a new database connection + `(with-pgsql-connection (,pgconn) + (pomo:with-transaction () + (log-message :debug "BEGIN") + ,@forms)))) + +(defmacro with-pgsql-connection ((pgconn) &body forms) + "Run FROMS within a PostgreSQL connection to DBNAME. To get the connection + spec from the DBNAME, use `get-connection-spec'." + (let ((conn (gensym "pgsql-conn"))) + `(with-connection (,conn ,pgconn) + (let ((pomo:*database* (conn-handle ,conn))) + (handling-pgsql-notices + ,@forms))))) + +(defun get-unix-socket-dir (pgconn) + "When *pgconn* host is a (cons :unix path) value, return the right value + for cl-postgres::*unix-socket-dir*." + (let ((host (db-host pgconn))) + (if (and (consp host) (eq :unix (car host))) + ;; set to *pgconn* host value + (directory-namestring (fad:pathname-as-directory (cdr host))) + ;; keep as is. + cl-postgres::*unix-socket-dir*))) + +(defun set-session-gucs (alist &key transaction database) + "Set given GUCs to given values for the current session." + (let ((pomo:*database* (or database pomo:*database*))) + (loop + :for (name . value) :in alist + :for set := (cond + ((string-equal "search_path" name) + ;; for search_path, don't quote the value + (format nil "SET~:[~; LOCAL~] ~a TO ~a" + transaction name value)) + (t + ;; general case: quote the value + (format nil "SET~:[~; LOCAL~] ~a TO '~a'" + transaction name value))) + :do (progn ; indent helper + (log-message :debug set) + (pomo:execute set))))) + + + +;;; +;;; The parser is still hard-coded to support only PostgreSQL targets +;;; +(defun sanitize-user-gucs (gucs) + "Forbid certain actions such as setting a client_encoding different from utf8." + (let ((gucs + (append + (list (cons "client_encoding" "utf8")) + (loop :for (name . value) :in gucs + :when (and (string-equal name "client_encoding") + (not (member value '("utf-8" "utf8") :test #'string-equal))) + :do (log-message :warning + "pgloader always talk to PostgreSQL in utf-8, client_encoding has been forced to 'utf8'.") + :else + :collect (cons name value))))) + ;; + ;; Now see about the application_name, provide "pgloader" if it's not + ;; been overloaded already. + ;; + (cond ((not (assoc "application_name" gucs :test #'string-equal)) + (append gucs (list (cons "application_name" "pgloader")))) + + (t + gucs)))) + + +;;; +;;; DDL support with stats (timing, object count) +;;; + +(defun pgsql-connect-and-execute-with-timing (pgconn section label sql &key (count 1)) + "Run pgsql-execute-with-timing within a newly establised connection." + (with-pgsql-connection (pgconn) + (pomo:with-transaction () + (pgsql-execute-with-timing section label sql :count count)))) + +(defun pgsql-execute-with-timing (section label sql &key (count 1)) + "Execute given SQL and resgister its timing into STATE." + (multiple-value-bind (res secs) + (timing + (handler-case + (pgsql-execute sql) + (cl-postgres:database-error (e) + (log-message :error "~a" e) + (update-stats section label :errs 1 :rows (- count))))) + (declare (ignore res)) + (update-stats section label :read count :rows count :secs secs))) + +(defun pgsql-execute (sql &key client-min-messages) + "Execute given SQL in current transaction" + (when client-min-messages + (pomo:execute + (format nil "SET LOCAL client_min_messages TO ~a;" + (symbol-name client-min-messages)))) + + (log-message :notice "~a" sql) + (pomo:execute sql) + + (when client-min-messages + (pomo:execute (format nil "RESET client_min_messages;")))) + + +;;; +;;; PostgreSQL version specific support, that we get once connected +;;; +(defun list-reserved-keywords (pgconn) + "Connect to PostgreSQL DBNAME and fetch reserved keywords." + (handler-case + (with-pgsql-connection (pgconn) + (pomo:query "select word + from pg_get_keywords() + where catcode IN ('R', 'T')" :column)) + ;; support for Amazon Redshift + (cl-postgres-error::syntax-error-or-access-violation (e) + ;; 42883 undefined_function + ;; Database error 42883: function pg_get_keywords() does not exist + ;; + ;; the following list comes from a manual query against a local + ;; PostgreSQL server (version 9.5devel), it's better to have this list + ;; than nothing at all. + (declare (ignore e)) + (list "all" + "analyse" + "analyze" + "and" + "any" + "array" + "as" + "asc" + "asymmetric" + "authorization" + "binary" + "both" + "case" + "cast" + "check" + "collate" + "collation" + "column" + "concurrently" + "constraint" + "create" + "cross" + "current_catalog" + "current_date" + "current_role" + "current_schema" + "current_time" + "current_timestamp" + "current_user" + "default" + "deferrable" + "desc" + "distinct" + "do" + "else" + "end" + "except" + "false" + "fetch" + "for" + "foreign" + "freeze" + "from" + "full" + "grant" + "group" + "having" + "ilike" + "in" + "initially" + "inner" + "intersect" + "into" + "is" + "isnull" + "join" + "lateral" + "leading" + "left" + "like" + "limit" + "localtime" + "localtimestamp" + "natural" + "not" + "notnull" + "null" + "offset" + "on" + "only" + "or" + "order" + "outer" + "overlaps" + "placing" + "primary" + "references" + "returning" + "right" + "select" + "session_user" + "similar" + "some" + "symmetric" + "table" + "then" + "to" + "trailing" + "true" + "union" + "unique" + "user" + "using" + "variadic" + "verbose" + "when" + "where" + "window" + "with")))) diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/copy-from-queue.lisp similarity index 56% rename from src/pgsql/pgsql.lisp rename to src/pgsql/copy-from-queue.lisp index 1a5d677..f17add7 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/copy-from-queue.lisp @@ -144,124 +144,3 @@ (lp:kernel-worker-index) (format-table-name table) seconds) (list :writer table seconds))) - -;;; -;;; Compute how many rows we're going to try loading next, depending on -;;; where we are in the batch currently and where is the next-error to be -;;; seen, if that's between current position and the end of the batch. -;;; -(defun next-batch-rows (batch-rows current-batch-pos next-error) - "How many rows should we process in next iteration?" - (cond - ((< current-batch-pos next-error) - ;; We Can safely push a batch with all the rows until the first error, - ;; and here current-batch-pos should be 0 anyways. - ;; - ;; How many rows do we have from position 0 to position next-error, - ;; excluding next-error? Well, next-error. - (- next-error current-batch-pos)) - - ((= current-batch-pos next-error) - ;; Now we got to the line that we know is an error, we need to process - ;; only that one in the next batch - 1) - - (t - ;; We're past the known erroneous row. The batch might have new errors, - ;; or maybe that was the only one. We'll figure it out soon enough, - ;; let's try the whole remaining rows. - (- batch-rows current-batch-pos)))) - -;;; -;;; In case of COPY error, PostgreSQL gives us the line where the error was -;;; found as a CONTEXT message. Let's parse that information to optimize our -;;; batching splitting in case of errors. -;;; -;;; CONTEXT: COPY errors, line 1, column b: "2006-13-11" -;;; CONTEXT: COPY byte, line 1: "hello\0world" -;;; -;;; Those error messages are a translation target, tho, so we can only -;;; assume to recognize the command tag (COPY), the comma, and a numer after -;;; a world that might be Zeile (de), línea (es), ligne (fr), riga (it), -;;; linia (pl), linha (pt), строка (ru), 行 (zh), or something else -;;; entirely. -;;; -(defun parse-copy-error-context (context) - "Given a COPY command CONTEXT error message, return the batch position - where the error comes from." - (cl-ppcre:register-groups-bind ((#'parse-integer n)) - ("COPY [^,]+, [^ ]+ (\\d+)" context :sharedp t) - (1- n))) - -;;; -;;; The main retry batch function. -;;; -(defun retry-batch (table columns batch batch-rows condition - &optional (current-batch-pos 0) - &aux (nb-errors 0)) - "Batch is a list of rows containing at least one bad row, the first such - row is known to be located at FIRST-ERROR index in the BATCH array." - - (log-message :info "Entering error recovery.") - - (loop - :with next-error = (parse-copy-error-context - (cl-postgres::database-error-context condition)) - - :while (< current-batch-pos batch-rows) - - :do - (progn ; indenting helper - (when (= current-batch-pos next-error) - (log-message :info "error recovery at ~d/~d, processing bad row" - (+ 1 next-error) batch-rows) - (process-bad-row table condition (aref batch current-batch-pos)) - (incf current-batch-pos) - (incf nb-errors)) - - (let* ((current-batch-rows - (next-batch-rows batch-rows current-batch-pos next-error))) - (when (< 0 current-batch-rows) - (handler-case - (with-pgsql-transaction (:database pomo:*database*) - (let* ((table-name (format-table-name table)) - (stream - (cl-postgres:open-db-writer pomo:*database* - table-name columns))) - - (if (< current-batch-pos next-error) - (log-message :info - "error recovery at ~d/~d, next error at ~d, loading ~d row~:p" - current-batch-pos batch-rows (+ 1 next-error) current-batch-rows) - (log-message :info - "error recovery at ~d/~d, trying ~d row~:p" - current-batch-pos batch-rows current-batch-rows)) - - (unwind-protect - (loop :repeat current-batch-rows - :for pos :from current-batch-pos - :do (db-write-row stream (aref batch pos))) - - ;; close-db-writer is the one signaling cl-postgres-errors - (cl-postgres:close-db-writer stream) - (incf current-batch-pos current-batch-rows)))) - - ;; the batch didn't make it, prepare error handling for next turn - ((or - cl-postgres-error::data-exception - cl-postgres-error::integrity-violation - cl-postgres-error:internal-error - cl-postgres-error::insufficient-resources - cl-postgres-error::program-limit-exceeded) (next-error-in-batch) - - (setf condition next-error-in-batch - - next-error - (+ current-batch-pos - (parse-copy-error-context - (cl-postgres::database-error-context condition)))))))))) - - (log-message :info "Recovery found ~d errors in ~d row~:p" nb-errors batch-rows) - - ;; Return how many rows we did load, for statistics purposes - (- batch-rows nb-errors)) diff --git a/src/pgsql/pgsql-ddl.lisp b/src/pgsql/pgsql-ddl.lisp new file mode 100644 index 0000000..e104fc1 --- /dev/null +++ b/src/pgsql/pgsql-ddl.lisp @@ -0,0 +1,269 @@ +;;; +;;; PostgreSQL fkey support implementation as a Target Database +;;; + +(in-package :pgloader.pgsql) + +;;; +;;; Schemas +;;; +(defmethod format-create-sql ((schema schema) &key (stream nil) if-not-exists) + (format stream "CREATE SCHEMA~@[~IF NOT EXISTS~] ~s;" + if-not-exists + (schema-name schema))) + +(defmethod format-drop-sql ((schema schema) &key (stream nil) cascade) + (declare (ignore pgsql)) + (format stream "DROP SCHEMA ~s~@[ CASCADE~];" (schema-name schema) cascade)) + + +;;; +;;; Types +;;; +(defmethod format-create-sql ((sqltype sqltype) &key (stream nil) if-not-exists) + (declare (ignore if-not-exists)) + (ecase (sqltype-type sqltype) + ((:enum :set) + (format stream "CREATE TYPE ~a AS ENUM (~{'~a'~^, ~});" + (sqltype-name sqltype) + (sqltype-extra sqltype))))) + +(defmethod format-drop-sql ((sqltype sqltype) &key (stream nil) cascade) + (format stream "DROP TYPE ~s~@[ CASCADE~];" (sqltype-name sqltype) cascade)) + + +;;; +;;; Tables +;;; +(defmethod format-create-sql ((table table) &key (stream nil) if-not-exists) + ;; + ;; In case stream would be nil, which means return a string, we use this + ;; with-output-to-string form and format its output in stream... + ;; + (format stream "~a" + (with-output-to-string (s) + (format s "CREATE TABLE~:[~; IF NOT EXISTS~] ~a ~%(~%" + if-not-exists + (format-table-name table)) + (let ((max (reduce #'max + (mapcar #'length + (mapcar #'column-name + (table-column-list table)))))) + (loop + :for (col . last?) :on (table-column-list table) + :do (progn + (format s " ") + (format-create-sql col + :stream s + :pretty-print t + :max-column-name-length max) + (format s "~:[~;,~]~%" last?)))) + (format s ");~%")))) + +(defmethod format-drop-sql ((table table) &key (stream nil) cascade) + "Return the PostgreSQL DROP TABLE IF EXISTS statement for TABLE-NAME." + (format stream + "DROP TABLE IF EXISTS ~a~@[ CASCADE~];" + (format-table-name table) cascade)) + + +;;; +;;; Columns +;;; +(defun get-column-type-name-from-sqltype (column) + "Return the column type name. When column-type is a sqltype, the sqltype + might be either an ENUM or a SET. In the case of a SET, we want an array + type to be defined here." + (let ((type-name (column-type-name column))) + (typecase type-name + (sqltype (ecase (sqltype-type type-name) + (:enum (sqltype-name type-name)) + (:set (format nil "~a[]" (sqltype-name type-name))))) + (string type-name)))) + +(defmethod format-create-sql ((column column) + &key + (stream nil) + if-not-exists + pretty-print + ((:max-column-name-length max))) + (declare (ignore if-not-exists)) + (format stream + "~a~vt~a~:[~*~;~a~]~:[ not null~;~]~:[~; default ~a~]" + (column-name column) + (if pretty-print (if max (+ 3 max) 22) 1) + (get-column-type-name-from-sqltype column) + (column-type-mod column) + (column-type-mod column) + (column-nullable column) + (column-default column) + (format-default-value column))) + +(defvar *pgsql-default-values* + '((:null . "NULL") + (:current-date . "CURRENT_DATE") + (:current-timestamp . "CURRENT_TIMESTAMP") + (:generate-uuid . "uuid_generate_v1()")) + "Common normalized default values and their PostgreSQL spelling.") + +(defmethod format-default-value ((column column) &key (stream nil)) + (let* ((default (column-default column)) + (clean-default (cdr (assoc default *pgsql-default-values*))) + (transform (column-transform column))) + (or clean-default + (if transform + (let* ((transformed-default + (handler-case + (funcall transform default) + (condition (c) + (log-message :warning + "Failed to transform default value ~s: ~a" + default c) + ;; can't transform: return nil + nil))) + (transformed-column + (make-column :default transformed-default))) + (format-default-value transformed-column)) + (if default + (format stream "'~a'" default) + (format stream "NULL")))))) + + +;;; +;;; Indexes +;;; +(defmethod format-create-sql ((index index) &key (stream nil) if-not-exists) + (declare (ignore if-not-exists)) + (let* ((table (index-table index)) + (index-name (if (and *preserve-index-names* + (not (string-equal "primary" (index-name index))) + (table-oid (index-table index))) + (index-name index) + + ;; in the general case, we build our own index name. + (format nil "idx_~a_~a" + (table-oid (index-table index)) + (index-name index)))) + (index-name (apply-identifier-case index-name))) + (cond + ((or (index-primary index) + (and (index-condef index) (index-unique index))) + (values + ;; ensure good concurrency here, don't take the ACCESS EXCLUSIVE + ;; LOCK on the table before we have the index done already + (or (index-sql index) + (format stream + "CREATE UNIQUE INDEX ~@[~a.~]~a ON ~a (~{~a~^, ~})~@[ WHERE ~a~];" + (when (index-schema index) (schema-name (index-schema index))) + index-name + (format-table-name table) + (index-columns index) + (index-filter index))) + (format stream + ;; don't use the index schema name here, PostgreSQL doesn't + ;; like it, might be implicit from the table's schema + ;; itself... + "ALTER TABLE ~a ADD ~a USING INDEX ~a;" + (format-table-name table) + (cond ((index-primary index) "PRIMARY KEY") + ((index-unique index) "UNIQUE")) + index-name))) + + ((index-condef index) + (format stream "ALTER TABLE ~a ADD ~a;" + (format-table-name table) + (index-condef index))) + + (t + (or (index-sql index) + (format stream + "CREATE~:[~; UNIQUE~] INDEX ~@[~a.~]~a ON ~a (~{~a~^, ~})~@[ WHERE ~a~];" + (index-unique index) + (when (index-schema index) (schema-name (index-schema index))) + index-name + (format-table-name table) + (index-columns index) + (index-filter index))))))) + +(defmethod format-drop-sql ((index index) &key (stream nil) cascade) + (declare (ignore cascade)) + (let* ((schema-name (schema-name (index-schema index))) + (index-name (index-name index))) + (cond ((index-conname index) + ;; here always quote the constraint name, currently the name + ;; comes from one source only, the PostgreSQL database catalogs, + ;; so don't question it, quote it. + (format stream "ALTER TABLE ~a DROP CONSTRAINT ~s;" + (format-table-name (index-table index)) + (index-conname index))) + + (t + (format stream "DROP INDEX ~@[~a.~]~a;" schema-name index-name))))) + + +;;; +;;; Foreign Keys +;;; +(defmethod format-create-sql ((fk fkey) &key (stream nil) if-not-exists) + (declare (ignore if-not-exists)) + (format stream + "ALTER TABLE ~a ADD ~@[CONSTRAINT ~a ~]FOREIGN KEY(~{~a~^,~}) REFERENCES ~a(~{~a~^,~})~:[~*~; ON UPDATE ~a~]~:[~*~; ON DELETE ~a~]" + (format-table-name (fkey-table fk)) + (fkey-name fk) ; constraint name + (fkey-columns fk) + (format-table-name (fkey-foreign-table fk)) + (fkey-foreign-columns fk) + (fkey-update-rule fk) + (fkey-update-rule fk) + (fkey-delete-rule fk) + (fkey-delete-rule fk))) + +(defmethod format-drop-sql ((fk fkey) &key (stream nil) cascade) + (declare (ignore cascade)) + (let* ((constraint-name (apply-identifier-case (fkey-name fk))) + (table-name (format-table-name (fkey-table fk)))) + (format stream "ALTER TABLE ~a DROP CONSTRAINT ~a" table-name constraint-name))) + + +;;; +;;; Triggers +;;; +(defmethod format-create-sql ((trigger trigger) &key (stream nil) if-not-exists) + (declare (ignore if-not-exists)) + (format stream + "CREATE TRIGGER ~a ~a ON ~a FOR EACH ROW EXECUTE PROCEDURE ~a()" + (trigger-name trigger) + (trigger-action trigger) + (format-table-name (trigger-table trigger)) + (trigger-procedure-name trigger))) + +(defmethod format-drop-sql ((trigger trigger) &key (stream nil) cascade) + (declare (ignore pgsql)) + (format stream + "DROP TRIGGER ~a ON ~a~@[ CASCADE~];" + (trigger-name trigger) + (format-table-name (trigger-table trigger)) + cascade)) + + +;;; +;;; Procedures +;;; +(defmethod format-create-sql ((procedure procedure) &key (stream nil) if-not-exists) + (declare (ignore if-not-exists)) + (format stream + "CREATE OR REPLACE FUNCTION ~a() RETURNS ~a LANGUAGE ~a AS $$~%~a~%$$;" + (procedure-name procedure) + (procedure-returns procedure) + (procedure-language procedure) + (procedure-body procedure))) + +(defmethod format-drop-sql ((procedure procedure) &key (stream nil) cascade) + (declare (ignore pgsql)) + (format stream + "DROP FUNCTION ~a()~@[ CASCADE~];" (procedure-name procedure) cascade)) + + +;;; +;;; Comments +;;; diff --git a/src/pgsql/pgsql-index-filter.lisp b/src/pgsql/pgsql-index-filter.lisp new file mode 100644 index 0000000..db0a255 --- /dev/null +++ b/src/pgsql/pgsql-index-filter.lisp @@ -0,0 +1,51 @@ +;;; +;;; API to rewrite index WHERE clauses (filter) +;;; +(in-package #:pgloader.pgsql) + +(defgeneric translate-index-filter (table index sql-dialect) + (:documentation + "Translate the filter clause of INDEX in PostgreSQL slang.")) + +(defmethod translate-index-filter ((table table) + (index index) + (sql-dialect t)) + "Implement a default facility that does nothing." + nil) + + +;;; +;;; Generic code that drives all index filter clauses rewriting. +;;; + +(defgeneric process-index-definitions (object &key sql-dialect) + (:documentation "Rewrite all indexes filters in given catalog OBJECT.")) + +(defmethod process-index-definitions ((catalog catalog) &key sql-dialect) + "Rewrite all index filters in CATALOG." + (loop :for schema :in (catalog-schema-list catalog) + :do (process-index-definitions schema :sql-dialect sql-dialect))) + +(defmethod process-index-definitions ((schema schema) &key sql-dialect) + "Rewrite all index filters in CATALOG." + (loop :for table :in (schema-table-list schema) + :do (process-index-definitions table :sql-dialect sql-dialect))) + +(defmethod process-index-definitions ((table table) &key sql-dialect) + "Rewrite all index filter in TABLE." + (loop :for index :in (table-index-list table) + :when (index-filter index) + :do (let ((pg-filter + (handler-case + (translate-index-filter table index sql-dialect) + (condition (c) + (log-message :error + "Failed to translate index ~s on table ~s because of filter clause ~s" + (index-name index) + (format-table-name table) + (index-filter index)) + (log-message :debug "filter translation error: ~a" c) + ;; try to create the index without the WHERE clause... + (setf (index-filter index) nil))))) + (log-message :info "tranlate-index-filter: ~s" pg-filter) + (setf (index-filter index) pg-filter)))) diff --git a/src/pgsql/pgsql-schema.lisp b/src/pgsql/pgsql-schema.lisp new file mode 100644 index 0000000..73a5c2c --- /dev/null +++ b/src/pgsql/pgsql-schema.lisp @@ -0,0 +1,255 @@ +;;; +;;; Tools to query the PostgreSQL Schema, either source or target +;;; + +(in-package :pgloader.pgsql) + +(defun fetch-pgsql-catalog (target &key table including excluding) + "Fetch PostgreSQL catalogs for the target database." + (let ((catalog (make-catalog :name (db-name target)))) + (with-pgsql-connection (target) + + (when (and table (not including)) + ;; rewrite the table constraint as an including expression + (let ((schema + (or (table-schema table) + (make-schema :name (query-table-schema (table-name table)))))) + (setf including + (list (cons (schema-name schema) + (list (table-name table))))))) + + (list-all-columns catalog + :table-type :table + :including including + :excluding excluding) + + (list-all-indexes catalog + :including including + :excluding excluding) + + (list-all-fkeys catalog + :including including + :excluding excluding)) + + catalog)) + +(defun query-table-schema (table-name) + "Get PostgreSQL schema name where to locate TABLE-NAME by following the + current search_path rules. A PostgreSQL connection must be opened." + (pomo:query (format nil " + select nspname + from pg_namespace n + join pg_class c on n.oid = c.relnamespace + where c.oid = '~a'::regclass;" + table-name) :single)) + + +(defvar *table-type* '((:table . "r") + (:view . "v") + (:index . "i") + (:sequence . "S")) + "Associate internal table type symbol with what's found in PostgreSQL + pg_class.relkind column.") + +(defun filter-list-to-where-clause (filter-list + &optional + not + (schema-col "table_schema") + (table-col "table_name")) + "Given an INCLUDING or EXCLUDING clause, turn it into a PostgreSQL WHERE + clause." + (loop :for (schema . table-name-list) :in filter-list + :append (mapcar (lambda (table-name) + (format nil "(~a = '~a' and ~a ~:[~;NOT ~]~~ '~a')" + schema-col schema table-col not table-name)) + table-name-list))) + +(defun list-all-columns (catalog + &key + (table-type :table) + including + excluding + &aux + (table-type-name (cdr (assoc table-type *table-type*)))) + "Get the list of PostgreSQL column names per table." + (loop :for (schema-name table-name table-oid name type typmod notnull default) + :in + (pomo:query (format nil " + select nspname, relname, c.oid, attname, + t.oid::regtype as type, + case when atttypmod > 0 then atttypmod - 4 else null end as typmod, + attnotnull, + case when atthasdef then def.adsrc end as default + from pg_class c + join pg_namespace n on n.oid = c.relnamespace + left join pg_attribute a on c.oid = a.attrelid + join pg_type t on t.oid = a.atttypid and attnum > 0 + left join pg_attrdef def on a.attrelid = def.adrelid + and a.attnum = def.adnum + + where nspname !~~ '^pg_' and n.nspname <> 'information_schema' + and relkind = '~a' + ~:[~*~;and (~{~a~^~&~10t or ~})~] + ~:[~*~;and (~{~a~^~&~10t and ~})~] + + order by nspname, relname, attnum" + table-type-name + including ; do we print the clause? + (filter-list-to-where-clause including + nil + "n.nspname" + "c.relname") + excluding ; do we print the clause? + (filter-list-to-where-clause excluding + nil + "n.nspname" + "c.relname"))) + :do + (let* ((schema (maybe-add-schema catalog schema-name)) + (table (maybe-add-table schema table-name :oid table-oid)) + (field (make-column :name name + :type-name type + :type-mod typmod + :nullable (not notnull) + :default default))) + (add-field table field)) + :finally (return catalog))) + +(defun list-all-indexes (catalog &key including excluding) + "Get the list of PostgreSQL index definitions per table." + (loop + :for (schema-name name table-schema table-name primary unique sql conname condef) + :in (pomo:query (format nil " + select n.nspname, + i.relname, + rn.nspname, + r.relname, + indisprimary, + indisunique, + pg_get_indexdef(indexrelid), + c.conname, + pg_get_constraintdef(c.oid) + from pg_index x + join pg_class i ON i.oid = x.indexrelid + join pg_class r ON r.oid = x.indrelid + join pg_namespace n ON n.oid = i.relnamespace + join pg_namespace rn ON rn.oid = r.relnamespace + left join pg_constraint c ON c.conindid = i.oid + where n.nspname !~~ '^pg_' and n.nspname <> 'information_schema' + ~:[~*~;and (~{~a~^~&~10t or ~})~] + ~:[~*~;and (~{~a~^~&~10t and ~})~] +order by n.nspname, r.relname" + including ; do we print the clause? + (filter-list-to-where-clause including + nil + "n.nspname" + "i.relname") + excluding ; do we print the clause? + (filter-list-to-where-clause excluding + nil + "n.nspname" + "i.relname"))) + :do (let* ((schema (find-schema catalog schema-name)) + (tschema (find-schema catalog table-schema)) + (table (find-table tschema table-name)) + (pg-index + (make-index :name name + :schema schema + :table table + :primary primary + :unique unique + :columns nil + :sql sql + :conname (unless (eq :null conname) conname) + :condef (unless (eq :null condef) condef)))) + (maybe-add-index table name pg-index :key #'index-name)) + :finally (return catalog))) + +(defun list-all-fkeys (catalog &key including excluding) + "Get the list of PostgreSQL index definitions per table." + (loop + :for (schema-name table-name fschema-name ftable-name conname cols fcols + updrule delrule mrule deferrable deferred condef) + :in + (pomo:query (format nil " + select n.nspname, c.relname, nf.nspname, cf.relname as frelname, + conname, + (select string_agg(attname, ',') + from pg_attribute + where attrelid = r.conrelid and array[attnum] <@ conkey + ) as conkey, + (select string_agg(attname, ',') + from pg_attribute + where attrelid = r.confrelid and array[attnum] <@ confkey + ) as confkey, + confupdtype, confdeltype, confmatchtype, + condeferrable, condeferred, + pg_catalog.pg_get_constraintdef(r.oid, true) as condef + from pg_catalog.pg_constraint r + JOIN pg_class c on r.conrelid = c.oid + JOIN pg_namespace n on c.relnamespace = n.oid + JOIN pg_class cf on r.confrelid = c.oid + JOIN pg_namespace nf on cf.relnamespace = nf.oid + where r.contype = 'f' + AND c.relkind = 'r' and cf.relkind = 'r' + AND n.nspname !~~ '^pg_' and n.nspname <> 'information_schema' + AND nf.nspname !~~ '^pg_' and nf.nspname <> 'information_schema' + ~:[~*~;and (~{~a~^~&~10t or ~})~] + ~:[~*~;and (~{~a~^~&~10t and ~})~] + ~:[~*~;and (~{~a~^~&~10t or ~})~] + ~:[~*~;and (~{~a~^~&~10t and ~})~]" + including ; do we print the clause (table)? + (filter-list-to-where-clause including + nil + "n.nspname" + "c.relname") + excluding ; do we print the clause (table)? + (filter-list-to-where-clause excluding + nil + "n.nspname" + "c.relname") + including ; do we print the clause (ftable)? + (filter-list-to-where-clause including + nil + "nf.nspname" + "cf.relname") + excluding ; do we print the clause (ftable)? + (filter-list-to-where-clause excluding + nil + "nf.nspname" + "cf.relname"))) + :do (flet ((pg-fk-rule-to-action (rule) + (case rule + (#\a "NO ACTION") + (#\r "RESTRICT") + (#\c "CASCADE") + (#\n "SET NULL") + (#\d "SET DEFAULT"))) + (pg-fk-match-rule-to-match-clause (rule) + (case rule + (#\f "FULL") + (#\p "PARTIAL") + (#\s "SIMPLE")))) + (let* ((schema (find-schema catalog schema-name)) + (table (find-table schema table-name)) + (fschema (find-schema catalog fschema-name)) + (ftable (find-table fschema ftable-name)) + (fk + (make-fkey :name (apply-identifier-case conname) + :condef condef + :table table + :columns (split-sequence:split-sequence #\, cols) + :foreign-table ftable + :foreign-columns (split-sequence:split-sequence #\, fcols) + :update-rule (pg-fk-rule-to-action updrule) + :delete-rule (pg-fk-rule-to-action delrule) + :match-rule (pg-fk-match-rule-to-match-clause mrule) + :deferrable deferrable + :initially-deferred deferred))) + (if (and table ftable) + (add-fkey table fk) + (log-message :notice "Foreign Key ~a is ignored, one of its table is missing from pgloader table selection" + conname)))) + :finally (return catalog))) + + diff --git a/src/pgsql/pgsql-trigger.lisp b/src/pgsql/pgsql-trigger.lisp new file mode 100644 index 0000000..778cce0 --- /dev/null +++ b/src/pgsql/pgsql-trigger.lisp @@ -0,0 +1,65 @@ +;;; +;;; Create the PostgreSQL schema from our internal Catalog representation. +;;; Here, triggers and their stored procedures. +;;; + +(in-package #:pgloader.pgsql) + +(defvar *pgsql-triggers-procedures* + `((:on-update-current-timestamp . + ,(lambda (trigger column table) + (let ((body (format nil + "BEGIN~% NEW.~a = now();~% RETURN NEW;~%END;" + (column-name column)))) + (make-procedure :name (trigger-procedure-name trigger) + :returns "trigger" + :language "plpgsql" + :body body))))) + "List of lambdas to generate procedure definitions from pgloader internal + trigger names as positioned in the internal catalogs at CAST time.") + +(defun rename-trigger (trigger) + "Turn a common lisp symbol into a proper PostgreSQL trigger name." + (setf (trigger-name trigger) + (string-downcase + (cl-ppcre:regex-replace-all "-" + (symbol-name (trigger-name trigger)) + "_")))) + +(defun process-triggers (table) + "Return the list of PostgreSQL statements to create a catalog trigger." + (loop :for column :in (table-column-list table) + :when (column-extra column) + :do (etypecase (column-extra column) + (trigger + ;; finish the trigger CAST and attach it to the table now + (let* ((trigger (column-extra column)) + (proc (or (trigger-procedure trigger) + ;; + ;; We have a trigger with no attached + ;; procedure, so we search for the trigger + ;; procedure-name in + ;; *pgsql-triggers-procedures* to find a + ;; lambda form to call to produce our PLpgSQL + ;; procedure + ;; + (let ((generate-proc + (cdr + (assoc (trigger-name trigger) + *pgsql-triggers-procedures*)))) + (assert (functionp generate-proc)) + (funcall generate-proc + trigger column table))))) + ;; + ;; Properly attach the procedure to the trigger and the + ;; trigger to the table. + ;; + (unless (trigger-procedure trigger) + (setf (trigger-procedure trigger) proc)) + + (rename-trigger trigger) + + (setf (column-extra column) nil) + + (setf (trigger-table trigger) table) + (push-to-end trigger (table-trigger-list table))))))) diff --git a/src/pgsql/queries.lisp b/src/pgsql/queries.lisp index 4762f57..98142c1 100644 --- a/src/pgsql/queries.lisp +++ b/src/pgsql/queries.lisp @@ -3,252 +3,6 @@ ;;; (in-package :pgloader.pgsql) -;;; -;;; PostgreSQL Tools connecting to a database -;;; -(defclass pgsql-connection (db-connection) - ((use-ssl :initarg :use-ssl :accessor pgconn-use-ssl) - (table-name :initarg :table-name :accessor pgconn-table-name)) - (:documentation "PostgreSQL connection for pgloader")) - -(defmethod initialize-instance :after ((pgconn pgsql-connection) &key) - "Assign the type slot to pgsql." - (setf (slot-value pgconn 'type) "pgsql")) - -(defmethod clone-connection ((c pgsql-connection)) - (let ((clone - (change-class (call-next-method c) 'pgsql-connection))) - (setf (pgconn-use-ssl clone) (pgconn-use-ssl c) - (pgconn-table-name clone) (pgconn-table-name c)) - clone)) - -(defmethod ssl-enable-p ((pgconn pgsql-connection)) - "Return non-nil when the connection uses SSL" - (member (pgconn-use-ssl pgconn) '(:try :yes))) - -(defun new-pgsql-connection (pgconn) - "Prepare a new connection object with all the same properties as pgconn, - so as to avoid stepping on it's handle" - (make-instance 'pgsql-connection - :user (db-user pgconn) - :pass (db-pass pgconn) - :host (db-host pgconn) - :port (db-port pgconn) - :name (db-name pgconn) - :use-ssl (pgconn-use-ssl pgconn) - :table-name (pgconn-table-name pgconn))) - -;;; -;;; Implement SSL Client Side certificates -;;; http://www.postgresql.org/docs/current/static/libpq-ssl.html#LIBPQ-SSL-FILE-USAGE -;;; -(defvar *pgsql-client-certificate* "~/.postgresql/postgresql.crt" - "File where to read the PostgreSQL Client Side SSL Certificate.") - -(defvar *pgsql-client-key* "~/.postgresql/postgresql.key" - "File where to read the PostgreSQL Client Side SSL Private Key.") - -;;; -;;; We need to distinguish some special cases of PostgreSQL errors within -;;; Class 53 — Insufficient Resources: in case of "too many connections" we -;;; typically want to leave room for another worker to finish and free one -;;; connection, then try again. -;;; -;;; http://www.postgresql.org/docs/9.4/interactive/errcodes-appendix.html -;;; -;;; The "leave room to finish and try again" heuristic is currently quite -;;; simplistic, but at least it work in my test cases. -;;; -(cl-postgres-error::deferror "53300" - too-many-connections cl-postgres-error:insufficient-resources) -(cl-postgres-error::deferror "53400" - configuration-limit-exceeded cl-postgres-error:insufficient-resources) - -(defvar *retry-connect-times* 5 - "How many times to we try to connect again.") - -(defvar *retry-connect-delay* 0.5 - "How many seconds to wait before trying to connect again.") - -(defmethod open-connection ((pgconn pgsql-connection) &key username) - "Open a PostgreSQL connection." - (let* ((crt-file (expand-user-homedir-pathname *pgsql-client-certificate*)) - (key-file (expand-user-homedir-pathname *pgsql-client-key*)) - (pomo::*ssl-certificate-file* (when (and (ssl-enable-p pgconn) - (probe-file crt-file)) - (uiop:native-namestring crt-file))) - (pomo::*ssl-key-file* (when (and (ssl-enable-p pgconn) - (probe-file key-file)) - (uiop:native-namestring key-file)))) - (flet ((connect (pgconn username) - (handler-case - (pomo:connect (db-name pgconn) - (or username (db-user pgconn)) - (db-pass pgconn) - (let ((host (db-host pgconn))) - (if (and (consp host) (eq :unix (car host))) - :unix - host)) - :port (db-port pgconn) - :use-ssl (or (pgconn-use-ssl pgconn) :no)) - ((or too-many-connections configuration-limit-exceeded) (e) - (log-message :error - "Failed to connect to ~a: ~a; will try again in ~fs" - pgconn e *retry-connect-delay*) - (sleep *retry-connect-delay*))))) - (loop :while (null (conn-handle pgconn)) - :repeat *retry-connect-times* - :do (setf (conn-handle pgconn) (connect pgconn username))))) - (unless (conn-handle pgconn) - (error "Failed ~d times to connect to ~a" *retry-connect-times* pgconn)) - pgconn) - -(defmethod close-connection ((pgconn pgsql-connection)) - "Close a PostgreSQL connection." - (assert (not (null (conn-handle pgconn)))) - (pomo:disconnect (conn-handle pgconn)) - (setf (conn-handle pgconn) nil) - pgconn) - -(defmethod query ((pgconn pgsql-connection) sql &key) - (let ((pomo:*database* (conn-handle pgconn))) - (log-message :debug "~a" sql) - (pomo:query sql))) - -(defmacro handling-pgsql-notices (&body forms) - "The BODY is run within a PostgreSQL transaction where *pg-settings* have - been applied. PostgreSQL warnings and errors are logged at the - appropriate log level." - `(handler-bind - ((cl-postgres:database-error - #'(lambda (e) - (log-message :error "~a" e))) - (cl-postgres:postgresql-warning - #'(lambda (w) - (log-message :warning "~a" w) - (muffle-warning)))) - (progn ,@forms))) - -(defmacro with-pgsql-transaction ((&key pgconn database) &body forms) - "Run FORMS within a PostgreSQL transaction to DBNAME, reusing DATABASE if - given." - (if database - `(let ((pomo:*database* ,database)) - (handling-pgsql-notices - (pomo:with-transaction () - (log-message :debug "BEGIN") - ,@forms))) - ;; no database given, create a new database connection - `(with-pgsql-connection (,pgconn) - (pomo:with-transaction () - (log-message :debug "BEGIN") - ,@forms)))) - -(defmacro with-pgsql-connection ((pgconn) &body forms) - "Run FROMS within a PostgreSQL connection to DBNAME. To get the connection - spec from the DBNAME, use `get-connection-spec'." - (let ((conn (gensym "pgsql-conn"))) - `(let (#+unix (cl-postgres::*unix-socket-dir* (get-unix-socket-dir ,pgconn))) - (with-connection (,conn ,pgconn) - (let ((pomo:*database* (conn-handle ,conn))) - (log-message :debug "CONNECTED TO ~s" ,conn) - (set-session-gucs *pg-settings*) - (handling-pgsql-notices - ,@forms)))))) - -(defun get-unix-socket-dir (pgconn) - "When *pgconn* host is a (cons :unix path) value, return the right value - for cl-postgres::*unix-socket-dir*." - (let ((host (db-host pgconn))) - (if (and (consp host) (eq :unix (car host))) - ;; set to *pgconn* host value - (directory-namestring (fad:pathname-as-directory (cdr host))) - ;; keep as is. - cl-postgres::*unix-socket-dir*))) - -(defun set-session-gucs (alist &key transaction database) - "Set given GUCs to given values for the current session." - (let ((pomo:*database* (or database pomo:*database*))) - (loop - :for (name . value) :in alist - :for set := (cond - ((string-equal "search_path" name) - ;; for search_path, don't quote the value - (format nil "SET~:[~; LOCAL~] ~a TO ~a" - transaction name value)) - (t - ;; general case: quote the value - (format nil "SET~:[~; LOCAL~] ~a TO '~a'" - transaction name value))) - :do (progn ; indent helper - (log-message :debug set) - (pomo:execute set))))) - -(defun pgsql-connect-and-execute-with-timing (pgconn section label sql &key (count 1)) - "Run pgsql-execute-with-timing within a newly establised connection." - (with-pgsql-connection (pgconn) - (pomo:with-transaction () - (pgsql-execute-with-timing section label sql :count count)))) - -(defun pgsql-execute-with-timing (section label sql &key (count 1)) - "Execute given SQL and resgister its timing into STATE." - (multiple-value-bind (res secs) - (timing - (handler-case - (pgsql-execute sql) - (cl-postgres:database-error (e) - (log-message :error "~a" e) - (update-stats section label :errs 1 :rows (- count))))) - (declare (ignore res)) - (update-stats section label :read count :rows count :secs secs))) - -(defun pgsql-execute (sql &key client-min-messages) - "Execute given SQL in current transaction" - (when client-min-messages - (pomo:execute - (format nil "SET LOCAL client_min_messages TO ~a;" - (symbol-name client-min-messages)))) - - (log-message :notice "~a" sql) - (pomo:execute sql) - - (when client-min-messages - (pomo:execute (format nil "RESET client_min_messages;")))) - -;;; -;;; PostgreSQL Utility Queries -;;; - -;; (defun list-databases (&optional (username "postgres")) -;; "Connect to a local database and get the database list" -;; (with-pgsql-transaction (:dbname "postgres" :username username) -;; (loop for (dbname) in (pomo:query -;; "select datname -;; from pg_database -;; where datname !~ 'postgres|template'") -;; collect dbname))) - -;; (defun list-tables (&optional dbname) -;; "Return an alist of tables names and list of columns to pay attention to." -;; (with-pgsql-transaction (:dbname dbname) -;; (loop for (relname colarray) in (pomo:query " -;; select relname, array_agg(case when typname in ('date', 'timestamptz') -;; then attnum end -;; order by attnum) -;; from pg_class c -;; join pg_namespace n on n.oid = c.relnamespace -;; left join pg_attribute a on c.oid = a.attrelid -;; join pg_type t on t.oid = a.atttypid -;; where c.relkind = 'r' -;; and attnum > 0 -;; and n.nspname = 'public' -;; group by relname -;; ") -;; collect (cons relname (loop -;; for attnum across colarray -;; unless (eq attnum :NULL) -;; collect attnum))))) - (defun list-schemas () "Return the list of PostgreSQL schemas in the already established PostgreSQL connection." @@ -322,144 +76,6 @@ select i.relname, :conname (unless (eq :null conname) conname) :condef (unless (eq :null condef) condef)))) -(defun sanitize-user-gucs (gucs) - "Forbid certain actions such as setting a client_encoding different from utf8." - (let ((gucs - (append - (list (cons "client_encoding" "utf8")) - (loop :for (name . value) :in gucs - :when (and (string-equal name "client_encoding") - (not (member value '("utf-8" "utf8") :test #'string-equal))) - :do (log-message :warning - "pgloader always talk to PostgreSQL in utf-8, client_encoding has been forced to 'utf8'.") - :else - :collect (cons name value))))) - ;; - ;; Now see about the application_name, provide "pgloader" if it's not - ;; been overloaded already. - ;; - (cond ((not (assoc "application_name" gucs :test #'string-equal)) - (append gucs (list (cons "application_name" "pgloader")))) - - (t - gucs)))) - -(defun list-reserved-keywords (pgconn) - "Connect to PostgreSQL DBNAME and fetch reserved keywords." - (handler-case - (with-pgsql-connection (pgconn) - (pomo:query "select word - from pg_get_keywords() - where catcode IN ('R', 'T')" :column)) - ;; support for Amazon Redshift - (cl-postgres-error::syntax-error-or-access-violation (e) - ;; 42883 undefined_function - ;; Database error 42883: function pg_get_keywords() does not exist - ;; - ;; the following list comes from a manual query against a local - ;; PostgreSQL server (version 9.5devel), it's better to have this list - ;; than nothing at all. - (declare (ignore e)) - (list "all" - "analyse" - "analyze" - "and" - "any" - "array" - "as" - "asc" - "asymmetric" - "authorization" - "binary" - "both" - "case" - "cast" - "check" - "collate" - "collation" - "column" - "concurrently" - "constraint" - "create" - "cross" - "current_catalog" - "current_date" - "current_role" - "current_schema" - "current_time" - "current_timestamp" - "current_user" - "default" - "deferrable" - "desc" - "distinct" - "do" - "else" - "end" - "except" - "false" - "fetch" - "for" - "foreign" - "freeze" - "from" - "full" - "grant" - "group" - "having" - "ilike" - "in" - "initially" - "inner" - "intersect" - "into" - "is" - "isnull" - "join" - "lateral" - "leading" - "left" - "like" - "limit" - "localtime" - "localtimestamp" - "natural" - "not" - "notnull" - "null" - "offset" - "on" - "only" - "or" - "order" - "outer" - "overlaps" - "placing" - "primary" - "references" - "returning" - "right" - "select" - "session_user" - "similar" - "some" - "symmetric" - "table" - "then" - "to" - "trailing" - "true" - "union" - "unique" - "user" - "using" - "variadic" - "verbose" - "when" - "where" - "window" - "with")))) - (defun reset-all-sequences (pgconn &key tables) "Reset all sequences to the max value of the column they are attached to." (let ((newconn (clone-connection pgconn))) diff --git a/src/pgsql/retry-batch.lisp b/src/pgsql/retry-batch.lisp new file mode 100644 index 0000000..d783fe3 --- /dev/null +++ b/src/pgsql/retry-batch.lisp @@ -0,0 +1,125 @@ +;;; +;;; The PostgreSQL COPY TO implementation, with batches and retries. +;;; +(in-package #:pgloader.pgsql) + +;;; +;;; Compute how many rows we're going to try loading next, depending on +;;; where we are in the batch currently and where is the next-error to be +;;; seen, if that's between current position and the end of the batch. +;;; +(defun next-batch-rows (batch-rows current-batch-pos next-error) + "How many rows should we process in next iteration?" + (cond + ((< current-batch-pos next-error) + ;; We Can safely push a batch with all the rows until the first error, + ;; and here current-batch-pos should be 0 anyways. + ;; + ;; How many rows do we have from position 0 to position next-error, + ;; excluding next-error? Well, next-error. + (- next-error current-batch-pos)) + + ((= current-batch-pos next-error) + ;; Now we got to the line that we know is an error, we need to process + ;; only that one in the next batch + 1) + + (t + ;; We're past the known erroneous row. The batch might have new errors, + ;; or maybe that was the only one. We'll figure it out soon enough, + ;; let's try the whole remaining rows. + (- batch-rows current-batch-pos)))) + +;;; +;;; In case of COPY error, PostgreSQL gives us the line where the error was +;;; found as a CONTEXT message. Let's parse that information to optimize our +;;; batching splitting in case of errors. +;;; +;;; CONTEXT: COPY errors, line 1, column b: "2006-13-11" +;;; CONTEXT: COPY byte, line 1: "hello\0world" +;;; +;;; Those error messages are a translation target, tho, so we can only +;;; assume to recognize the command tag (COPY), the comma, and a numer after +;;; a world that might be Zeile (de), línea (es), ligne (fr), riga (it), +;;; linia (pl), linha (pt), строка (ru), 行 (zh), or something else +;;; entirely. +;;; +(defun parse-copy-error-context (context) + "Given a COPY command CONTEXT error message, return the batch position + where the error comes from." + (cl-ppcre:register-groups-bind ((#'parse-integer n)) + ("COPY [^,]+, [^ ]+ (\\d+)" context :sharedp t) + (1- n))) + +;;; +;;; The main retry batch function. +;;; +(defun retry-batch (table columns batch batch-rows condition + &optional (current-batch-pos 0) + &aux (nb-errors 0)) + "Batch is a list of rows containing at least one bad row, the first such + row is known to be located at FIRST-ERROR index in the BATCH array." + + (log-message :info "Entering error recovery.") + + (loop + :with next-error = (parse-copy-error-context + (cl-postgres::database-error-context condition)) + + :while (< current-batch-pos batch-rows) + + :do + (progn ; indenting helper + (when (= current-batch-pos next-error) + (log-message :info "error recovery at ~d/~d, processing bad row" + (+ 1 next-error) batch-rows) + (process-bad-row table condition (aref batch current-batch-pos)) + (incf current-batch-pos) + (incf nb-errors)) + + (let* ((current-batch-rows + (next-batch-rows batch-rows current-batch-pos next-error))) + (when (< 0 current-batch-rows) + (handler-case + (with-pgsql-transaction (:database pomo:*database*) + (let* ((table-name (format-table-name table)) + (stream + (cl-postgres:open-db-writer pomo:*database* + table-name columns))) + + (if (< current-batch-pos next-error) + (log-message :info + "error recovery at ~d/~d, next error at ~d, loading ~d row~:p" + current-batch-pos batch-rows (+ 1 next-error) current-batch-rows) + (log-message :info + "error recovery at ~d/~d, trying ~d row~:p" + current-batch-pos batch-rows current-batch-rows)) + + (unwind-protect + (loop :repeat current-batch-rows + :for pos :from current-batch-pos + :do (db-write-row stream (aref batch pos))) + + ;; close-db-writer is the one signaling cl-postgres-errors + (cl-postgres:close-db-writer stream) + (incf current-batch-pos current-batch-rows)))) + + ;; the batch didn't make it, prepare error handling for next turn + ((or + cl-postgres-error::data-exception + cl-postgres-error::integrity-violation + cl-postgres-error:internal-error + cl-postgres-error::insufficient-resources + cl-postgres-error::program-limit-exceeded) (next-error-in-batch) + + (setf condition next-error-in-batch + + next-error + (+ current-batch-pos + (parse-copy-error-context + (cl-postgres::database-error-context condition)))))))))) + + (log-message :info "Recovery found ~d errors in ~d row~:p" nb-errors batch-rows) + + ;; Return how many rows we did load, for statistics purposes + (- batch-rows nb-errors)) diff --git a/src/pgsql/schema.lisp b/src/pgsql/schema.lisp index 6866ac5..6597a04 100644 --- a/src/pgsql/schema.lisp +++ b/src/pgsql/schema.lisp @@ -1,92 +1,19 @@ ;;; ;;; Tools to handle PostgreSQL tables and indexes creations ;;; -(in-package pgloader.pgsql) +(in-package #:pgloader.pgsql) -;;; -;;; Some parts of the logic here needs to be specialized depending on the -;;; source type, such as SQLite or MySQL. To do so, sources must define -;;; their own column struct and may implement the methods -;;; `format-pgsql-column' and `format-extra-type' on those. -;;; -(defstruct pgsql-column name type-name type-mod nullable default) - -(defgeneric format-pgsql-column (col) - (:documentation - "Return the PostgreSQL column definition (type, default, not null, ...)")) - -(defgeneric format-extra-type (col &key include-drop) - (:documentation - "Return a list of PostgreSQL commands to create an extra type for given - column, or nil of none is required. If no special extra type is ever - needed, it's allowed not to specialize this generic into a method.")) - -(defgeneric format-extra-triggers (table col &key drop) - (:documentation - "Return a list of string representing the extra SQL commands needed to - implement PostgreSQL triggers.")) - -(defmethod format-extra-type ((col T) &key include-drop) - "The default `format-extra-type' implementation returns an empty list." - (declare (ignorable include-drop)) - nil) - -(defmethod format-extra-triggers ((table T) (col T) &key drop) - "The default `format-extra-triggers' implementation returns an empty list." - (declare (ignorable table col drop)) - nil) - - ;;; ;;; API for Foreign Keys ;;; -(defstruct pgsql-fkey - name table columns foreign-table foreign-columns update-rule delete-rule) - -(defgeneric format-pgsql-create-fkey (fkey) - (:documentation - "Return the PostgreSQL command to define a Foreign Key Constraint.")) - -(defgeneric format-pgsql-drop-fkey (fkey &key) - (:documentation - "Return the PostgreSQL command to DROP a Foreign Key Constraint.")) - -(defmethod format-pgsql-create-fkey ((fk pgsql-fkey)) - "Generate the PostgreSQL statement to rebuild a MySQL Foreign Key" - (format nil - "ALTER TABLE ~a ADD ~@[CONSTRAINT ~a ~]FOREIGN KEY(~{~a~^,~}) REFERENCES ~a(~{~a~^,~})~:[~*~; ON UPDATE ~a~]~:[~*~; ON DELETE ~a~]" - (format-table-name (pgsql-fkey-table fk)) - (pgsql-fkey-name fk) ; constraint name - (pgsql-fkey-columns fk) - (format-table-name (pgsql-fkey-foreign-table fk)) - (pgsql-fkey-foreign-columns fk) - (pgsql-fkey-update-rule fk) - (pgsql-fkey-update-rule fk) - (pgsql-fkey-delete-rule fk) - (pgsql-fkey-delete-rule fk))) - -(defmethod format-pgsql-drop-fkey ((fk pgsql-fkey) &key all-pgsql-fkeys) - "Generate the PostgreSQL statement to rebuild a MySQL Foreign Key" - (when (pgsql-fkey-name fk) - (let* ((constraint-name (apply-identifier-case (pgsql-fkey-name fk))) - (table-name (format-table-name (pgsql-fkey-table fk))) - (fkeys (cdr (assoc table-name all-pgsql-fkeys :test #'string=))) - (fkey-exists (member constraint-name fkeys :test #'string=))) - (when fkey-exists - ;; we could do that without all-pgsql-fkeys in 9.2 and following with: - ;; alter table if exists ... drop constraint if exists ... - (format nil "ALTER TABLE ~a DROP CONSTRAINT ~a" table-name constraint-name))))) - (defun drop-pgsql-fkeys (catalog) "Drop all Foreign Key Definitions given, to prepare for a clean run." - (let ((all-pgsql-fkeys (list-tables-and-fkeys))) - (loop :for table :in (table-list catalog) - :do - (loop :for fkey :in (table-fkey-list table) - :for sql := (format-pgsql-drop-fkey fkey - :all-pgsql-fkeys all-pgsql-fkeys) - :when sql - :do (pgsql-execute sql))))) + (loop :for table :in (table-list catalog) + :do + (loop :for fkey :in (table-fkey-list table) + :for sql := (format-drop-sql fkey) + :when sql + :do (pgsql-execute sql)))) (defun create-pgsql-fkeys (catalog &key @@ -97,69 +24,42 @@ (with-stats-collection (label :section section :use-result-as-rows t) (loop :for table :in (table-list catalog) :sum (loop :for fkey :in (table-fkey-list table) - :for sql := (format-pgsql-create-fkey fkey) + :for sql := (format-create-sql fkey) :do (pgsql-execute-with-timing section label sql) :count t)))) ;;; -;;; Table schema rewriting support +;;; Table schema support ;;; -(defun create-table-sql (table &key if-not-exists) - "Return a PostgreSQL CREATE TABLE statement from given COLS. +(defun create-sqltypes (catalog &key if-not-exists include-drop) + "Create the needed data types for given CATALOG." + (let ((sqltype-list)) + ;; build the sqltype list + (loop :for table :in (table-list catalog) + :do (loop :for column :in (table-column-list table) + :do (when (typep (column-type-name column) 'sqltype) + (pushnew (column-type-name column) sqltype-list + :test #'string-equal + :key #'sqltype-name)))) - Each element of the COLS list is expected to be of a type handled by the - `format-pgsql-column' generic function." - (with-output-to-string (s) - (format s "CREATE TABLE~:[~; IF NOT EXISTS~] ~a ~%(~%" - if-not-exists - (format-table-name table)) - (let ((max (reduce #'max - (mapcar #'length - (mapcar #'column-name (table-column-list table)))))) - (loop - :for (col . last?) :on (table-column-list table) - :for pg-coldef := (format-column col - :pretty-print t - :max-column-name-length max ) - :do (format s " ~a~:[~;,~]~%" pg-coldef last?))) - (format s ");~%"))) - -(defun drop-table-if-exists-sql (table) - "Return the PostgreSQL DROP TABLE IF EXISTS statement for TABLE-NAME." - (format nil "DROP TABLE IF EXISTS ~a CASCADE;" (format-table-name table))) + ;; now create the types + (loop :for sqltype :in sqltype-list + :when include-drop + :do (pgsql-execute (format-drop-sql sqltype :cascade t)) + :do (pgsql-execute + (format-create-sql sqltype :if-not-exists if-not-exists))))) (defun create-table-sql-list (table-list - &key - if-not-exists - include-drop) + &key + if-not-exists + include-drop) "Return the list of CREATE TABLE statements to run against PostgreSQL." - (loop - :for table :in table-list - :for cols := (table-column-list table) - :for fields := (table-field-list table) - :for extra-types := (loop :for field :in fields - :append (format-extra-type - field :include-drop include-drop)) - - :for pre-extra-triggers - := (when include-drop - (loop :for field :in fields - :append (format-extra-triggers table field :drop t))) - - :for post-extra-triggers - := (loop :for field :in fields - :append (format-extra-triggers table field)) - + (loop :for table :in table-list :when include-drop - :collect (drop-table-if-exists-sql table) + :collect (format-drop-sql table :cascade t) - :when extra-types :append extra-types - :when pre-extra-triggers :append pre-extra-triggers - - :collect (create-table-sql table :if-not-exists if-not-exists) - - :when post-extra-triggers :append post-extra-triggers)) + :collect (format-create-sql table :if-not-exists if-not-exists))) (defun create-table-list (table-list &key @@ -220,6 +120,23 @@ :include-drop include-drop :client-min-messages client-min-messages)) +(defun create-triggers (catalog &key (client-min-messages :notice)) + "Create the catalog objects that come after the data has been loaded." + (let ((sql-list + (loop :for table :in (table-list catalog) + :do (process-triggers table) + :when (table-trigger-list table) + :append (loop :for trigger :in (table-trigger-list table) + :collect (format-create-sql (trigger-procedure trigger)) + :collect (format-create-sql trigger))))) + (loop :for sql :in sql-list + :do (pgsql-execute sql :client-min-messages client-min-messages)))) + + +;;; +;;; DDL Utilities: TRUNCATE, ENABLE/DISABLE triggers +;;; + (defun truncate-tables (pgconn catalog-or-table) "Truncate given TABLE-NAME in database DBNAME" (with-pgsql-transaction (:pgconn pgconn) @@ -260,136 +177,6 @@ (progn ,@forms))) -;;; -;;; Index support -;;; -(defstruct pgsql-index - ;; the struct is used both for supporting new index creation from non - ;; PostgreSQL system and for drop/create indexes when using the 'drop - ;; indexes' option (in CSV mode and the like) - name schema table-oid primary unique columns sql conname condef filter) - -(defgeneric format-pgsql-create-index (table index) - (:documentation - "Return the PostgreSQL command to define an Index.")) - -(defgeneric format-pgsql-drop-index (table index) - (:documentation - "Return the PostgreSQL command to drop an Index.")) - -(defgeneric translate-index-filter (table index sql-dialect) - (:documentation - "Translate the filter clause of INDEX in PostgreSQL slang.")) - -(defmethod translate-index-filter ((table table) - (index pgsql-index) - (sql-dialect t)) - "Implement a default facility that does nothing." - nil) - -(defmethod format-pgsql-create-index ((table table) (index pgsql-index)) - "Generate the PostgreSQL statement list to rebuild a Foreign Key" - (let* ((index-name (if (and *preserve-index-names* - (not (string-equal "primary" (pgsql-index-name index))) - (pgsql-index-table-oid index)) - (pgsql-index-name index) - - ;; in the general case, we build our own index name. - (format nil "idx_~a_~a" - (pgsql-index-table-oid index) - (pgsql-index-name index)))) - (index-name (apply-identifier-case index-name))) - (cond - ((or (pgsql-index-primary index) - (and (pgsql-index-condef index) (pgsql-index-unique index))) - (values - ;; ensure good concurrency here, don't take the ACCESS EXCLUSIVE - ;; LOCK on the table before we have the index done already - (or (pgsql-index-sql index) - (format nil - "CREATE UNIQUE INDEX ~@[~a.~]~a ON ~a (~{~a~^, ~})~@[ WHERE ~a~];" - (pgsql-index-schema index) - index-name - (format-table-name table) - (pgsql-index-columns index) - (pgsql-index-filter index))) - (format nil - ;; don't use the index schema name here, PostgreSQL doesn't - ;; like it, might be implicit from the table's schema - ;; itself... - "ALTER TABLE ~a ADD ~a USING INDEX ~a;" - (format-table-name table) - (cond ((pgsql-index-primary index) "PRIMARY KEY") - ((pgsql-index-unique index) "UNIQUE")) - index-name))) - - ((pgsql-index-condef index) - (format nil "ALTER TABLE ~a ADD ~a;" - (format-table-name table) - (pgsql-index-condef index))) - - (t - (or (pgsql-index-sql index) - (format nil - "CREATE~:[~; UNIQUE~] INDEX ~@[~a.~]~a ON ~a (~{~a~^, ~})~@[ WHERE ~a~];" - (pgsql-index-unique index) - (pgsql-index-schema index) - index-name - (format-table-name table) - (pgsql-index-columns index) - (pgsql-index-filter index))))))) - -(defmethod format-pgsql-drop-index ((table table) (index pgsql-index)) - "Generate the PostgreSQL statement to DROP the index." - (let* ((schema-name (apply-identifier-case (pgsql-index-schema index))) - (index-name (apply-identifier-case (pgsql-index-name index)))) - (cond ((pgsql-index-conname index) - ;; here always quote the constraint name, currently the name - ;; comes from one source only, the PostgreSQL database catalogs, - ;; so don't question it, quote it. - (format nil "ALTER TABLE ~a DROP CONSTRAINT ~s;" - (format-table-name table) - (pgsql-index-conname index))) - - (t - (format nil "DROP INDEX ~@[~a.~]~a;" schema-name index-name))))) - - -;;; -;;; API to rewrite index WHERE clauses (filter) -;;; -(defgeneric process-index-definitions (object &key sql-dialect) - (:documentation "Rewrite all indexes filters in given catalog OBJECT.")) - -(defmethod process-index-definitions ((catalog catalog) &key sql-dialect) - "Rewrite all index filters in CATALOG." - (loop :for schema :in (catalog-schema-list catalog) - :do (process-index-definitions schema :sql-dialect sql-dialect))) - -(defmethod process-index-definitions ((schema schema) &key sql-dialect) - "Rewrite all index filters in CATALOG." - (loop :for table :in (schema-table-list schema) - :do (process-index-definitions table :sql-dialect sql-dialect))) - -(defmethod process-index-definitions ((table table) &key sql-dialect) - "Rewrite all index filter in TABLE." - (loop :for index :in (table-index-list table) - :when (pgsql-index-filter index) - :do (let ((pg-filter - (handler-case - (translate-index-filter table index sql-dialect) - (condition (c) - (log-message :error - "Failed to translate index ~s on table ~s because of filter clause ~s" - (pgsql-index-name index) - (format-table-name table) - (pgsql-index-filter index)) - (log-message :debug "filter translation error: ~a" c) - ;; try to create the index without the WHERE clause... - (setf (pgsql-index-filter index) nil))))) - (log-message :info "tranlate-index-filter: ~s" pg-filter) - (setf (pgsql-index-filter index) pg-filter)))) - ;;; ;;; Parallel index building. ;;; @@ -403,7 +190,7 @@ :for index :in (table-index-list table) :collect (multiple-value-bind (sql pkey) ;; we postpone the pkey upgrade of the index for later. - (format-pgsql-create-index table index) + (format-create-sql index) (lp:submit-task channel #'pgsql-connect-and-execute-with-timing @@ -430,9 +217,7 @@ :for table-name := (format-table-name table) :for table-oid := (cdr (assoc table-name table-oids :test #'string=)) :unless table-oid :do (error "OID not found for ~s." table-name) - :do (setf (table-oid table) table-oid) - (loop :for index :in (table-index-list table) - :do (setf (pgsql-index-table-oid index) table-oid))))) + :do (setf (table-oid table) table-oid)))) ;;; ;;; Drop indexes before loading @@ -441,7 +226,7 @@ "Drop indexes in PGSQL-INDEX-LIST. A PostgreSQL connection must already be active when calling that function." (loop :for index :in (table-index-list table) - :do (let ((sql (format-pgsql-drop-index table index))) + :do (let ((sql (format-drop-sql index))) (pgsql-execute-with-timing section "drop indexes" sql)))) ;;; @@ -451,14 +236,11 @@ "Drop the indexes for TABLE-NAME on TARGET PostgreSQL connection, and returns a list of indexes to create again." (with-pgsql-connection (target) - (let ((indexes (list-indexes table)) + (let ((indexes (table-index-list table)) ;; we get the list of indexes from PostgreSQL catalogs, so don't ;; question their spelling, just quote them. (*identifier-case* :quote)) - ;; set the indexes list in the table structure - (setf (table-index-list table) indexes) - (cond ((and indexes (not drop-indexes)) (log-message :warning "Target table ~s has ~d indexes defined against it." diff --git a/src/sources/common/casting-rules.lisp b/src/sources/common/casting-rules.lisp index 390e22c..d0640b0 100644 --- a/src/sources/common/casting-rules.lisp +++ b/src/sources/common/casting-rules.lisp @@ -111,7 +111,7 @@ pg-typemod) :nullable (and (not set-not-null) (or (not source-not-null) drop-not-null)) :default (when (and source-default (not drop-default)) - (format-default-value source-default using)) + source-default) :transform using))) ;; NO MATCH @@ -121,7 +121,7 @@ (make-column :name (apply-identifier-case source-column-name) :type-name source-ctype :nullable (not source-not-null) - :default (format-default-value source-default using) + :default source-default :transform using)))) (defun apply-casting-rules (table-name column-name @@ -155,7 +155,7 @@ table-name column-name ctype default (string= "NO" nullable) (string/= "" extra) extra - (format-column coldef) + (format-create-sql coldef) using) (return coldef)))))) diff --git a/src/sources/common/db-methods.lisp b/src/sources/common/db-methods.lisp index a865069..77b7603 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/sources/common/db-methods.lisp @@ -27,13 +27,19 @@ (with-pgsql-transaction (:pgconn (target-db copy)) ;; we need to first drop the Foreign Key Constraints, so that we ;; can DROP TABLE when asked - (when (and foreign-keys include-drop) - (drop-pgsql-fkeys catalog)) + ;; (when (and foreign-keys include-drop) + ;; (drop-pgsql-fkeys catalog)) (when create-schemas (log-message :debug "Create schemas") (create-schemas catalog :include-drop include-drop)) + ;; create new SQL types (ENUMs, SETs) if needed and before we get to + ;; the table definitions that will use them + (log-message :debug "Create SQL types (enums, sets)") + (create-sqltypes catalog :include-drop include-drop) + + ;; now the tables (log-message :debug "Create tables") (create-tables catalog :include-drop include-drop) @@ -42,11 +48,11 @@ ;; index name, to differenciate. Set the table oids now. (when set-table-oids (log-message :debug "Set table OIDs") - (set-table-oids catalog)) + (pgloader.pgsql::set-table-oids catalog)) ;; We might have to MATERIALIZE VIEWS (when materialize-views - (log-message :debug "Create views for matview support") + (log-message :debug "Create tables for matview support") (create-views catalog :include-drop include-drop))))) (defmethod cleanup ((copy db-copy) (catalog catalog) &key materialize-views) @@ -92,6 +98,12 @@ (when (and foreign-keys (not data-only)) (create-pgsql-fkeys catalog)) + ;; + ;; Triggers and stored procedures -- includes special default values + ;; + (with-pgsql-transaction (:pgconn (target-db copy)) + (create-triggers catalog)) + ;; ;; And now, comments on tables and columns. ;; diff --git a/src/sources/common/md-methods.lisp b/src/sources/common/md-methods.lisp index 6d670c8..b999128 100644 --- a/src/sources/common/md-methods.lisp +++ b/src/sources/common/md-methods.lisp @@ -99,10 +99,20 @@ create-indexes reset-sequences materialize-views set-table-oids including excluding)) - ;; this sets (table-index-list (target copy)) - (maybe-drop-indexes (target-db copy) - (target copy) - :drop-indexes drop-indexes) + (let* ((pgsql-catalog + (fetch-pgsql-catalog (target-db copy) :table (target copy)))) + + (when (= 1 (count-tables pgsql-catalog)) + ;; we found the target table, grab its definition + (setf (target copy) + (first (schema-table-list + (first (catalog-schema-list pgsql-catalog)))))) + + ;; this sets (table-index-list (target copy)) + (maybe-drop-indexes (target-db copy) + (target copy) + :drop-indexes drop-indexes)) + ;; ensure we truncate only one (when truncate diff --git a/src/sources/mssql/mssql-cast-rules.lisp b/src/sources/mssql/mssql-cast-rules.lisp index 067e5b2..a6a64f1 100644 --- a/src/sources/mssql/mssql-cast-rules.lisp +++ b/src/sources/mssql/mssql-cast-rules.lisp @@ -120,5 +120,24 @@ (unless (column-transform pgcol) (setf (column-transform pgcol) (lambda (val) (if val (format nil "~a" val) :null)))) + + ;; normalize default values + ;; see *pgsql-default-values* + (setf (column-default pgcol) + (cond ((null default) :null) + ((and (stringp default) (string= "NULL" default)) :null) + + ((and (stringp default) + ;; address CURRENT_TIMESTAMP(6) and other spellings + (or (uiop:string-prefix-p "CURRENT_TIMESTAMP" default) + (string= "CURRENT TIMESTAMP" default))) + :current-timestamp) + + ((and (stringp default) + (or (string= "newid()" default) + (string= "newsequentialid()" default))) + :generate-uuid) + + (t default))) pgcol))) diff --git a/src/sources/mssql/mssql-index-filters.lisp b/src/sources/mssql/mssql-index-filters.lisp index 863a83c..75a9fcd 100644 --- a/src/sources/mssql/mssql-index-filters.lisp +++ b/src/sources/mssql/mssql-index-filters.lisp @@ -7,7 +7,7 @@ (in-package #:pgloader.mssql.index-filter) (defmethod translate-index-filter ((table table) - (index pgsql-index) + (index index) (sql-dialect (eql 'copy-mssql))) "Transform given MS SQL index filter to PostgreSQL slang." (labels ((process-expr (expression) @@ -27,8 +27,8 @@ (mapcar (column-transform col) (rest argument))) (t (funcall (column-transform col) argument)))))))) - (when (pgsql-index-filter index) - (let* ((raw-expr (parse-index-filter-clause (pgsql-index-filter index))) + (when (index-filter index) + (let* ((raw-expr (parse-index-filter-clause (index-filter index))) (pg-expr (loop :for node :in raw-expr :collect (typecase node diff --git a/src/sources/mssql/mssql-schema.lisp b/src/sources/mssql/mssql-schema.lisp index 02ec254..7490938 100644 --- a/src/sources/mssql/mssql-schema.lisp +++ b/src/sources/mssql/mssql-schema.lisp @@ -197,14 +197,16 @@ order by SchemaName, :do (let* ((schema (find-schema catalog schema-name)) (table (find-table schema table-name)) - (pg-index (make-pgsql-index :name index-name - :primary (= pkey 1) - :unique (= unique 1) - :columns nil - :filter filter)) + (pg-index (make-index :name index-name + :schema schema + :table table + :primary (= pkey 1) + :unique (= unique 1) + :columns nil + :filter filter)) (index (maybe-add-index table index-name pg-index :key #'pgloader.pgsql::pgsql-index-name))) - (push-to-end col (pgloader.pgsql::pgsql-index-columns index))) + (push-to-end col (index-columns index))) :finally (return catalog))) (defun list-all-fkeys (catalog &key including excluding) @@ -259,15 +261,15 @@ ORDER BY KCU1.CONSTRAINT_NAME, KCU1.ORDINAL_POSITION" (fschema (find-schema catalog fschema-name)) (ftable (find-table fschema ftable-name)) (pg-fkey - (make-pgsql-fkey :name fkey-name - :table table - :columns nil - :foreign-table ftable - :foreign-columns nil)) + (make-fkey :name fkey-name + :table table + :columns nil + :foreign-table ftable + :foreign-columns nil)) (fkey (maybe-add-fkey table fkey-name pg-fkey :key #'pgloader.pgsql::pgsql-fkey-name))) - (push-to-end col (pgloader.pgsql::pgsql-fkey-columns fkey)) - (push-to-end fcol (pgloader.pgsql::pgsql-fkey-foreign-columns fkey))) + (push-to-end col (fkey-columns fkey)) + (push-to-end fcol (fkey-foreign-columns fkey))) :finally (return catalog))) diff --git a/src/sources/mysql/mysql-cast-rules.lisp b/src/sources/mysql/mysql-cast-rules.lisp index 11efc3f..fc5262c 100644 --- a/src/sources/mysql/mysql-cast-rules.lisp +++ b/src/sources/mysql/mysql-cast-rules.lisp @@ -4,39 +4,9 @@ (in-package :pgloader.mysql) -;;; -;;; Some functions to deal with ENUM and SET types -;;; -(defun explode-mysql-enum (ctype) - "Convert MySQL ENUM expression into a list of labels." - (cl-ppcre:register-groups-bind (list) - ("(?i)(?:ENUM|SET)\\s*\\((.*)\\)" ctype) - (first (cl-csv:read-csv list :separator #\, :quote #\' :escape "\\")))) - -(defun get-enum-type-name (table-name column-name) - "Return the Type Name we're going to use in PostgreSQL." - (apply-identifier-case (format nil "~a_~a" table-name column-name))) - -(defun get-create-enum (table-name column-name ctype) - "Return a PostgreSQL CREATE ENUM TYPE statement from MySQL enum column." - (with-output-to-string (s) - (format s "CREATE TYPE ~a AS ENUM (~{'~a'~^, ~});" - (get-enum-type-name table-name column-name) - (explode-mysql-enum ctype)))) - -(defun cast-enum (table-name column-name type ctype typemod) - "Cast MySQL inline ENUM type to using a PostgreSQL named type. - - The type naming is hardcoded to be table-name_column-name" +(defun enum-or-set-name (table-name column-name type ctype typemod) (declare (ignore type ctype typemod)) - (get-enum-type-name table-name column-name)) - -(defun cast-set (table-name column-name type ctype typemod) - "Cast MySQL inline SET type to using a PostgreSQL ENUM Array. - - The ENUM data type name is hardcoded to be table-name_column-name" - (declare (ignore type ctype typemod)) - (format nil "\"~a_~a\"[]" table-name column-name)) + (string-downcase (format nil "~a_~a" table-name column-name))) ;;; ;;; The default MySQL Type Casting Rules @@ -157,10 +127,10 @@ ;; Inline MySQL "interesting" datatype (:source (:type "enum") - :target (:type ,#'cast-enum)) + :target (:type ,#'enum-or-set-name)) (:source (:type "set") - :target (:type ,#'cast-set) + :target (:type ,#'enum-or-set-name) :using pgloader.transforms::set-to-enum-array) ;; geometric data types, just POINT for now @@ -178,72 +148,62 @@ (table-name name comment dtype ctype default nullable extra))) table-name name dtype ctype default nullable extra comment) -(defmethod format-extra-type ((col mysql-column) &key include-drop) - "Return a string representing the extra needed PostgreSQL CREATE TYPE - statement, if such is needed" - (let ((dtype (mysql-column-dtype col))) - (when (or (string-equal "enum" dtype) - (string-equal "set" dtype)) - (list - (when include-drop - (let* ((type-name - (get-enum-type-name (mysql-column-table-name col) - (mysql-column-name col)))) - (format nil "DROP TYPE IF EXISTS ~a CASCADE;" type-name))) - - (get-create-enum (mysql-column-table-name col) - (mysql-column-name col) - (mysql-column-ctype col)))))) - -(defmethod format-extra-triggers ((table table) (col mysql-column) &key drop) - "Return a list of string representing the extra SQL commands needed to - implement some MySQL features as PostgreSQL triggers, such as on update - CURRENT_TIMESTAMP. - - When drop is t, only output the DROP sql statements." - (when (string= (mysql-column-extra col) "on update CURRENT_TIMESTAMP") - (let* ((field-pos (position (mysql-column-name col) - (table-field-list table) - :key #'mysql-column-name - :test #'string=)) - (col-name (funcall #'column-name - (nth field-pos (table-column-list table)))) - (fun-name (format nil "on_update_current_timestamp_~a" col-name)) - (update-fun-sql - (format nil " -CREATE OR REPLACE FUNCTION ~a() - RETURNS TRIGGER -LANGUAGE plpgsql -AS $$ -BEGIN - NEW.~a = now(); - RETURN NEW; -END; -$$;" - fun-name col-name)) - (trigger-sql - (format nil " -CREATE TRIGGER on_update_current_timestamp - BEFORE UPDATE ON ~a - FOR EACH ROW EXECUTE PROCEDURE ~a();" - (format-table-name table) fun-name))) - (if drop - (list - ;; don't DROP the function here, because it might be shared by - ;; several tables with "on update" rules on a field sharing the - ;; same name (such as "last_update"). - ;; the CREATE OR REPLACE FUNCTION will then be innoffensive. - (format nil "DROP TRIGGER IF EXISTS on_update_current_timestamp ON ~a;" - (mysql-column-table-name col))) - (list update-fun-sql trigger-sql))))) +(defun explode-mysql-enum (ctype) + "Convert MySQL ENUM expression into a list of labels." + (cl-ppcre:register-groups-bind (list) + ("(?i)(?:ENUM|SET)\\s*\\((.*)\\)" ctype) + (first (cl-csv:read-csv list :separator #\, :quote #\' :escape "\\")))) (defmethod cast ((col mysql-column)) "Return the PostgreSQL type definition from given MySQL column definition." (with-slots (table-name name dtype ctype default nullable extra comment) col - (let ((pgcol - (apply-casting-rules table-name name dtype ctype default nullable extra))) + (let* ((pgcol + (apply-casting-rules table-name name dtype ctype default nullable extra))) (setf (column-comment pgcol) comment) + + ;; normalize default values + (setf (column-default pgcol) + (cond ((and (stringp default) (string= "NULL" default)) :null) + ((and (stringp default) + ;; address CURRENT_TIMESTAMP(6) and other spellings + (or (uiop:string-prefix-p "CURRENT_TIMESTAMP" default) + (string= "CURRENT TIMESTAMP" default))) + :current-timestamp) + (t default))) + + ;; extra user-defined data types + (when (or (string-equal "set" dtype) + (string-equal "enum" dtype)) + ;; + ;; SET and ENUM both need more care, if the target is PostgreSQL we + ;; need to create per-schema user defined data types that match the + ;; column local definition here. + ;; + (let ((sqltype-name (enum-or-set-name table-name + (column-name pgcol) + dtype + ctype + nil))) + (setf (column-type-name pgcol) + (make-sqltype :name sqltype-name + :type (intern (string-upcase dtype) + (find-package "KEYWORD")) + :source-def ctype + :extra (explode-mysql-enum ctype))))) + + ;; extra triggers + ;; + ;; See the generic function `post-process-catalog' for the next step. + ;; + (when (string= extra "on update CURRENT_TIMESTAMP") + (let* ((pro-name (format nil + "on_update_current_timestamp_~a" + (column-name pgcol)))) + (setf (column-extra pgcol) + (make-trigger :name :on-update-current-timestamp + :action "BEFORE UPDATE" + :procedure-name pro-name)))) pgcol))) diff --git a/src/sources/mysql/mysql-schema.lisp b/src/sources/mysql/mysql-schema.lisp index 7a7ab3c..ab20959 100644 --- a/src/sources/mysql/mysql-schema.lisp +++ b/src/sources/mysql/mysql-schema.lisp @@ -221,24 +221,26 @@ order by table_name, ordinal_position" ~:[~*~;and (~{table_name ~a~^ or ~})~] ~:[~*~;and (~{table_name ~a~^ and ~})~] GROUP BY table_name, index_name;" - (db-name *connection*) - only-tables ; do we print the clause? - only-tables - including ; do we print the clause? - (filter-list-to-where-clause including) - excluding ; do we print the clause? - (filter-list-to-where-clause excluding t))) - :do (let ((table (find-table schema table-name)) - (index - (make-pgsql-index :name name ; further processing is needed - :primary (string= name "PRIMARY") - :unique (string= "0" non-unique) - :columns (mapcar - #'apply-identifier-case - (sq:split-sequence #\, cols))))) + (db-name *connection*) + only-tables ; do we print the clause? + only-tables + including ; do we print the clause? + (filter-list-to-where-clause including) + excluding ; do we print the clause? + (filter-list-to-where-clause excluding t))) + :do (let* ((table (find-table schema table-name)) + (index + (make-index :name name ; further processing is needed + :schema schema + :table table + :primary (string= name "PRIMARY") + :unique (string= "0" non-unique) + :columns (mapcar + #'apply-identifier-case + (sq:split-sequence #\, cols))))) (add-index table index)) :finally - (return schema))) + (return schema))) ;;; ;;; MySQL Foreign Keys @@ -296,16 +298,16 @@ FROM :do (let* ((table (find-table schema table-name)) (ftable (find-table schema ftable-name)) (fk - (make-pgsql-fkey :name (apply-identifier-case name) - :table table - :columns (mapcar #'apply-identifier-case - (sq:split-sequence #\, cols)) - :foreign-table ftable - :foreign-columns (mapcar - #'apply-identifier-case - (sq:split-sequence #\, fcols)) - :update-rule update-rule - :delete-rule delete-rule))) + (make-fkey :name (apply-identifier-case name) + :table table + :columns (mapcar #'apply-identifier-case + (sq:split-sequence #\, cols)) + :foreign-table ftable + :foreign-columns (mapcar + #'apply-identifier-case + (sq:split-sequence #\, fcols)) + :update-rule update-rule + :delete-rule delete-rule))) (if (and name table ftable) (add-fkey table fk) (log-message :error diff --git a/src/sources/sqlite/sqlite-cast-rules.lisp b/src/sources/sqlite/sqlite-cast-rules.lisp index f775eff..b32bb38 100644 --- a/src/sources/sqlite/sqlite-cast-rules.lisp +++ b/src/sources/sqlite/sqlite-cast-rules.lisp @@ -91,5 +91,14 @@ (setf (column-transform pgcol) (lambda (val) (if val (format nil "~a" val) :null)))) + (setf (column-default pgcol) + (cond ((and (stringp default) (string= "NULL" default)) :null) + ((and (stringp default) + ;; address CURRENT_TIMESTAMP(6) and other spellings + (or (uiop:string-prefix-p "CURRENT_TIMESTAMP" default) + (string= "CURRENT TIMESTAMP" default))) + :current-timestamp) + (t default))) + pgcol))) diff --git a/src/sources/sqlite/sqlite-schema.lisp b/src/sources/sqlite/sqlite-schema.lisp index 2cb58c4..5047290 100644 --- a/src/sources/sqlite/sqlite-schema.lisp +++ b/src/sources/sqlite/sqlite-schema.lisp @@ -120,10 +120,11 @@ (loop :for (seq index-name unique origin partial) :in (sqlite:execute-to-list db sql) :do (let* ((cols (list-index-cols index-name db)) - (index (make-pgsql-index :name index-name - :primary (is-index-pk table cols) - :unique (= unique 1) - :columns cols))) + (index (make-index :name index-name + :table table + :primary (is-index-pk table cols) + :unique (= unique 1) + :columns cols))) (add-index table index))))) (defun list-all-indexes (schema &key (db *sqlite-db*)) @@ -147,17 +148,17 @@ :do (let* ((ftable (find-table (table-schema table) ftable-name)) (fkey (or (gethash id fkey-table) (let ((pg-fkey - (make-pgsql-fkey :table table - :columns nil - :foreign-table ftable - :foreign-columns nil - :update-rule on-update - :delete-rule on-delete))) + (make-fkey :table table + :columns nil + :foreign-table ftable + :foreign-columns nil + :update-rule on-update + :delete-rule on-delete))) (setf (gethash id fkey-table) pg-fkey) (add-fkey table pg-fkey) pg-fkey)))) - (push-to-end from (pgsql-fkey-columns fkey)) - (push-to-end to (pgsql-fkey-foreign-columns fkey)))))) + (push-to-end from (fkey-columns fkey)) + (push-to-end to (fkey-foreign-columns fkey)))))) (defun list-all-fkeys (schema &key (db *sqlite-db*)) "Get the list of SQLite foreign keys definitions per table." diff --git a/src/utils/alter-table.lisp b/src/utils/alter-table.lisp index 5ebebd8..28ed039 100644 --- a/src/utils/alter-table.lisp +++ b/src/utils/alter-table.lisp @@ -2,7 +2,7 @@ ;;; ALTER TABLE allows pgloader to apply transformations on the catalog ;;; retrieved before applying it to PostgreSQL: SET SCHEMA, RENAME, etc. ;;; -(in-package :pgloader.schema) +(in-package :pgloader.catalog) #| See src/parsers/command-alter-table.lisp diff --git a/src/utils/schema-structs.lisp b/src/utils/catalog.lisp similarity index 88% rename from src/utils/schema-structs.lisp rename to src/utils/catalog.lisp index af4cff2..a6c1fe4 100644 --- a/src/utils/schema-structs.lisp +++ b/src/utils/catalog.lisp @@ -8,8 +8,11 @@ ;;; Utility function using those definitions are found in schema.lisp in the ;;; same directory. ;;; -(in-package :pgloader.schema) +(in-package :pgloader.catalog) +;;; +;;; A macro to ease writing the catalog handling API +;;; (defmacro push-to-end (item place) `(progn (setf ,place (nconc ,place (list ,item))) @@ -17,10 +20,20 @@ ,item)) ;;; -;;; TODO: stop using anonymous data structures for database catalogs, -;;; currently list of alists of lists... the madness has found its way in -;;; lots of places tho. +;;; One interesting thing to do to all those catalog objects is to be able +;;; to print the DDL commands out: CREATE and DROP SQL statements. ;;; +(defgeneric format-create-sql (object &key stream if-not-exists) + (:documentation "Generate proper SQL command to create OBJECT in + PostgreSQL. The output is written to STREAM.")) + +(defgeneric format-drop-sql (object &key stream cascade) + (:documentation "Generate proper SQL command to drop OBJECT in PostgreSQL. + The output is written to STREAM.")) + +(defgeneric format-default-value (column &key stream) + (:documentation "Generate proper value to be used as a default value for + given COLUMN in PostgreSQL. The output is written to STREAM.")) ;;; ;;; A database catalog is a list of schema each containing a list of tables, @@ -34,19 +47,38 @@ (defstruct table source-name name schema oid comment ;; field is for SOURCE ;; column is for TARGET - field-list column-list index-list fkey-list) + field-list column-list index-list fkey-list trigger-list) + +;;; +;;; When migrating from another database to PostgreSQL some data types might +;;; need to be tranformed dynamically into User Defined Types: ENUMs, SET, +;;; etc. +;;; +(defstruct sqltype name type source-def extra) ;;; ;;; The generic PostgreSQL column that the CAST generic function is asked to ;;; produce, so that we know how to CREATE TABLEs in PostgreSQL whatever the ;;; source is. ;;; -(defstruct column name type-name type-mod nullable default comment transform) +(defstruct column name type-name type-mod nullable default comment transform extra) -;;; those are currently defined in ./schema.lisp -;; (defstruct index name primary unique columns sql conname condef) -;; (defstruct fkey -;; name columns foreign-table foreign-columns update-rule delete-rule) +;;; +;;; Index and Foreign Keys +;;; +(defstruct fkey + name table columns foreign-table foreign-columns condef + update-rule delete-rule match-rule deferrable initially-deferred) + +(defstruct index + name schema table primary unique columns sql conname condef filter) + +;;; +;;; Triggers and trigger procedures, no args support (yet?) +;;; +(defstruct trigger name table action procedure-name procedure) + +(defstruct procedure name returns language body) ;;; ;;; Main data collection API @@ -164,12 +196,13 @@ (apply-identifier-case schema-name))))) (push-to-end schema (catalog-schema-list catalog)))) -(defmethod add-table ((schema schema) table-name &key comment) +(defmethod add-table ((schema schema) table-name &key comment oid) "Add TABLE-NAME to SCHEMA and return the new table instance." (let ((table (make-table :source-name table-name :name (apply-identifier-case table-name) :schema schema + :oid oid :comment (unless (or (null comment) (string= "" comment)) comment)))) (push-to-end table (schema-table-list schema)))) @@ -205,11 +238,11 @@ (let ((schema (find-schema catalog schema-name))) (or schema (add-schema catalog schema-name)))) -(defmethod maybe-add-table ((schema schema) table-name &key comment) +(defmethod maybe-add-table ((schema schema) table-name &key comment oid) "Add TABLE-NAME to the table-list for SCHEMA, or return the existing table of the same name if it already exists in the schema table-list." (let ((table (find-table schema table-name))) - (or table (add-table schema table-name :comment comment)))) + (or table (add-table schema table-name :oid oid :comment comment)))) (defmethod maybe-add-view ((schema schema) view-name &key comment) "Add TABLE-NAME to the table-list for SCHEMA, or return the existing table @@ -360,3 +393,4 @@ (pgloader.pgsql:pgsql-execute sql))) (table-name ,table-name)))) ,@body))) +