mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-13 09:46:59 +02:00
In a previous commit we re-used the package name pgloader.copy for the now separated implementation of the COPY protocol, but this package was already in use for the implementation of the COPY file format as a pgloader source. Oops. And CCL was happily doing its magic anyway, so I had been blind to the problem. To fix, rename the new package pgloader.pgcopy, and to avoid having to deal with other problems of the same kind in the future, rename every source package pgloader.source.<format>, so that we now have pgloader.source.copy and pgloader.pgcopy, two visibly different packages to deal with. This light refactoring came with a challenge, though. The split between the pgloader.sources API and the rest of the code involved some circular dependencies in the namespaces. CL is pretty flexible here because it can reload code definitions at runtime, but it was still a mess. To untangle it, implement a new namespace, the pgloader.load package, where we can use the pgloader.sources API and the pgloader.connection and pgloader.pgsql APIs too. A little problem gave birth to quite a massive patch. As it happens when refactoring and cleaning up the dirt in any large enough project, right? See #748.
258 lines
12 KiB
Common Lisp
258 lines
12 KiB
Common Lisp
;;;
|
||
;;; Tools to handle MySQL data fetching
|
||
;;;
|
||
|
||
(in-package :pgloader.source.mysql)
|
||
|
||
(defclass copy-mysql (db-copy)
  (;; When non-NIL, MAP-ROWS binds this (translated for CCL) as
   ;; QMYND:*MYSQL-ENCODING* to force how the table's text is decoded.
   (encoding   :initarg  :encoding
               :initform nil
               :accessor encoding)
   ;; Either NIL (read the whole table) or (COLUMN-NAME . RANGES) as built
   ;; by CONCURRENCY-SUPPORT; MAP-ROWS then reads one (MIN MAX) range of
   ;; COLUMN-NAME at a time.
   (range-list :initarg  :range-list
               :initform nil
               :accessor range-list))
  (:documentation "pgloader MySQL Data Source"))
|
||
|
||
(defmethod initialize-instance :after ((source copy-mysql) &key)
  "Fill in defaults derived from FIELDS once the instance is built.

   When the FIELDS slot is bound and non-NIL:
    - if COLUMNS is unbound or NIL, set it to the CAST of each field;
    - if TRANSFORMS was unbound or NIL, set it to the COLUMN-TRANSFORM of
      each column.
   Nothing is changed when FIELDS is unbound or NIL."
  (flet ((bound-value (slot-name)
           ;; NIL for an unbound slot, its value otherwise
           (and (slot-boundp source slot-name)
                (slot-value source slot-name))))
    (let ((provided-transforms (bound-value 'transforms)))
      (when (bound-value 'fields)
        ;; cast typically happens in copy-database in the schema structure,
        ;; and the result is then copied into the copy-mysql instance.
        (unless (bound-value 'columns)
          (setf (slot-value source 'columns)
                (mapcar #'cast (slot-value source 'fields))))
        (unless provided-transforms
          (setf (slot-value source 'transforms)
                (mapcar #'column-transform (slot-value source 'columns))))))))
|
||
|
||
|
||
;;;
|
||
;;; Implement the specific methods
|
||
;;;
|
||
(defmethod concurrency-support ((mysql copy-mysql) concurrency)
  "Decide whether and how to split the reading of MYSQL's source table
   between CONCURRENCY readers using WHERE clauses.

   Returns NIL, meaning the table is read by a single thread, when:
    - CONCURRENCY is 1;
    - the target table has no primary key, or the type name of its first
      column is not one of \"integer\", \"bigint\", \"serial\", \"bigserial\";
    - the source table is empty (MIN/MAX of that column are NULL);
    - SPLIT-RANGE yields fewer ranges than CONCURRENCY.

   Otherwise returns a list of new COPY-MYSQL instances, one per partition
   returned by DISTRIBUTE. Each one has a cloned source connection and a
   RANGE-LIST of the form (COLUMN-NAME . PARTITION) that limits which rows
   it reads."
  ;; a single reader was asked for: nothing to split
  (when (= 1 concurrency)
    (return-from concurrency-support nil))

  (let* ((target-columns (table-column-list (target mysql)))
         (primary-key    (find-if #'index-primary
                                  (table-index-list (target mysql))))
         (key-column     (and primary-key
                              (first (index-columns primary-key))))
         (column-def     (and key-column
                              (find key-column target-columns
                                    :key #'column-name
                                    :test #'string=)))
         (type-name      (and column-def
                              (let ((name (column-type-name column-def)))
                                (and (stringp name) name)))))

    ;; WHERE clauses and range index scans need an integer primary key
    (unless (member type-name '("integer" "bigint" "serial" "bigserial")
                    :test #'string=)
      (return-from concurrency-support nil))

    (with-connection (*connection* (source-db mysql))
      (let* ((colname      (mysql-column-name
                            (nth (position column-def target-columns)
                                 (fields mysql))))
             (bounds-query (format nil "select min(`~a`), max(`~a`) from `~a`"
                                   colname colname
                                   (table-source-name (source mysql)))))
        (destructuring-bind (lowest highest)
            (let ((row (first (mysql-query bounds-query))))
              ;; row is (min max), or (nil nil) when the table is empty
              (if (and (first row) (second row))
                  (mapcar #'parse-integer row)
                  row))
          (when (and lowest highest)
            (let ((ranges (split-range lowest highest *rows-per-range*)))
              ;; fewer ranges than readers: fall back to a single reader
              (when (>= (length ranges) concurrency)
                (mapcar (lambda (partition)
                          (make-instance 'copy-mysql
                                         :source-db  (clone-connection
                                                      (source-db mysql))
                                         :target-db  (target-db mysql)
                                         :source     (source mysql)
                                         :target     (target mysql)
                                         :fields     (fields mysql)
                                         :columns    (columns mysql)
                                         :transforms (transforms mysql)
                                         :encoding   (encoding mysql)
                                         :range-list (cons colname partition)))
                        (distribute ranges concurrency))))))))))
|
||
|
||
(defun call-with-encoding-handler (copy-mysql table-name func)
  "Call FUNC with no arguments and return its values. While it runs, handle
   the text decoding errors that qmynd or babel may signal when reading MySQL
   data for TABLE-NAME.

   For every such error:
    - count one :errs in the :data stats of COPY-MYSQL's target;
    - log it at :error level;
    - resume through the QMYND-IMPL::USE-NIL restart. Presumably this
      replaces the undecodable value with NIL; confirm against qmynd."
  (flet ((skipping-handler (report)
           ;; build a handler that counts the error, lets REPORT log it,
           ;; and then resumes through qmynd's USE-NIL restart
           (lambda (condition)
             (update-stats :data (target copy-mysql) :errs 1)
             (funcall report condition)
             (invoke-restart 'qmynd-impl::use-nil)))
         (report-plainly (condition)
           (log-message :error "~a" condition)))
    (handler-bind
        (;; Newer versions of qmynd handle the babel error and signal this
         ;; one with more details and an improved reporting:
         (qmynd-impl::decoding-error
          (skipping-handler #'report-plainly))

         ;; Older versions of qmynd reported babel errors directly
         (babel-encodings:end-of-input-in-character
          (skipping-handler #'report-plainly))

         (babel-encodings:character-decoding-error
          (skipping-handler
           (lambda (condition)
             (let* ((encoding (babel-encodings:character-coding-error-encoding condition))
                    (position (babel-encodings:character-coding-error-position condition))
                    (buffer   (babel-encodings:character-coding-error-buffer condition))
                    ;; the offending element, when the position is in range
                    (character (and position
                                    (< position (length buffer))
                                    (aref buffer position))))
               (log-message :error
                            "While decoding text data from MySQL table ~s: ~%~
                           Illegal ~a character starting at position ~a~@[: ~a~].~%"
                            table-name encoding position character))))))
      (funcall func))))
|
||
|
||
(defmacro with-encoding-handler ((copy-mysql table-name) &body forms)
  "Evaluate FORMS inside a thunk handed to CALL-WITH-ENCODING-HANDLER with
   COPY-MYSQL and TABLE-NAME, so that function's decoding-error handlers are
   in effect while FORMS run."
  (list 'call-with-encoding-handler copy-mysql table-name
        (list* 'lambda '() forms)))
|
||
|
||
(defmethod map-rows ((mysql copy-mysql) &key process-row-fn)
  "Extract MySQL data, calling PROCESS-ROW-FN once per fetched row. Rows are
   requested from MYSQL-QUERY with :RESULT-TYPE 'VECTOR.

   When MYSQL has a RANGE-LIST (COLUMN-NAME . RANGES), one SELECT is run per
   (MIN MAX) range, restricted to MIN <= COLUMN-NAME < MAX. Otherwise a single
   SELECT reads the whole table.

   When an ENCODING is set, it is bound as QMYND:*MYSQL-ENCODING* for the
   whole extraction; CCL first translates it to a character encoding. Every
   query runs under WITH-ENCODING-HANDLER."
  (let ((table-name (table-source-name (source mysql)))
        (qmynd:*mysql-encoding*
         (when (encoding mysql)
           #+sbcl (encoding mysql)
           #+ccl  (ccl:external-format-character-encoding (encoding mysql)))))

    (with-connection (*connection* (source-db mysql))
      (when qmynd:*mysql-encoding*
        (log-message :notice "Force encoding to ~a for ~a"
                     qmynd:*mysql-encoding* table-name))

      (let ((base-query (format nil "SELECT ~{~a~^, ~} FROM `~a`"
                                (get-column-list mysql) table-name)))
        (flet ((run-query (query)
                 ;; stream QUERY's rows to PROCESS-ROW-FN, skipping values
                 ;; that fail to decode
                 (with-encoding-handler (mysql table-name)
                   (mysql-query query
                                :row-fn process-row-fn
                                :result-type 'vector))))
          (let ((ranges (range-list mysql)))
            (if (null ranges)
                ;; read it all, no WHERE clause
                (run-query base-query)
                ;; read a range at a time, in a loop
                (destructuring-bind (colname . bounds) ranges
                  (loop :for (low high) :in bounds
                        :do (run-query
                             (format nil "~a WHERE `~a` >= ~a AND `~a` < ~a"
                                     base-query colname low colname high)))))))))))
|
||
|
||
|
||
|
||
(defmethod copy-column-list ((mysql copy-mysql))
  "Return the names of MYSQL's source fields, in MySQL column order, each
   passed through APPLY-IDENTIFIER-CASE. The data is sent in that order."
  (loop :for field :in (fields mysql)
        :collect (apply-identifier-case (mysql-column-name field))))
|
||
|
||
|
||
(defmethod fetch-metadata ((mysql copy-mysql)
                           (catalog catalog)
                           &key
                             materialize-views
                             only-tables
                             (create-indexes t)
                             (foreign-keys t)
                             including
                             excluding)
  "MySQL introspection to prepare the migration.

   Adds to CATALOG a schema named (CATALOG-NAME CATALOG), placed in the
   search path, and fills it with:
    - the source tables and their columns, filtered by ONLY-TABLES,
      INCLUDING and EXCLUDING;
    - the views to migrate;
    - foreign keys when FOREIGN-KEYS is non-NIL, and indexes when
      CREATE-INDEXES is non-NIL.

   MATERIALIZE-VIEWS is either :ALL, meaning every existing MySQL view, or a
   list whose CARs are view names. A non-NIL list is first handed to
   CREATE-MY-VIEWS.

   The total number of tables, views, indexes and foreign keys is reported
   to the stats collection. Returns CATALOG."
  (let* ((every-view-p (eq :all materialize-views))
         (schema       (add-schema catalog (catalog-name catalog)
                                   :in-search-path t))
         (view-names   (unless every-view-p
                         (mapcar #'car materialize-views))))

    (with-stats-collection ("fetch meta data"
                            :use-result-as-rows t
                            :use-result-as-read t
                            :section :pre)
      (with-connection (*connection* (source-db mysql))
        ;; If asked to MATERIALIZE VIEWS, now is the time to create them in
        ;; MySQL, when given definitions rather than existing view names.
        (when (and materialize-views (not every-view-p))
          (create-my-views materialize-views))

        ;; tables and their columns, covering table and column comments
        (list-all-columns schema
                          :only-tables only-tables
                          :including including
                          :excluding excluding)

        ;; views and their columns, covering comments too
        (if every-view-p
            (list-all-columns schema :table-type :view)
            (when view-names
              (list-all-columns schema
                                :only-tables view-names
                                :table-type :view)))

        (when foreign-keys
          (list-all-fkeys schema
                          :only-tables only-tables
                          :including including
                          :excluding excluding))

        (when create-indexes
          (list-all-indexes schema
                            :only-tables only-tables
                            :including including
                            :excluding excluding))

        ;; how many objects we're going to deal with, for stats collection
        (+ (count-tables catalog)
           (count-views catalog)
           (count-indexes catalog)
           (count-fkeys catalog))))

    catalog))
|
||
|
||
(defmethod cleanup ((mysql copy-mysql) (catalog catalog) &key materialize-views)
  "When there is a PostgreSQL error at the prepare-pgsql-database step, we
   might need to clean up any view created in the MySQL connection for the
   migration purpose.

   FETCH-METADATA only calls CREATE-MY-VIEWS when MATERIALIZE-VIEWS is a
   non-NIL value other than :ALL. :ALL means migrating the views that already
   exist in MySQL, and none of those were created by us. DROP-MY-VIEWS is
   therefore only called under the same condition: dropping must never touch
   views this migration did not create."
  (when (and materialize-views
             (not (eq :all materialize-views)))
    (with-connection (*connection* (source-db mysql))
      (drop-my-views materialize-views))))
|
||
|
||
;;; A list of (ENCODING . FILTERS) entries. INSTANCIATE-TABLE-COPY-OBJECT
;;; forces the first ENCODING whose FILTERS (see APPLY-DECODING-AS-FILTERS)
;;; match a table's name; the empty list means no per-table overrides.
(defvar *decoding-as* '()
  "Special per-table encoding/decoding overloading rules for MySQL.")
|
||
|
||
(defun apply-decoding-as-filters (table-name filters)
  "Return a generalized boolean which is non-NIL only if TABLE-NAME matches
   one of the FILTERS.

   Each filter is one of:
    - a string, which matches when it is STRING-EQUAL (case-insensitively
      equal) to TABLE-NAME;
    - a list (:REGEX PATTERN), which matches when CL-PPCRE:SCAN finds PATTERN
      anywhere in TABLE-NAME. A list of any other kind signals an error.
   Filters of any other type never match."
  (some (lambda (filter)
          (cond ((stringp filter)
                 (string-equal filter table-name))
                ((listp filter)
                 (destructuring-bind (kind pattern) filter
                   (ecase kind
                     (:regex (cl-ppcre:scan pattern table-name)))))
                (t nil)))
        filters))
|
||
|
||
(defmethod instanciate-table-copy-object ((copy copy-mysql) (table table))
  "Create a new instance for copying TABLE data.

   The next method builds the instance, which is then changed to class
   COPY-MYSQL. Its ENCODING is set to the first encoding in *DECODING-AS*
   whose filters match (TABLE-NAME TABLE), or to NIL when no entry matches."
  (let ((instance (change-class (call-next-method) 'copy-mysql)))
    ;; force the data encoding when asked to; an empty *DECODING-AS*
    ;; leaves the loop without a match, hence NIL
    (setf (encoding instance)
          (loop :for (forced-encoding . filters) :in *decoding-as*
                :when (apply-decoding-as-filters (table-name table) filters)
                  :return forced-encoding))
    instance))
|
||
|