Add schema migration support for Redshift as a target.

Redshift looks like a very old PostgreSQL (8.0.2) with some extra features
and a very limited selection of data types. In this patch we parse the
PostgreSQL version() function output and automatically determine if we're
connected to Redshift.

When connected to Redshift, we then dumb-down our target catalogs to the
subset of data types that Redshift actually does support.

Also, some catalog queries can't be done in Redshift, and 8.0 didn't have
fully compliant VALUES statement, so we use a temporary table in places
where we used to use SELECT ... FROM (VALUES(...)) in pgloader.

COPYing data to Redshift isn't possible with just this set of changes,
because Redshift also don't support the COPY FROM STDIN form. COPY sources
are limited, and another patch will have to be cooked to prepare the data
from pgloader into a format and location that Redshift knows how to handle.

At least, it's possible to migrate a database schema to Redshift already.
This commit is contained in:
Dimitri Fontaine 2018-05-19 19:16:58 +02:00
parent 8fce6c84fc
commit d4dc4499a8
8 changed files with 523 additions and 135 deletions

View File

@ -98,6 +98,7 @@
(:file "merge-catalogs" :depends-on ("pgsql-schema"))
(:file "pgsql-trigger")
(:file "pgsql-index-filter")
(:file "pgsql-finalize-catalogs")
(:file "pgsql-create-schema"
:depends-on ("pgsql-trigger"))))

View File

@ -28,8 +28,8 @@
(log-message :notice "Prepare PostgreSQL database.")
(with-pgsql-transaction (:pgconn (target-db copy))
(setf (catalog-types-without-btree catalog)
(list-typenames-without-btree-support))
(finalize-catalogs catalog (pgconn-variant (target-db copy)))
(if create-tables
(progn
@ -99,7 +99,7 @@
(with-stats-collection ("Set Table OIDs" :section :pre
:use-result-as-read t
:use-result-as-rows t)
(set-table-oids catalog)))
(set-table-oids catalog :variant (pgconn-variant (target-db copy)))))
;; We might have to MATERIALIZE VIEWS
(when (and create-tables materialize-views)

View File

@ -366,13 +366,16 @@
(defpackage #:pgloader.pgsql
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.catalog)
#:pgloader.params #:pgloader.utils #:pgloader.transforms
#:pgloader.connection #:pgloader.catalog)
(:import-from #:cl-postgres
#:database-error-context)
(:export #:pgsql-connection
#:pgconn-use-ssl
#:pgconn-table-name
#:pgconn-version-string
#:pgconn-major-version
#:pgconn-variant
#:with-pgsql-transaction
#:with-pgsql-connection
#:pgsql-execute
@ -395,9 +398,6 @@
#:create-pgsql-fkeys
#:create-triggers
#:translate-index-filter
#:process-index-definitions
#:fetch-pgsql-catalog
#:merge-catalogs
@ -408,6 +408,10 @@
#:reset-sequences
#:comment-on-tables-and-columns
;; finalizing catalogs support (redshift and other variants)
#:finalize-catalogs
#:adjust-data-types
;; index filter rewriting support
#:translate-index-filter
#:process-index-definitions

View File

@ -3,12 +3,21 @@
;;;
(in-package :pgloader.pgsql)
(defparameter *pgconn-variant* :pgdg
"The PostgreSQL version/variant we are talking to. At the moment, this
could be either :pgdg for PostgreSQL Global Development Group,
or :redshift. The value is parsed from SELECT version(); and set in the
pgsql-connection instance at open-connection time.")
;;;
;;; PostgreSQL Tools connecting to a database
;;;
(defclass pgsql-connection (db-connection)
((use-ssl :initarg :use-ssl :accessor pgconn-use-ssl)
(table-name :initarg :table-name :accessor pgconn-table-name))
(table-name :initarg :table-name :accessor pgconn-table-name)
(version-string :initarg :pg-version :accessor pgconn-version-string)
(major-version :initarg :major-version :accessor pgconn-major-version)
(variant :initarg :variant :accessor pgconn-variant))
(:documentation "PostgreSQL connection for pgloader"))
(defmethod initialize-instance :after ((pgconn pgsql-connection) &key)
@ -18,8 +27,11 @@
(defmethod clone-connection ((c pgsql-connection))
(let ((clone
(change-class (call-next-method c) 'pgsql-connection)))
(setf (pgconn-use-ssl clone) (pgconn-use-ssl c)
(pgconn-table-name clone) (pgconn-table-name c))
(setf (pgconn-use-ssl clone) (pgconn-use-ssl c)
(pgconn-table-name clone) (pgconn-table-name c)
(pgconn-version-string clone) (pgconn-version-string c)
(pgconn-major-version clone) (pgconn-major-version c)
(pgconn-variant clone) (pgconn-variant c))
clone))
(defmethod ssl-enable-p ((pgconn pgsql-connection))
@ -139,6 +151,7 @@
(log-message :debug "CONNECTED TO ~s" pgconn)
(set-session-gucs *pg-settings* :database (conn-handle pgconn))
(set-postgresql-version pgconn)
pgconn))
@ -195,7 +208,8 @@
spec from the DBNAME, use `get-connection-spec'."
(let ((conn (gensym "pgsql-conn")))
`(with-connection (,conn ,pgconn)
(let ((pomo:*database* (conn-handle ,conn)))
(let ((pomo:*database* (conn-handle ,conn))
(*pgconn-variant* (pgconn-variant ,conn)))
(handling-pgsql-notices
,@forms)))))
@ -309,9 +323,10 @@
(nb-ok 0)
(nb-errors 0))
(when client-min-messages
(pomo:execute
(format nil "SET LOCAL client_min_messages TO ~a;"
(symbol-name client-min-messages))))
(unless (eq :redshift *pgconn-variant*)
(pomo:execute
(format nil "SET LOCAL client_min_messages TO ~a;"
(symbol-name client-min-messages)))))
(if on-error-stop
(loop :for sql :in sql-list
@ -337,7 +352,8 @@
(pomo:execute "rollback to savepoint pgloader;"))))))
(when client-min-messages
(pomo:execute (format nil "RESET client_min_messages;")))
(unless (eq :redshift *pgconn-variant*)
(pomo:execute (format nil "RESET client_min_messages;"))))
(values nb-ok nb-errors)))
@ -345,6 +361,49 @@
;;;
;;; PostgreSQL version specific support, that we get once connected
;;;
;;;
;;; Given that RedShift is a PostgreSQL fork(), we can use PostgreSQL
;;; protocol to talk to it. That said, it's a strange beast:
;;;
;;; pgloader=> show server_version;
;;; ERROR: must be superuser to examine "server_version"
;;;
;;; So we're back to parsing the output of select version();
;;;
(defun set-postgresql-version (pgconn)
"Get the PostgreSQL version string and parse it."
(let* ((database (conn-handle pgconn))
(pomo:*database* (or database pomo:*database*))
(version-string (pomo:query "select version()" :single)))
(multiple-value-bind (version major-version variant)
(parse-postgresql-version-string version-string)
(declare (ignore version))
(setf (pgconn-version-string pgconn) version-string)
(setf (pgconn-major-version pgconn) major-version)
(setf (pgconn-variant pgconn) variant))))
;;; Parse the version string and return the major and complete version
;;; "numbers" as strings, and the variant as a third value, where variant
;;; might be "pgsql" or "Redshift".
;;;
;;; PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc (GCC) 3.4.2 20041017 (Red Hat 3.4.2-6.fc3), Redshift 1.0.2058
;;; PostgreSQL 10.1 on x86_64-apple-darwin14.5.0, compiled by Apple LLVM version 7.0.0 (clang-700.1.76), 64-bit
(defun parse-postgresql-version-string (version-string)
"Parse PostgreSQL select version() output."
(cl-ppcre:register-groups-bind (full-version maybe-variant)
("PostgreSQL ([0-9.]+) on .*, [^,]+, (.*)" version-string)
(let* ((version-dots (split-sequence:split-sequence #\. full-version))
(major-version (if (= 3 (length version-dots))
(format nil "~a.~a"
(first version-dots)
(second version-dots))
(first version-dots)))
(variant (if (cl-ppcre:scan "Redshift" maybe-variant)
:redshift
:pgdg)))
(values full-version major-version variant))))
(defun list-typenames-without-btree-support ()
"Fetch PostgresQL data types without btree support, so that it's possible
to later CREATE INDEX ... ON ... USING gist(...), or even something else
@ -353,118 +412,281 @@
(pomo:query (sql "/pgsql/list-typenames-without-btree-support.sql"))
:collect (cons typename access-methods)))
(defvar *redshift-reserved-keywords*
'("aes128"
"aes256"
"all"
"allowoverwrite"
"analyse"
"analyze"
"and"
"any"
"array"
"as"
"asc"
"authorization"
"backup"
"between"
"binary"
"blanksasnull"
"both"
"bytedict"
"bzip2"
"case"
"cast"
"check"
"collate"
"column"
"constraint"
"create"
"credentials"
"cross"
"current_date"
"current_time"
"current_timestamp"
"current_user"
"current_user_id"
"default"
"deferrable"
"deflate"
"defrag"
"delta"
"delta32k"
"desc"
"disable"
"distinct"
"do"
"else"
"emptyasnull"
"enable"
"encode"
"encrypt"
"encryption"
"end"
"except"
"explicit"
"false"
"for"
"foreign"
"freeze"
"from"
"full"
"globaldict256"
"globaldict64k"
"grant"
"group"
"gzip"
"having"
"identity"
"ignore"
"ilike"
"in"
"initially"
"inner"
"intersect"
"into"
"is"
"isnull"
"join"
"leading"
"left"
"like"
"limit"
"localtime"
"localtimestamp"
"lun"
"luns"
"lzo"
"lzop"
"minus"
"mostly13"
"mostly32"
"mostly8"
"natural"
"new"
"not"
"notnull"
"null"
"nulls"
"off"
"offline"
"offset"
"oid"
"old"
"on"
"only"
"open"
"or"
"order"
"outer"
"overlaps"
"parallel"
"partition"
"percent"
"permissions"
"placing"
"primary"
"raw"
"readratio"
"recover"
"references"
"respect"
"rejectlog"
"resort"
"restore"
"right"
"select"
"session_user"
"similar"
"snapshot"
"some"
"sysdate"
"system"
"table"
"tag"
"tdes"
"text255"
"text32k"
"then"
"timestamp"
"to"
"top"
"trailing"
"true"
"truncatecolumns"
"union"
"unique"
"user"
"using"
"verbose"
"wallet"
"when"
"where"
"with"
"without")
"See https://docs.aws.amazon.com/redshift/latest/dg/r_pg_keywords.html")
(defun list-reserved-keywords (pgconn)
"Connect to PostgreSQL DBNAME and fetch reserved keywords."
(handler-case
(with-pgsql-connection (pgconn)
(with-pgsql-connection (pgconn)
(handler-case
(pomo:query "select word
from pg_get_keywords()
where catcode IN ('R', 'T')" :column))
;; support for Amazon Redshift
(cl-postgres-error::syntax-error-or-access-violation (e)
;; 42883 undefined_function
;; Database error 42883: function pg_get_keywords() does not exist
;;
;; the following list comes from a manual query against a local
;; PostgreSQL server (version 9.5devel), it's better to have this list
;; than nothing at all.
(declare (ignore e))
(list "all"
"analyse"
"analyze"
"and"
"any"
"array"
"as"
"asc"
"asymmetric"
"authorization"
"binary"
"both"
"case"
"cast"
"check"
"collate"
"collation"
"column"
"concurrently"
"constraint"
"create"
"cross"
"current_catalog"
"current_date"
"current_role"
"current_schema"
"current_time"
"current_timestamp"
"current_user"
"default"
"deferrable"
"desc"
"distinct"
"do"
"else"
"end"
"except"
"false"
"fetch"
"for"
"foreign"
"freeze"
"from"
"full"
"grant"
"group"
"having"
"ilike"
"in"
"initially"
"inner"
"intersect"
"into"
"is"
"isnull"
"join"
"lateral"
"leading"
"left"
"like"
"limit"
"localtime"
"localtimestamp"
"natural"
"not"
"notnull"
"null"
"offset"
"on"
"only"
"or"
"order"
"outer"
"overlaps"
"placing"
"primary"
"references"
"returning"
"right"
"select"
"session_user"
"similar"
"some"
"symmetric"
"table"
"then"
"to"
"trailing"
"true"
"union"
"unique"
"user"
"using"
"variadic"
"verbose"
"when"
"where"
"window"
"with"))))
where catcode IN ('R', 'T')" :column)
;; support for Amazon Redshift
(cl-postgres-error::syntax-error-or-access-violation (e)
(declare (ignore e))
;; 42883 undefined_function
;; Database error 42883: function pg_get_keywords() does not exist
(let ((version-string (pomo:query "select version()" :single)))
(if (cl-ppcre:scan "Redshift" version-string)
*redshift-reserved-keywords*
;;
;; In case the pg_get_keywords() function isn't supported but
;; we're not talking to a Redshift database, use the following
;; list of keywords, that comes from a manual query against a
;; local PostgreSQL server (version 9.5devel). It's better to
;; have this list than nothing at all.
;;
(list "all"
"analyse"
"analyze"
"and"
"any"
"array"
"as"
"asc"
"asymmetric"
"authorization"
"binary"
"both"
"case"
"cast"
"check"
"collate"
"collation"
"column"
"concurrently"
"constraint"
"create"
"cross"
"current_catalog"
"current_date"
"current_role"
"current_schema"
"current_time"
"current_timestamp"
"current_user"
"default"
"deferrable"
"desc"
"distinct"
"do"
"else"
"end"
"except"
"false"
"fetch"
"for"
"foreign"
"freeze"
"from"
"full"
"grant"
"group"
"having"
"ilike"
"in"
"initially"
"inner"
"intersect"
"into"
"is"
"isnull"
"join"
"lateral"
"leading"
"left"
"like"
"limit"
"localtime"
"localtimestamp"
"natural"
"not"
"notnull"
"null"
"offset"
"on"
"only"
"or"
"order"
"outer"
"overlaps"
"placing"
"primary"
"references"
"returning"
"right"
"select"
"session_user"
"similar"
"some"
"symmetric"
"table"
"then"
"to"
"trailing"
"true"
"union"
"unique"
"user"
"using"
"variadic"
"verbose"
"when"
"where"
"window"
"with")))))))

View File

@ -268,15 +268,14 @@
;;;
;;; Protect from non-unique index names
;;;
(defun set-table-oids (catalog)
(defun set-table-oids (catalog &key (variant :pgdg))
"MySQL allows using the same index name against separate tables, which
PostgreSQL forbids. To get unicity in index names without running out of
characters (we are allowed only 63), we use the table OID instead.
This function grabs the table OIDs in the PostgreSQL database and update
the definitions with them."
(let* ((table-names (mapcar #'format-table-name (table-list catalog)))
(oid-map (list-table-oids table-names)))
(let ((oid-map (list-table-oids catalog :variant variant)))
(loop :for table :in (table-list catalog)
:for table-name := (format-table-name table)
:for table-oid := (gethash table-name oid-map)

View File

@ -0,0 +1,136 @@
;;;
;;; pgloader supports several PostgreSQL variants:
;;;
;;; :pgdg Core PostgreSQL from the PostgreSQL Development Group
;;; :redshift The Amazon Redshift variant
;;;
;;; Some variants have less features than others, for instance Redshift has
;;; a very limited set of data types to choose from. We have to dumb-down
;;; the PostgreSQL catalogs to target such a dumb database technology.
;;;
;;; Still, Redshift supports the same protocol as PostgreSQL, including
;;; COPY, so it's nice to consider this target as a PostgreSQL thing
;;; nonetheless.
(in-package #:pgloader.pgsql)
(defun finalize-catalogs (catalog variant)
;;
;; For Core PostgreSQL, we also want to find data types names that have
;; no Btree support and fetch alternatives. This allows for supporting
;; automated migration of geometric data types.
;;
(when (eq :pgdg variant)
(setf (catalog-types-without-btree catalog)
(list-typenames-without-btree-support)))
;;
;; For other variants, we might have to be smart about the selection of
;; data types…
;;
(adjust-data-types catalog variant))
(defgeneric adjust-data-types (catalog variant))
;;;
;;; Nothing needs to be done for PostgreSQL variant :pgdg, of course.
;;;
(defmethod adjust-data-types ((catalog catalog) (variant (eql :pgdg)))
catalog)
;;;
;;; The RedShift case is a little more involved, as shown in their
;;; documentation:
;;;
;;; https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html
;;;
;;; Redshift only support those data types:
;;;
(defvar *redshift-supported-data-types*
'("SMALLINT"
"INTEGER"
"BIGINT"
"DECIMAL"
"REAL"
"DOUBLE PRECISION"
"BOOLEAN"
"CHAR"
"VARCHAR"
"DATE"
"TIMESTAMP"
"TIMESTAMPTZ"))
(defvar *redshift-decimal-max-precision* 38)
(defmethod adjust-data-types ((catalog catalog) (variant (eql :redshift)))
(dumb-down-data-types-for-redshift catalog))
;;;
;;; Catalog data types walker
;;;
(defgeneric dumb-down-data-types-for-redshift (object))
(defmethod dumb-down-data-types-for-redshift ((catalog catalog))
(loop :for schema :in (catalog-schema-list catalog)
:do (dumb-down-data-types-for-redshift schema)))
(defmethod dumb-down-data-types-for-redshift ((schema schema))
(loop :for table :in (append (schema-table-list schema)
(schema-view-list schema))
:do (dumb-down-data-types-for-redshift table)))
(defmethod dumb-down-data-types-for-redshift ((table table))
(loop :for column :in (table-column-list table)
:do (dumb-down-data-types-for-redshift column)))
(defmethod dumb-down-data-types-for-redshift ((column column))
(cond ((and (stringp (column-type-name column))
(member (column-type-name column)
*redshift-supported-data-types*
:test #'string-equal))
;; the target data type is supported... but we might have other
;; situations to deal with, such as
;;
;; DECIMAL precision 65 must be between 1 and 38
(when (and (member (column-type-name column)
'("decimal" "numeric")
:test #'string-equal)
(column-type-mod column))
(destructuring-bind (precision &optional (scale 0))
(parse-column-typemod (column-type-name column)
(column-type-mod column))
(when (< *redshift-decimal-max-precision* precision)
(log-message :info
"Redshift doesn't support DECIMAL(~a), capping to ~a"
precision *redshift-decimal-max-precision*)
;;
;; Internal pgloader API want the value as a string.
;;
(setf (column-type-mod column)
(format nil "(~a,~a)"
*redshift-decimal-max-precision* scale))))))
(t
(let ((target-data-type "varchar")
(target-type-mod "(512)"))
;;
;; TODO: we might want to be smarter about the data type conversion
;; here, or at least a tad less dumb. For instance:
;;
;; - when the source column is an ENUM, we know from its source
;; definition the maximum length of the target column
;;
;; - when the source field is e.g. a MySQL varchar(12), it might be
;; - nice to keep the typemod here, or maybe 4 times the typemod to
;; - take into account Redshift encoding "properties".
;;
(setf (column-type-name column) target-data-type)
(setf (column-type-mod column) target-type-mod)
(log-message :info
"Redshift support: ~a change type from ~a to ~a~a"
(column-name column)
(column-type-name column)
target-data-type
target-type-mod)))))

View File

@ -324,13 +324,34 @@
name may be different, so just ask PostgreSQL here."
(pomo:query "select current_database();" :single))
(defun list-table-oids (table-names)
(defun list-table-oids (catalog &key (variant :pgdg))
"Return an hash table mapping TABLE-NAME to its OID for all table in the
TABLE-NAMES list. A PostgreSQL connection must be established already."
(let ((oidmap (make-hash-table :size (length table-names) :test #'equal))
(sql (format nil (sql "/pgsql/list-table-oids.sql") table-names)))
(when table-names
(let* ((table-list (table-list catalog))
(oidmap (make-hash-table :size (length table-list) :test #'equal)))
(when table-list
(loop :for (name oid)
:in (query nil sql)
:in (ecase variant
(:pgdg
;; use the SELECT ... FROM (VALUES ...) variant
(query nil (format nil
(sql "/pgsql/list-table-oids.sql")
(mapcar #'format-table-name table-list))))
(:redshift
;; use the TEMP TABLE variant in Redshift, which doesn't
;; have proper support for VALUES (landed in PostgreSQL 8.2)
(query nil
"create temp table pgloader_toids(tnsp text, tnam text)")
(query nil
(format
nil
"insert into pgloader_toids values ~{(~a)~^,~};"
(mapcar (lambda (table)
(format nil "'~a', '~a'"
(schema-name (table-schema table))
(table-name table)))
table-list)))
(query nil
(sql "/pgsql/list-table-oids-from-temp-table.sql"))))
:do (setf (gethash name oidmap) oid)))
oidmap))

View File

@ -0,0 +1,5 @@
select toids.tnsp || '.' || toids.tnam , c.oid
from pg_catalog.pg_class c
join pg_catalog.pg_namespace n on n.oid = c.relnamespace
join pgloader_toids toids on n.nspname::text = toids.tnsp
and c.relname::text = toids.tnam;