mirror of
https://github.com/dimitri/pgloader.git
synced 2026-01-30 19:41:01 +01:00
Implement Citus support from a MySQL database.
This commit is contained in:
parent
290ad68d61
commit
bda06f8ac0
@ -232,6 +232,7 @@
|
||||
(defun process-catalog (copy catalog &key alter-table alter-schema distribute)
|
||||
"Do all the PostgreSQL catalog tweaking here: casts, index WHERE clause
|
||||
rewriting, pgloader level alter schema and alter table commands."
|
||||
(log-message :info "Processing source catalogs")
|
||||
|
||||
;; cast the catalog into something PostgreSQL can work on
|
||||
(cast catalog)
|
||||
@ -250,6 +251,7 @@
|
||||
|
||||
;; we also support schema changes necessary for Citus distribution
|
||||
(when distribute
|
||||
(log-message :info "Applying distribution rules")
|
||||
(setf (catalog-distribution-rules catalog)
|
||||
(citus-distribute-schema catalog distribute))))
|
||||
|
||||
@ -366,10 +368,12 @@
|
||||
:alter-schema alter-schema
|
||||
:distribute distribute)
|
||||
|
||||
(citus-rule-is-missing-from-list (e)
|
||||
#+pgloader-image
|
||||
((or citus-rule-table-not-found citus-rule-is-missing-from-list) (e)
|
||||
(log-message :fatal "~a" e)
|
||||
(return-from copy-database))
|
||||
|
||||
#+pgloader-image
|
||||
(condition (e)
|
||||
(log-message :fatal "Failed to process catalogs: ~a" e)
|
||||
(return-from copy-database)))
|
||||
|
||||
@ -190,6 +190,7 @@
|
||||
#:count-indexes
|
||||
#:count-fkeys
|
||||
#:max-indexes-per-table
|
||||
#:field-name
|
||||
|
||||
#:push-to-end
|
||||
#:with-schema
|
||||
@ -299,7 +300,18 @@
|
||||
(:export #:citus-distribute-schema
|
||||
#:citus-format-sql-select
|
||||
#:citus-backfill-table-p
|
||||
#:citus-rule-is-missing-from-list))
|
||||
#:citus-rule-table-not-found
|
||||
#:citus-rule-is-missing-from-list
|
||||
|
||||
#:citus-reference-rule
|
||||
#:citus-reference-rule-p
|
||||
#:citus-reference-rule-table
|
||||
|
||||
#:citus-distributed-rule
|
||||
#:citus-distributed-rule-p
|
||||
#:citus-distributed-rule-table
|
||||
#:citus-distributed-rule-using
|
||||
#:citus-distributed-rule-from))
|
||||
|
||||
(defpackage #:pgloader.utils
|
||||
(:use #:cl
|
||||
|
||||
@ -89,7 +89,8 @@
|
||||
excluding-matching
|
||||
decoding-tables-as
|
||||
before-load
|
||||
after-load))
|
||||
after-load
|
||||
distribute-commands))
|
||||
(:lambda (clauses-list)
|
||||
(alexandria:alist-plist clauses-list)))
|
||||
|
||||
@ -164,7 +165,7 @@
|
||||
&key
|
||||
gucs mysql-gucs
|
||||
casts views before after options
|
||||
alter-table alter-schema
|
||||
alter-table alter-schema distribute
|
||||
((:including incl))
|
||||
((:excluding excl))
|
||||
((:decoding decoding-as))
|
||||
@ -191,6 +192,7 @@
|
||||
:materialize-views ',views
|
||||
:alter-table ',alter-table
|
||||
:alter-schema ',alter-schema
|
||||
:distribute ',distribute
|
||||
:set-table-oids t
|
||||
:on-error-stop on-error-stop
|
||||
,@(remove-batch-control-option options))
|
||||
@ -203,7 +205,7 @@
|
||||
pg-db-uri
|
||||
&key
|
||||
gucs mysql-gucs casts views before after options
|
||||
alter-table alter-schema
|
||||
alter-table alter-schema distribute
|
||||
including excluding decoding)
|
||||
source
|
||||
(cond (*dry-run*
|
||||
@ -219,6 +221,7 @@
|
||||
:options options
|
||||
:alter-table alter-table
|
||||
:alter-schema alter-schema
|
||||
:distribute distribute
|
||||
:including including
|
||||
:excluding excluding
|
||||
:decoding decoding))))))
|
||||
|
||||
@ -13,6 +13,8 @@
|
||||
(defmethod format-create-sql ((rule citus-distributed-rule)
|
||||
&key (stream nil) if-not-exists)
|
||||
(declare (ignore if-not-exists))
|
||||
(format stream "SELECT create_distributed_table('~a', '~a');"
|
||||
(format-table-name (citus-distributed-rule-table rule))
|
||||
(column-name (citus-distributed-rule-using rule))))
|
||||
(let* ((rule-table (citus-distributed-rule-table rule))
|
||||
(rule-col-name (column-name (citus-distributed-rule-using rule))))
|
||||
(format stream "SELECT create_distributed_table('~a', '~a');"
|
||||
(format-table-name rule-table)
|
||||
(apply-identifier-case rule-col-name))))
|
||||
|
||||
@ -186,6 +186,9 @@
|
||||
(table-name name comment dtype ctype default nullable extra)))
|
||||
table-name name dtype ctype default nullable extra comment)
|
||||
|
||||
(defmethod field-name ((field mysql-column) &key)
|
||||
(mysql-column-name field))
|
||||
|
||||
(defun explode-mysql-enum (ctype)
|
||||
"Convert MySQL ENUM expression into a list of labels."
|
||||
(cl-ppcre:register-groups-bind (list)
|
||||
|
||||
@ -186,6 +186,9 @@
|
||||
"Cast a FIELD definition from a source database into a PostgreSQL COLUMN
|
||||
definition."))
|
||||
|
||||
(defgeneric field-name (object &key)
|
||||
(:documentation "Get the source database column name, or field-name."))
|
||||
|
||||
|
||||
;;;
|
||||
;;; Implementation of the methods
|
||||
@ -373,6 +376,9 @@
|
||||
(loop :for schema :in (catalog-schema-list catalog)
|
||||
:do (cast schema)))
|
||||
|
||||
(defmethod field-name ((column column) &key)
|
||||
(column-name column))
|
||||
|
||||
;;;
|
||||
;;; There's no simple equivalent to array_agg() in MS SQL, so the index and
|
||||
;;; fkey queries return a row per index|fkey column rather than per
|
||||
|
||||
@ -40,20 +40,42 @@
|
||||
;;
|
||||
;; ERROR Database error 42P16: table ;; "campaigns" is already distributed
|
||||
;;
|
||||
;; In the PostgreSQL source case, we have the table OIDs already at this
|
||||
;; point, but in the general case we don't. Use the names to match what
|
||||
;; we did up to now.
|
||||
;;
|
||||
(loop :for rule :in (append distribution-rules derived-rules)
|
||||
:unless (member (table-oid (citus-rule-table rule))
|
||||
:unless (member (table-source-name (citus-rule-table rule))
|
||||
processed-rules
|
||||
:key (lambda (rule)
|
||||
(table-oid (citus-rule-table rule))))
|
||||
(table-source-name (citus-rule-table rule)))
|
||||
:test #'equal)
|
||||
:collect (progn
|
||||
(push rule processed-rules)
|
||||
(apply-citus-rule rule)
|
||||
rule))))
|
||||
|
||||
(define-condition citus-rule-table-not-found (error)
|
||||
((schema-name :initarg :schema-name
|
||||
:accessor citus-rule-table-not-found-schema-name)
|
||||
(table-name :initarg :table-name
|
||||
:accessor citus-rule-table-not-found-table-name))
|
||||
(:report
|
||||
(lambda (err stream)
|
||||
(let ((*print-circle* nil))
|
||||
(with-slots (schema-name table-name)
|
||||
err
|
||||
(format stream
|
||||
"Could not find table ~s in schema ~s for distribution rules."
|
||||
table-name schema-name))))))
|
||||
|
||||
(defun citus-find-table (catalog table)
|
||||
(let* ((table-name (table-name table))
|
||||
(let* ((table-name (cdr (table-source-name table)))
|
||||
(schema-name (schema-name (table-schema table))))
|
||||
(find-table (find-schema catalog schema-name) table-name)))
|
||||
(or (find-table (find-schema catalog schema-name) table-name)
|
||||
(error (make-condition 'citus-rule-table-not-found
|
||||
:table-name table-name
|
||||
:schema-name schema-name)))))
|
||||
|
||||
(defgeneric citus-rule-table (rule)
|
||||
(:documentation "Returns the RULE's table.")
|
||||
@ -197,11 +219,11 @@
|
||||
;; it to our model
|
||||
(setf (table-citus-rule (citus-distributed-rule-table rule)) rule)
|
||||
|
||||
(let* ((table (citus-distributed-rule-table rule))
|
||||
(let* ((table (citus-distributed-rule-table rule))
|
||||
(column (find (column-name (citus-distributed-rule-using rule))
|
||||
(table-field-list table)
|
||||
:test #'string=
|
||||
:key #'column-name)))
|
||||
:key #'field-name)))
|
||||
(if column
|
||||
|
||||
;; add it to the PKEY definition, in first position
|
||||
|
||||
19
test/mysql/f1db-citus.load
Normal file
19
test/mysql/f1db-citus.load
Normal file
@ -0,0 +1,19 @@
|
||||
load database
|
||||
from mysql://root@localhost/f1db?useSSL=false
|
||||
into pgsql://localhost:9700/dim
|
||||
|
||||
with reset no sequences
|
||||
|
||||
distribute f1db.circuits as reference table
|
||||
distribute f1db.constructorResults using raceId
|
||||
distribute f1db.constructors as reference table
|
||||
distribute f1db.constructorStandings using raceId
|
||||
distribute f1db.drivers as reference table
|
||||
distribute f1db.driverStandings using raceId
|
||||
distribute f1db.lapTimes using raceId
|
||||
distribute f1db.pitStops using raceId
|
||||
distribute f1db.qualifying using raceId
|
||||
distribute f1db.races as reference table
|
||||
distribute f1db.results using raceId
|
||||
distribute f1db.seasons as reference table
|
||||
distribute f1db.status as reference table;
|
||||
Loading…
x
Reference in New Issue
Block a user