From 87e2e16582c34a1b89891623716bea9cb96e0bb4 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Mon, 10 Nov 2014 03:15:22 +0100 Subject: [PATCH] Implement data transfer from SQL Server. --- src/monkey/mssql.lisp | 23 +++++++ src/sources/mssql.lisp | 92 +++++++++++++++++++++---- src/sources/mssql/mssql-cast-rules.lisp | 19 ++++- src/sources/mssql/mssql-schema.lisp | 19 +++++ src/utils/threads.lisp | 4 ++ src/utils/transforms.lisp | 5 +- 6 files changed, 145 insertions(+), 17 deletions(-) diff --git a/src/monkey/mssql.lisp b/src/monkey/mssql.lisp index a1787cd..d36057a 100644 --- a/src/monkey/mssql.lisp +++ b/src/monkey/mssql.lisp @@ -98,3 +98,26 @@ vector)) (otherwise (error "not supported type ~A" (foreign-enum-keyword '%syb-value-type type)))))) + + +(defun map-query-results (query &key row-fn (connection *database*)) + "Map the query results through the map-fn function." + (let ((%dbproc (slot-value connection 'dbproc)) + (cffi:*default-foreign-encoding* (slot-value connection 'external-format))) + (with-foreign-string (%query query) + (%dbcmd %dbproc %query)) + (%dbsqlexec %dbproc) + (unwind-protect + (unless (= +no-more-results+ (%dbresults %dbproc)) + (loop :for rtc := (%dbnextrow %dbproc) + :until (= rtc +no-more-rows+) + :do (let ((row (make-array (%dbnumcols %dbproc)))) + (loop :for i :from 1 :to (%dbnumcols %dbproc) + :for value := (sysdb-data-to-lisp %dbproc + (%dbdata %dbproc i) + (%dbcoltype %dbproc i) + (%dbdatlen %dbproc i)) + :do (setf (aref row (- i 1)) value)) + + (funcall row-fn row)))) + (%dbcancel %dbproc)))) diff --git a/src/sources/mssql.lisp b/src/sources/mssql.lisp index 7f492b0..e408b0c 100644 --- a/src/sources/mssql.lisp +++ b/src/sources/mssql.lisp @@ -10,14 +10,6 @@ :initform nil)) (:documentation "pgloader MS SQL Data Source")) -(defun cast-mssql-column-definition-to-pgsql (mssql-column) - "Return the PostgreSQL column definition from the MySQL one." - (with-slots (schema table-name name type default nullable) - mssql-column - (declare (ignore schema)) ; FIXME - (let ((ctype (mssql-column-ctype mssql-column))) - (cast table-name name type ctype default nullable nil)))) - (defmethod initialize-instance :after ((source copy-mssql) &key) "Add a default value for transforms in case it's not been provided." (let* ((source-db (slot-value source 'source-db)) @@ -50,7 +42,7 @@ (loop :for field :in fields :for (column fn) := (multiple-value-bind (column fn) - (cast-mysql-column-definition-to-pgsql field) + (cast-mssql-column-definition-to-pgsql field) (list column fn)) :collect column :into columns :collect fn :into fns @@ -58,6 +50,62 @@ (unless transforms (setf (slot-value source 'transforms) fns))))))) +(defmethod map-rows ((mssql copy-mssql) &key process-row-fn) + "Extract Mssql data and call PROCESS-ROW-FN function with a single + argument (a list of column values) for each row." + (with-mssql-connection ((source-db mssql)) + (let* ((sql (destructuring-bind (schema . table-name) + (source mssql) + (format nil "SELECT ~{~a~^, ~} FROM ~a.~a;" + (get-column-list (fields mssql)) + schema + table-name))) + (row-fn + (lambda (row) + (pgstate-incf *state* (target mssql) :read 1) + (funcall process-row-fn row)))) + (log-message :warning "~a" sql) + (handler-case + (mssql::map-query-results sql :row-fn row-fn :connection *mssql-db*) + (condition (e) + (progn + (log-message :error "~a" e) + (pgstate-incf *state* (target mssql) :errs 1))))))) + +(defmethod copy-to-queue ((mssql copy-mssql) queue) + "Copy data from MSSQL table DBNAME.TABLE-NAME into queue DATAQ" + (map-push-queue mssql queue)) + +(defmethod copy-from ((mssql copy-mssql) &key (kernel nil k-s-p) truncate) + "Connect in parallel to MSSQL and PostgreSQL and stream the data." + (let* ((summary (null *state*)) + (*state* (or *state* (pgloader.utils:make-pgstate))) + (lp:*kernel* (or kernel (make-kernel 2))) + (channel (lp:make-channel)) + (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) + (table-name (target mssql))) + + ;; we account stats against the target table-name, because that's all we + ;; know on the PostgreSQL thread + (with-stats-collection (table-name :state *state* :summary summary) + (log-message :notice "COPY ~a" table-name) + ;; read data from Mssql + (lp:submit-task channel #'copy-to-queue mssql queue) + + ;; and start another task to push that data from the queue to PostgreSQL + (lp:submit-task channel #'pgloader.pgsql:copy-from-queue + (target-db mssql) (target mssql) queue + :truncate truncate) + + ;; now wait until both the tasks are over + (loop for tasks below 2 do (lp:receive-result channel) + finally + (log-message :info "COPY ~a done." table-name) + (unless k-s-p (lp:end-kernel)))) + + ;; return the copy-mssql object we just did the COPY for + mssql)) + (defmethod copy-database ((mssql copy-mssql) &key state-before @@ -81,7 +129,7 @@ (idx-state (make-pgstate)) (seq-state (make-pgstate)) (cffi:*default-foreign-encoding* encoding) - ;; (copy-kernel (make-kernel 2)) + (copy-kernel (make-kernel 2)) (all-columns (filter-column-list (list-all-columns) :only-tables only-tables :including including @@ -97,7 +145,6 @@ ;; (idx-channel (when idx-kernel ;; (let ((lp:*kernel* idx-kernel)) ;; (lp:make-channel)))) - ;; (pg-dbname (target-db mssql)) ) ;; if asked, first drop/create the tables on the PostgreSQL side @@ -128,11 +175,32 @@ (let ((qualified-table-name-list (qualified-table-name-list all-columns :identifier-case identifier-case))) - (truncate-tables *pg-dbname* + (truncate-tables (target-db mssql) ;; here we really do want only the name (mapcar #'car qualified-table-name-list) :identifier-case identifier-case)))) + ;; Transfert the data + (loop :for (schema . tables) :in all-columns + :do (loop :for (table-name . columns) :in tables + :do + (let ((table-source + (make-instance 'copy-mssql + :source-db (source-db mssql) + :target-db (target-db mssql) + :source (cons schema table-name) + :target (qualify-name schema table-name + :identifier-case + identifier-case) + :fields columns))) + (log-message :debug "TARGET: ~a" (target table-source)) + ;; COPY the data to PostgreSQL, using copy-kernel + (unless schema-only + (copy-from table-source :kernel copy-kernel))))) + + ;; now end the kernels + (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) + ;; and report the total time spent on the operation (report-full-summary "Total streaming time" *state* :before state-before diff --git a/src/sources/mssql/mssql-cast-rules.lisp b/src/sources/mssql/mssql-cast-rules.lisp index b7f1248..3e1ee0f 100644 --- a/src/sources/mssql/mssql-cast-rules.lisp +++ b/src/sources/mssql/mssql-cast-rules.lisp @@ -9,7 +9,7 @@ (:source (:type "nchar") :target (:type "text" :drop-typemod t)) (:source (:type "varchar") :target (:type "text" :drop-typemod t)) (:source (:type "nvarchar") :target (:type "text" :drop-typemod t)) - (:source (:type "xml") :target (:type "xml" :drop-typemod t)) + (:source (:type "xml") :target (:type "text" :drop-typemod t)) (:source (:type "bit") :target (:type "boolean")) @@ -45,8 +45,7 @@ (:source (:type "varbinary") :target (:type "bytea") :using pgloader.transforms::byte-vector-to-bytea) - (:source (:type "datetime") :target (:type "timestamptz") - :using pgloader.transforms::sqlite-timestamp-to-timestamp)) + (:source (:type "datetime") :target (:type "timestamptz"))) "Data Type Casting to migrate from MSSQL to PostgreSQL") ;;; @@ -98,3 +97,17 @@ (let ((ctype (mssql-column-ctype col))) (cast table-name name type ctype default nullable nil))))) (format nil "~a ~22t ~a" column-name type-definition))) + +(defun cast-mssql-column-definition-to-pgsql (mssql-column) + "Return the PostgreSQL column definition from the MS SQL one." + (multiple-value-bind (column fn) + (with-slots (schema table-name name type default nullable) + mssql-column + (declare (ignore schema)) ; FIXME + (let ((ctype (mssql-column-ctype mssql-column))) + (cast table-name name type ctype default nullable nil))) + + ;; the MS SQL driver smartly maps data to the proper CL type, but the + ;; pgloader API only wants to see text representations to send down the + ;; COPY protocol. + (values column (or fn (lambda (val) (if val (format nil "~a" val) :null)))))) diff --git a/src/sources/mssql/mssql-schema.lisp b/src/sources/mssql/mssql-schema.lisp index 97c17aa..76c5c0a 100644 --- a/src/sources/mssql/mssql-schema.lisp +++ b/src/sources/mssql/mssql-schema.lisp @@ -105,6 +105,7 @@ where c.table_catalog = '~a' and t.table_type = '~a' + and c.table_schema != 'dbo' order by table_schema, table_name, ordinal_position" dbname @@ -174,3 +175,21 @@ ORDER BY T.[name], I.[index_id], IC.[key_ordinal];") (cons schema (reverse (loop :for (table-name . cols) :in tables :collect (cons table-name (reverse cols)))))))))) + + +;;; +;;; Tools to handle row queries. +;;; +(defun get-column-sql-expression (name type) + "Return per-TYPE SQL expression to use given a column NAME. + + Mostly we just use the name, and make try to avoid parsing dates." + (case (intern (string-upcase type) "KEYWORD") + (:datetime (format nil "convert(varchar, ~a, 126)" name)) + (t (format nil "~a" name)))) + +(defun get-column-list (columns) + "Tweak how we fetch the column values to avoid parsing when possible." + (loop :for col :in columns + :collect (with-slots (name type) col + (get-column-sql-expression name type)))) diff --git a/src/utils/threads.lisp b/src/utils/threads.lisp index 85f8b40..73254c5 100644 --- a/src/utils/threads.lisp +++ b/src/utils/threads.lisp @@ -20,6 +20,10 @@ (*myconn-port* . ,*myconn-port*) (*myconn-user* . ,*myconn-user*) (*myconn-pass* . ,*myconn-pass*) + (*msconn-host* . ',*msconn-host*) + (*msconn-port* . ,*msconn-port*) + (*msconn-user* . ,*msconn-user*) + (*msconn-pass* . ,*msconn-pass*) (*state* . ,*state*) (*client-min-messages* . ,*client-min-messages*) (*log-min-messages* . ,*log-min-messages*) diff --git a/src/utils/transforms.lisp b/src/utils/transforms.lisp index 4ecee16..3f78d7c 100644 --- a/src/utils/transforms.lisp +++ b/src/utils/transforms.lisp @@ -230,5 +230,6 @@ date-string-or-integer))))))) (defun sql-server-uniqueidentifier-to-uuid (id) - (declare (type (array (unsigned-byte 8) (16)) id)) - (format nil "~a" (uuid:byte-array-to-uuid id))) + (declare (type (or null (array (unsigned-byte 8) (16))) id)) + (when id + (format nil "~a" (uuid:byte-array-to-uuid id))))