Implement data transfer from SQL Server.

This commit is contained in:
Dimitri Fontaine 2014-11-10 03:15:22 +01:00
parent e2f6dba107
commit 87e2e16582
6 changed files with 145 additions and 17 deletions

View File

@ -98,3 +98,26 @@
vector))
(otherwise (error "not supported type ~A"
(foreign-enum-keyword '%syb-value-type type))))))
(defun map-query-results (query &key row-fn (connection *database*))
  "Send QUERY to the server over CONNECTION and call ROW-FN once per row of
   the result set made current by a single %DBRESULTS call.

   Each row is handed to ROW-FN as a freshly allocated vector holding one
   element per column, every column value being converted to a lisp object
   by SYSDB-DATA-TO-LISP. %DBCANCEL is always called on the way out,
   including when ROW-FN performs a non-local exit."
  (let ((%dbproc (slot-value connection 'dbproc))
        (cffi:*default-foreign-encoding* (slot-value connection 'external-format)))
    (with-foreign-string (%query query)
      (%dbcmd %dbproc %query))
    (%dbsqlexec %dbproc)
    (unwind-protect
         (unless (= +no-more-results+ (%dbresults %dbproc))
           ;; the column count belongs to the result set, not to the row:
           ;; ask for it once rather than twice per row.
           (let ((numcols (%dbnumcols %dbproc)))
             (loop :for rtc := (%dbnextrow %dbproc)
                :until (= rtc +no-more-rows+)
                :do (let ((row (make-array numcols)))
                      ;; columns are numbered from 1 on the driver side and
                      ;; stored from index 0 in the row vector
                      (loop :for i :from 1 :to numcols
                         :do (setf (aref row (- i 1))
                                   (sysdb-data-to-lisp %dbproc
                                                       (%dbdata %dbproc i)
                                                       (%dbcoltype %dbproc i)
                                                       (%dbdatlen %dbproc i))))
                      (funcall row-fn row)))))
      (%dbcancel %dbproc))))

View File

@ -10,14 +10,6 @@
:initform nil))
(:documentation "pgloader MS SQL Data Source"))
(defun cast-mssql-column-definition-to-pgsql (mssql-column)
  "Build the PostgreSQL column definition matching the MS SQL column
   MSSQL-COLUMN, returning whatever CAST returns for it."
  ;; FIXME: the schema slot of the column is not used yet.
  (let ((ctype (mssql-column-ctype mssql-column)))
    (cast (slot-value mssql-column 'table-name)
          (slot-value mssql-column 'name)
          (slot-value mssql-column 'type)
          ctype
          (slot-value mssql-column 'default)
          (slot-value mssql-column 'nullable)
          nil)))
(defmethod initialize-instance :after ((source copy-mssql) &key)
"Add a default value for transforms in case it's not been provided."
(let* ((source-db (slot-value source 'source-db))
@ -50,7 +42,7 @@
(loop :for field :in fields
:for (column fn) := (multiple-value-bind (column fn)
(cast-mysql-column-definition-to-pgsql field)
(cast-mssql-column-definition-to-pgsql field)
(list column fn))
:collect column :into columns
:collect fn :into fns
@ -58,6 +50,62 @@
(unless transforms
(setf (slot-value source 'transforms) fns)))))))
(defmethod map-rows ((mssql copy-mssql) &key process-row-fn)
  "Extract MS SQL data and call PROCESS-ROW-FN with a single argument, the
   row object that MSSQL::MAP-QUERY-RESULTS passes to its :ROW-FN, for each
   row of the source table.

   The source of MSSQL is a (schema . table-name) pair; both names are sent
   as bracket-delimited identifiers. Every row read counts as :read against
   the target in *STATE*. Any condition signaled while running the query is
   logged and counted as a single :errs rather than propagated."
  (with-mssql-connection ((source-db mssql))
    (flet ((quote-identifier (identifier)
             ;; T-SQL delimited identifier: wrap in square brackets and
             ;; double any closing bracket found within the name.
             (with-output-to-string (out)
               (write-char #\[ out)
               (loop :for ch :across (princ-to-string identifier)
                  :do (write-char ch out)
                  :when (char= ch #\]) :do (write-char ch out))
               (write-char #\] out))))
      (let* ((sql (destructuring-bind (schema . table-name)
                      (source mssql)
                    (format nil "SELECT ~{~a~^, ~} FROM ~a.~a;"
                            (get-column-list (fields mssql))
                            (quote-identifier schema)
                            (quote-identifier table-name))))
             (row-fn
              (lambda (row)
                (pgstate-incf *state* (target mssql) :read 1)
                (funcall process-row-fn row))))
        ;; tracing the generated query is a debugging aid, not a warning
        (log-message :debug "~a" sql)
        (handler-case
            (mssql::map-query-results sql :row-fn row-fn :connection *mssql-db*)
          (condition (e)
            (log-message :error "~a" e)
            (pgstate-incf *state* (target mssql) :errs 1)))))))
(defmethod copy-to-queue ((mssql copy-mssql) queue)
"Copy data from the MSSQL source into QUEUE by calling MAP-PUSH-QUEUE."
(map-push-queue mssql queue))
(defmethod copy-from ((mssql copy-mssql) &key kernel truncate)
  "Connect in parallel to MSSQL and PostgreSQL and stream the data.

   Two tasks are submitted on the same channel: COPY-TO-QUEUE fills a queue
   from MS SQL and PGLOADER.PGSQL:COPY-FROM-QUEUE empties it into the
   target, being given TRUNCATE as its :truncate argument.

   When KERNEL is non-NIL the tasks run on it and it is left for the caller
   to end. Otherwise a kernel is created with (MAKE-KERNEL 2) and ended
   before returning, whether both tasks completed or an error unwound
   through this method.

   Returns MSSQL."
  (let* ((summary     (null *state*))
         (*state*     (or *state* (pgloader.utils:make-pgstate)))
         (lp:*kernel* (or kernel (make-kernel 2)))
         (channel     (lp:make-channel))
         (queue       (lq:make-queue :fixed-capacity *concurrent-batches*))
         (table-name  (target mssql)))

    ;; we account stats against the target table-name, because that's all we
    ;; know on the PostgreSQL thread
    (with-stats-collection (table-name :state *state* :summary summary)
      (unwind-protect
           (progn
             (log-message :notice "COPY ~a" table-name)

             ;; read data from Mssql
             (lp:submit-task channel #'copy-to-queue mssql queue)

             ;; and start another task to push that data from the queue to
             ;; PostgreSQL
             (lp:submit-task channel #'pgloader.pgsql:copy-from-queue
                             (target-db mssql) (target mssql) queue
                             :truncate truncate)

             ;; now wait until both the tasks are over
             (loop :repeat 2 :do (lp:receive-result channel))
             (log-message :info "COPY ~a done." table-name))

        ;; only end the kernel we created ourselves, and do so even when
        ;; waiting for the results was interrupted
        (unless kernel (lp:end-kernel))))

    ;; return the copy-mssql object we just did the COPY for
    mssql))
(defmethod copy-database ((mssql copy-mssql)
&key
state-before
@ -81,7 +129,7 @@
(idx-state (make-pgstate))
(seq-state (make-pgstate))
(cffi:*default-foreign-encoding* encoding)
;; (copy-kernel (make-kernel 2))
(copy-kernel (make-kernel 2))
(all-columns (filter-column-list (list-all-columns)
:only-tables only-tables
:including including
@ -97,7 +145,6 @@
;; (idx-channel (when idx-kernel
;; (let ((lp:*kernel* idx-kernel))
;; (lp:make-channel))))
;; (pg-dbname (target-db mssql))
)
;; if asked, first drop/create the tables on the PostgreSQL side
@ -128,11 +175,32 @@
(let ((qualified-table-name-list
(qualified-table-name-list all-columns
:identifier-case identifier-case)))
(truncate-tables *pg-dbname*
(truncate-tables (target-db mssql)
;; here we really do want only the name
(mapcar #'car qualified-table-name-list)
:identifier-case identifier-case))))
;; Transfer the data
(loop :for (schema . tables) :in all-columns
:do (loop :for (table-name . columns) :in tables
:do
(let ((table-source
(make-instance 'copy-mssql
:source-db (source-db mssql)
:target-db (target-db mssql)
:source (cons schema table-name)
:target (qualify-name schema table-name
:identifier-case
identifier-case)
:fields columns)))
(log-message :debug "TARGET: ~a" (target table-source))
;; COPY the data to PostgreSQL, using copy-kernel
(unless schema-only
(copy-from table-source :kernel copy-kernel)))))
;; now end the kernels
(let ((lp:*kernel* copy-kernel)) (lp:end-kernel))
;; and report the total time spent on the operation
(report-full-summary "Total streaming time" *state*
:before state-before

View File

@ -9,7 +9,7 @@
(:source (:type "nchar") :target (:type "text" :drop-typemod t))
(:source (:type "varchar") :target (:type "text" :drop-typemod t))
(:source (:type "nvarchar") :target (:type "text" :drop-typemod t))
(:source (:type "xml") :target (:type "xml" :drop-typemod t))
(:source (:type "xml") :target (:type "text" :drop-typemod t))
(:source (:type "bit") :target (:type "boolean"))
@ -45,8 +45,7 @@
(:source (:type "varbinary") :target (:type "bytea")
:using pgloader.transforms::byte-vector-to-bytea)
(:source (:type "datetime") :target (:type "timestamptz")
:using pgloader.transforms::sqlite-timestamp-to-timestamp))
(:source (:type "datetime") :target (:type "timestamptz")))
"Data Type Casting to migrate from MSSQL to PostgreSQL")
;;;
@ -98,3 +97,17 @@
(let ((ctype (mssql-column-ctype col)))
(cast table-name name type ctype default nullable nil)))))
(format nil "~a ~22t ~a" column-name type-definition)))
(defun cast-mssql-column-definition-to-pgsql (mssql-column)
  "Compute the PostgreSQL column definition for the MS SQL column
   MSSQL-COLUMN.

   Returns two values, both obtained from CAST: the column definition and
   a transformation function. When CAST provides no function, a default one
   is returned in its place: it prints any non-NIL value with ~a and turns
   NIL into :NULL."
  (flet ((value-to-text (val)
           ;; the MS SQL driver smartly maps data to the proper CL type,
           ;; but the pgloader API only wants to see text representations
           ;; to send down the COPY protocol.
           (if val (format nil "~a" val) :null)))
    (let ((ctype (mssql-column-ctype mssql-column)))
      ;; FIXME: the schema slot of the column is not used yet.
      (multiple-value-bind (column fn)
          (cast (slot-value mssql-column 'table-name)
                (slot-value mssql-column 'name)
                (slot-value mssql-column 'type)
                ctype
                (slot-value mssql-column 'default)
                (slot-value mssql-column 'nullable)
                nil)
        (values column (or fn #'value-to-text))))))

View File

@ -105,6 +105,7 @@
where c.table_catalog = '~a'
and t.table_type = '~a'
and c.table_schema != 'dbo'
order by table_schema, table_name, ordinal_position"
dbname
@ -174,3 +175,21 @@ ORDER BY T.[name], I.[index_id], IC.[key_ordinal];")
(cons schema
(reverse (loop :for (table-name . cols) :in tables
:collect (cons table-name (reverse cols))))))))))
;;;
;;; Tools to handle row queries.
;;;
(defun get-column-sql-expression (name type)
  "Return the per-TYPE SQL expression used to select the column NAME.

   NAME is always emitted as a bracket-delimited identifier, any closing
   bracket it contains being doubled, so that names with spaces or reserved
   words can be selected. Mostly we just use that identifier; for datetime
   columns we ask the server for a varchar (CONVERT style 126) to avoid
   parsing dates on our side.

   TYPE is compared case-insensitively, without interning anything."
  (let ((identifier
         (with-output-to-string (out)
           (write-char #\[ out)
           (loop :for ch :across (princ-to-string name)
              :do (write-char ch out)
              :when (char= ch #\]) :do (write-char ch out))
           (write-char #\] out))))
    (if (string-equal type "datetime")
        (format nil "convert(varchar, ~a, 126)" identifier)
        identifier)))
(defun get-column-list (columns)
  "Return the list of SQL expressions to select for COLUMNS, in order, each
   one computed by GET-COLUMN-SQL-EXPRESSION from the name and type slots of
   the column."
  (mapcar (lambda (column)
            (get-column-sql-expression (slot-value column 'name)
                                       (slot-value column 'type)))
          columns))

View File

@ -20,6 +20,10 @@
(*myconn-port* . ,*myconn-port*)
(*myconn-user* . ,*myconn-user*)
(*myconn-pass* . ,*myconn-pass*)
(*msconn-host* . ',*msconn-host*)
(*msconn-port* . ,*msconn-port*)
(*msconn-user* . ,*msconn-user*)
(*msconn-pass* . ,*msconn-pass*)
(*state* . ,*state*)
(*client-min-messages* . ,*client-min-messages*)
(*log-min-messages* . ,*log-min-messages*)

View File

@ -230,5 +230,6 @@
date-string-or-integer)))))))
(defun sql-server-uniqueidentifier-to-uuid (id)
  "Return the text representation of ID, a vector of 16 bytes holding
   a SQL Server uniqueidentifier, as printed from the object built by
   UUID:BYTE-ARRAY-TO-UUID. Return NIL when ID is NIL."
  (declare (type (or null (array (unsigned-byte 8) (16))) id))
  (when id
    (format nil "~a" (uuid:byte-array-to-uuid id))))