From 1dfa170a49cbb3b42580e545467903dd643cc934 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Fri, 25 Jan 2013 12:46:38 +0100 Subject: [PATCH] First Import of the source tree for the Galaxya Loader. --- .gitignore | 1 + README.md | 142 +++++++++++++++++++++++ galaxya-export.lisp | 180 +++++++++++++++++++++++++++++ galaxya-loader.asd | 20 ++++ galaxya-loader.lisp | 270 ++++++++++++++++++++++++++++++++++++++++++++ mytest.lisp | 138 ++++++++++++++++++++++ package.lisp | 21 ++++ run.lisp | 14 +++ timing.lisp | 32 ++++++ 9 files changed, 818 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 galaxya-export.lisp create mode 100644 galaxya-loader.asd create mode 100644 galaxya-loader.lisp create mode 100644 mytest.lisp create mode 100644 package.lisp create mode 100644 run.lisp create mode 100644 timing.lisp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c3f4517 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +csv \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..72fc49f --- /dev/null +++ b/README.md @@ -0,0 +1,142 @@ +# Galaxya Loader + +Chargement de données CSV dans une base PostgreSQL. Les données sont +proprement encodée en UTF8 et seuls les champs dates doivent recevoir un +traitement particulier. + +Les dates `0000-00-00` deviennent `NULL`. Les dates `0000-00-00 00:00:00` +aussi. + +## Installation et dependances + +Programme Common Lisp utilisant [SBCL](http://sbcl.org/) et +[Quicklisp](http://www.quicklisp.org/beta/). + + apt-get install sbcl + wget http://beta.quicklisp.org/quicklisp.lisp + sbcl --load quicklisp.lisp + * (quicklisp-quickstart:install) + * (ql:add-to-init-file) + +Ensuite il faut récupérer les sources du projet dans +`~/quicklisp/local-projects/`, afin de pouvoir faire : + + sbcl + * (ql:quickload :galaxa-loader) + * (in-package :galaxya-loader) + * (stream-database-tables "weetix") + +Ou bien directement : + + * (load-all-databases) + +## Example d'usage + +Un chargement complet d'une base de données, en streaming directement depuis +MySQL vers PostgreSQL : + +~~~ +GALAXYA-LOADER> (stream-database-tables "weetix") + table name rows time +------------------------------ --------- --------- + ab_tests skip, unknown table in MySQL database + affiliations 1018 0m0s.429 + blacklist 352 0m0s.365 + blogtop 41451 0m3s.697 + ca_sms_boost 447 0m0s.515 + codes 3344498 4m42s.328 + codes_buffer 15 0m0s.312 + codes_distribues 366392 0m39s.493 + codes_distribues_demande 795 0m0s.787 + codes_temp 3 0m0s.394 + codes_vipplus 859 0m0s.956 + com_log 39703 0m2s.869 + com_partenaires 19 0m0s.436 + commandes_boutique 78717 0m5s.966 + commentaires 7009 0m0s.631 + communaute_amis 62948 0m5s.216 + communaute_blacklist 67 0m0s.519 + communaute_messagerie_id 4448 0m0s.606 +communaute_messagerie_messages 9855 0m1s.91 + communaute_no_show 9018 0m0s.481 + concours_weetix 773 0m0s.645 + coupons 71 0m0s.470 + css 163463 0m15s.919 + documents 21956 0m10s.608 + documents_lots 23118 0m2s.130 + documents_pays 31201 0m2s.278 + echange_votes 251 0m0s.447 + etransactions_id 1 0m0s.51 + gagnants 21457 0m2s.462 + gagnants_docs 321 0m0s.435 + gains_affiliations 17280 0m1s.902 + gains_concours 24728 0m3s.219 + gains_webmasters 581093 0m43s.599 + happy_hour 1 0m0s.155 + interco 120 0m0s.424 + ip 1603 0m0s.684 + litiges 172 0m0s.690 + livemessage 10719 0m0s.778 + log_actions 1116761 1m24s.826 + log_actions_vip 2912106 2m15s.702 + log_facebook_credits 475 0m2s.370 + log_flood_action 453 0m0s.350 + log_modifications 4450 0m0s.484 + log_mpme 34615 0m3s.92 + log_points 432229 0m28s.561 + log_sms 31766 0m2s.631 + log_vipplus 18603 0m1s.548 + log_votes 627462 0m26s.465 + log_votes_details 2873684 1m57s.461 + log_votes_manuels 4853 0m0s.446 + membres 26752 0m3s.261 + membres_referer 9628 0m0s.787 + membres_verification 13976 0m1s.113 + membres_vip 118192 0m8s.790 + membres_vip_interets 101571 0m2s.825 + muzictop 16686 0m1s.476 + newsletters_clients 387 0m0s.804 + numeros 133 0m0s.752 + oppositions 518 0m0s.340 + paiements 8231 0m0s.771 + peopletop 56295 0m5s.311 + plans 3 0m0s.347 + plans_reversements 114 0m0s.527 + prestataires 17 0m0s.343 + promo_boost 74946 0m4s.967 + reversements 21940 0m1s.600 + sav_questions 22 0m0s.353 + sav_themes 5 0m0s.303 + sav_vip_questions 40 0m0s.300 + sav_vip_themes 7 0m0s.308 + services 29 0m0s.275 + short_url 1820123 1m6s.423 + short_url_temp 97495 0m7s.341 + sms_boost 5552 0m0s.979 + sms_boost_log 1008752 1m5s.178 + startop 399326 0m19s.53 + temp_codes_annules 918 0m0s.391 + videotop 1352 0m0s.752 + vip_alertes_sms 124 0m0s.491 + vip_bonus 6816 0m0s.465 + vip_classements 44536 0m3s.834 + vip_classements_categories 17 0m0s.839 + vip_infos_classements 31 0m0s.509 + vip_profil 40384 0m3s.411 + vip_regularisation_codes 15535 0m1s.95 + vip_sollicitation 106368 0m6s.101 + vip_verification_tel 8116 0m0s.660 + webtop 21983 0m2s.74 + zarchives 16296 0m1s.158 + zarchives_blogtop 156067 0m10s.502 + zarchives_muzictop 210174 0m8s.201 + zarchives_peopletop 59026 0m5s.560 + zarchives_startop 360187 0m19s.334 + zarchives_videotop 6917 0m0s.742 + zarchives_webtop 146387 0m10s.117 +------------------------------ --------- --------- + Total streaming time 17905373 18m28s.686 +17905373 +1108.686 +~~~ + diff --git a/galaxya-export.lisp b/galaxya-export.lisp new file mode 100644 index 0000000..6a372e3 --- /dev/null +++ b/galaxya-export.lisp @@ -0,0 +1,180 @@ +(in-package :galaxya-loader) + +;;; +;;; Reformating Tools, because MySQL has not the same idea about its data +;;; than PostgreSQL has. Ever heard of year 0000? MySQL did... +;;; +(defun mysql-fix-date (datestr) + (cond + ((null datestr) nil) + ((string= datestr "") nil) + ((string= datestr "0000-00-00") nil) + ((string= datestr "0000-00-00 00:00:00") nil) + (t datestr))) + +(defun pgsql-reformat-null-value (value) + "cl-mysql returns nil for NULL and cl-postgres wants :NULL" + (if (null value) :NULL value)) + +(defun pgsql-reformat-row (row &key date-columns) + "Reformat row as given by MySQL in a format compatible with cl-postgres" + (loop + for i from 1 + for col in row + for no-zero-date-col = (if (member i date-columns) + (mysql-fix-date col) + col) + collect (pgsql-reformat-null-value no-zero-date-col))) + +;;; +;;; Implement PostgreSQL COPY format, the TEXT variant. +;;; +(defun pgsql-text-copy-format (stream row &key date-columns) + "Add a csv row in the stream" + (let* (*print-circle* *print-pretty*) + (loop + for i from 1 + for (col . more?) on row + for preprocessed-col = (if (member i date-columns) + (mysql-fix-date col) + col) + do (if (null preprocessed-col) + (format stream "~a~:[~;~c~]" "\\N" more? #\Tab) + (progn + ;; In particular, the following characters must be preceded + ;; by a backslash if they appear as part of a column value: + ;; backslash itself, newline, carriage return, and the + ;; current delimiter character. + (loop + for char across preprocessed-col + do (case char + (#\\ (format stream "\\\\")) ; 2 chars here + (#\Space (princ #\Space stream)) + (#\Newline (format stream "\\n")) ; 2 chars here + (#\Return (format stream "\\r")) ; 2 chars here + (#\Tab (format stream "\\t")) ; 2 chars here + (#\Backspace (format stream "\\b")) ; 2 chars here + (#\Page (format stream "\\f")) ; 2 chars here + (t (format stream "~c" char)))) + (format stream "~:[~;~c~]" more? #\Tab)))) + (format stream "~%"))) + +;;; +;;; Map a function to each row extracted from MySQL +;;; +(defun mysql-map-rows (dbname table-name process-row-fn + &key + (host *myconn-host*) + (user *myconn-user*) + (pass *myconn-pass*)) + "Extract MySQL data and call PROCESS-ROW-FN function with a single + argument (a list of column values) for each row." + (cl-mysql:connect :host host :user user :password pass) + + (unwind-protect + (progn + ;; Ensure we're talking utf-8 and connect to DBNAME in MySQL + (cl-mysql:query "SET NAMES 'utf8'") + (cl-mysql:query "SET character_set_results = utf8;") + (cl-mysql:use dbname) + + (let* ((sql (format nil "SELECT * FROM ~a;" table-name)) + (q (cl-mysql:query sql :store nil)) + (rs (cl-mysql:next-result-set q))) + (declare (ignore rs)) + + ;; Now fetch MySQL rows directly in the stream + (loop + for row = (cl-mysql:next-row q :type-map (make-hash-table)) + while row + counting row into count + do (funcall process-row-fn row) + finally (return count)))) + + ;; free resources + (cl-mysql:disconnect))) + +;;; +;;; Use mysql-map-rows and pgsql-text-copy-format to fill in a CSV file on +;;; disk with MySQL data in there. +;;; +(defun mysql-copy-text-format (dbname table-name filename + &key + date-columns + (host *myconn-host*) + (user *myconn-user*) + (pass *myconn-pass*)) + "Extrat data from MySQL in PostgreSQL COPY TEXT format" + (with-open-file (text-file filename + :direction :output + :if-exists :supersede + :external-format :utf8) + (mysql-map-rows dbname table-name + (lambda (row) + (pgsql-text-copy-format + text-file + row + :date-columns date-columns)) + :host host + :user user + :pass pass))) + +;;; +;;; Some MySQL other tools +;;; +(defun list-tables-in-mysql-db (dbname + &key + (host *myconn-host*) + (user *myconn-user*) + (pass *myconn-pass*)) + "As the name says" + (cl-mysql:connect :host host :user user :password pass) + + (unwind-protect + (progn + (cl-mysql:use dbname) + ;; that returns a pretty weird format, process it + (mapcan #'identity (caar (cl-mysql:list-tables)))) + ;; free resources + (cl-mysql:disconnect))) + + +(defun mysql-export-all-tables (dbname + &key + only-tables + (host *myconn-host*) + (user *myconn-user*) + (pass *myconn-pass*)) + "Export MySQL tables into as many TEXT files, in the PostgreSQL COPY format" + (let ((pgsql-date-columns-alist (get-table-list dbname)) + (total-count 0) + (total-seconds 0)) + (format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time") + (format t "~&------------------------------ --------- ---------") + (loop + for table-name in (list-tables-in-mysql-db dbname + :host host + :user user + :pass pass) + for filename = (get-csv-pathname dbname table-name) + when (or (null only-tables) + (member table-name only-tables :test #'equal)) + do + (format t "~&~30@a " table-name) + (multiple-value-bind (result seconds) + (timing + (mysql-copy-text-format + dbname + table-name + (get-csv-pathname dbname table-name) + :date-columns (cdr (assoc table-name pgsql-date-columns-alist)))) + (when result + (incf total-count result) + (incf total-seconds seconds) + (format t "~9@a ~9@a" + result (format-interval seconds nil)))) + finally + (format t "~&------------------------------ --------- ---------") + (format t "~&~30@a ~9@a ~9@a" "Total export time" + total-count (format-interval total-seconds nil)) + (return (values total-count (float total-seconds)))))) diff --git a/galaxya-loader.asd b/galaxya-loader.asd new file mode 100644 index 0000000..fff546a --- /dev/null +++ b/galaxya-loader.asd @@ -0,0 +1,20 @@ +;;;; galaxya-loader.asd + +(asdf:defsystem #:galaxya-loader + :serial t + :description "Load data from MySQL directly to PostgreSQL" + :author "Dimitri Fontaine " + :license "The PostgreSQL Licence" + :depends-on (#:postmodern + #:cl-postgres + #:simple-date + #:cl-mysql + #:split-sequence + #:cl-csv + #:lparallel) + :components ((:file "package") + (:file "timing" :depends-on ("package")) + (:file "galaxya-export" :depends-on ("package")) + (:file "galaxya-loader" :depends-on ("package" + "galaxya-export")))) + diff --git a/galaxya-loader.lisp b/galaxya-loader.lisp new file mode 100644 index 0000000..aff2239 --- /dev/null +++ b/galaxya-loader.lisp @@ -0,0 +1,270 @@ +;;;; galaxya-loader.lisp + +(in-package #:galaxya-loader) + +;;; +;;; Parameters you might want to change +;;; +(defparameter *loader-kernel* (lp:make-kernel 2) + "lparallel kernel to use for loading data in parallel") + +(defparameter *myconn-host* "localhost") +(defparameter *myconn-user* "debian-sys-maint") +(defparameter *myconn-pass* "vtmMI04yBZlFprYm") + +(defparameter *pgconn* + '("gdb" "none" "localhost" :port 5432) + "Connection string to the local database") + +(defparameter *csv-path-root* + (merge-pathnames "csv/" (user-homedir-pathname))) + +(defun get-csv-pathname (dbname table-name) + "return where to find the file" + (make-pathname + :directory (pathname-directory + (merge-pathnames (format nil "~a/" dbname) *csv-path-root*)) + :name table-name + :type "csv")) + +;;; +;;; PostgreSQL Utilities +;;; +(defun get-connection-string (dbname) + (cons dbname *pgconn*)) + +(defun get-database-list () + "connect to a local database and get the database list" + (pomo:with-connection + (get-connection-string "postgres") + (loop for (dbname) in (pomo:query + "select datname + from pg_database + where datname !~ 'postgres|template'") + collect dbname))) + +(defun get-table-list (dbname) + "Return an alist of tables names and list of columns to pay attention to." + (pomo:with-connection + (get-connection-string dbname) + + (loop for (relname colarray) in (pomo:query " +select relname, array_agg(case when typname in ('date', 'timestamptz') + then attnum end + order by attnum) + from pg_class c + join pg_namespace n on n.oid = c.relnamespace + left join pg_attribute a on c.oid = a.attrelid + join pg_type t on t.oid = a.atttypid + where c.relkind = 'r' + and attnum > 0 + and n.nspname = 'public' + group by relname +") + collect (cons relname (loop + for attnum across colarray + unless (eq attnum :NULL) + collect attnum))))) + +(defun pgsql-truncate-table (dbname table-name) + "Truncate given TABLE-NAME in database DBNAME" + (pomo:with-connection (get-connection-string dbname) + (pomo:execute (format nil "truncate ~a;" table-name)))) + +(defun load-data-in-pgsql-text-format (dbname table-name filename + &key + (truncate t)) + "Load data from clean CSV file to PostgreSQL" + (with-open-file + ;; we just ignore files that don't exist + (input filename + :direction :input + :if-does-not-exist nil) + (when input + (when truncate + (pgsql-truncate-table dbname table-name)) + + ;; read csv in the file and push it directly through the db writer + ;; in COPY streaming mode + (let* ((conspec (remove :port (get-connection-string dbname))) + (stream + (cl-postgres:open-db-writer conspec table-name nil))) + + (unwind-protect + (loop + for line = (read-line input nil) + for row = (mapcar (lambda (x) + (if (string= "\\N" x) :null x)) + (sq:split-sequence #\Tab line)) + while line + counting line into count + do (cl-postgres:db-write-row stream row) + finally (return count)) + (cl-postgres:close-db-writer stream)))))) + +;;; +;;; Export data from MySQL as a COPY TEXT file then import that file into +;;; the destination PostgreSQL table. +;;; +(defun load-single-table-using-file (dbname table-name + &key + (truncate t) + date-columns) + "Load a single table: export data from MySQL to CSV file then load that in PG" + (let ((filename (get-csv-pathname dbname table-name))) + (mysql-copy-text-format dbname table-name + filename + :date-columns date-columns) + (load-data-in-pgsql-text-format dbname table-name filename + :truncate truncate))) + +;;; +;;; Let's go parallel, with a queue to communicate data +;;; +(defun load-data-from-queue-to-pgsql (dbname table-name dataq + &key + (truncate t) + date-columns) + "Fetch data from the QUEUE until we see :end-of-data" + (when truncate (pgsql-truncate-table dbname table-name)) + + (let* ((conspec (remove :port (get-connection-string dbname))) + (stream + (cl-postgres:open-db-writer conspec table-name nil))) + (unwind-protect + (loop + for row = (lq:pop-queue dataq) + until (eq row :end-of-data) + counting row into count + do (let ((pgrow + (pgsql-reformat-row row :date-columns date-columns))) + (cl-postgres:db-write-row stream pgrow)) + finally (return (list :pgsql count))) + (cl-postgres:close-db-writer stream)))) + +;;; +;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY +;;; protocol +;;; +(defun stream-mysql-table-in-pgsql (dbname table-name + &key + truncate + date-columns) + "Connect in parallel to MySQL and PostgreSQL and stream the data." + (let* ((lp:*kernel* *loader-kernel*) + (channel (lp:make-channel)) + (dataq (lq:make-queue))) + ;; have a task fill MySQL data in the queue + (lp:submit-task channel + (lambda () + (prog1 + (list :mysql + (mysql-map-rows + dbname table-name + (lambda (row) + ;; buzy loop leave headroom to the consumer + (loop + for cycles in '(5000 10000 25000 15000) + while (< 15000 (lq:queue-count dataq)) + do + (loop repeat cycles)) + (lq:push-queue row dataq))))) + (lq:push-queue :end-of-data dataq))) + + ;; and start another task to push that data from the queue to PostgreSQL + (lp:submit-task channel + (lambda () + (load-data-from-queue-to-pgsql dbname table-name dataq + :truncate truncate + :date-columns date-columns))) + + ;; now wait until both the tasks are over + (loop + for tasks below 2 + collect (lp:receive-result channel) into counts + finally (return (cadr (assoc :pgsql counts)))))) + +;;; +;;; Work on all tables for given database +;;; +(defun stream-database-tables (dbname &key (truncate t) only-tables) + "Export MySQL data and Import it into PostgreSQL" + ;; get the list of tables and have at it + (let ((mysql-tables (list-tables-in-mysql-db dbname)) + (total-count 0) + (total-seconds 0)) + (format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time") + (format t "~&------------------------------ --------- ---------") + (loop + for (table-name . date-columns) in (get-table-list dbname) + when (or (null only-tables) + (member table-name only-tables :test #'equal)) + do + (format t "~&~30@a " table-name) + + (if (member table-name mysql-tables :test #'equal) + (multiple-value-bind (result seconds) + (timing + (stream-mysql-table-in-pgsql dbname table-name + :truncate truncate + :date-columns date-columns)) + (when result + (incf total-count result) + (incf total-seconds seconds) + (format t "~9@a ~9@a" + result (format-interval seconds nil)))) + ;; not a known mysql table + (format t "skip, unknown table in MySQL database~%")) + finally + (format t "~&------------------------------ --------- ---------") + (format t "~&~30@a ~9@a ~9@a" "Total streaming time" + total-count (format-interval total-seconds nil)) + (return (values total-count (float total-seconds)))))) + +(defun load-database-tables-from-file (dbname &key (truncate t) only-tables) + "Export MySQL data and Import it into PostgreSQL" + ;; get the list of tables and have at it + (let ((mysql-tables (list-tables-in-mysql-db dbname)) + (total-count 0) + (total-seconds 0)) + (format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time") + (format t "~&------------------------------ --------- ---------") + (loop + for (table-name . date-columns) in (get-table-list dbname) + when (or (null only-tables) + (member table-name only-tables :test #'equal)) + do + (format t "~&~30@a " table-name) + + (if (member table-name mysql-tables :test #'equal) + (multiple-value-bind (result seconds) + (timing + (load-single-table-using-file dbname table-name + :truncate truncate + :date-columns date-columns)) + (when result + (incf total-count result) + (incf total-seconds seconds) + (format t "~9@a ~9@a" + result (format-interval seconds nil)))) + ;; not a known mysql table + (format t " skip, unknown table in MySQL database~%")) + finally + (format t "~&------------------------------ --------- ---------") + (format t "~&~30@a ~9@a ~9@a" "Total export+import time" + total-count (format-interval total-seconds nil)) + (return (values total-count (float total-seconds)))))) + +(defun load-all-databases () + (pomo:with-connection + (get-connection-string "postgres") + + ;; get the list of databases and have at it + (loop + for dbname in (get-database-list) + do + (format t "~&DATABASE: ~a ..." dbname) + (multiple-value-bind (result seconds) + (timing + (load-database-tables dbname)) + (format t " ~d rows in ~f secs~%" result seconds))))) diff --git a/mytest.lisp b/mytest.lisp new file mode 100644 index 0000000..6daf8e6 --- /dev/null +++ b/mytest.lisp @@ -0,0 +1,138 @@ +(in-package :galaxya-loader) + +(defparameter *myconn-host* "localhost") +(defparameter *myconn-user* "debian-sys-maint") +(defparameter *myconn-pass* "vtmMI04yBZlFprYm") + +(defun toto () + (cl-mysql:connect :host *myconn-host* + :user *myconn-user* + :password *myconn-pass*) + + (cl-mysql:query "SET NAMES 'utf8'") + (cl-mysql:use "galaxya1") + (prog1 + (cl-mysql:query "SELECT * FROM toto;" :type-map nil) + (cl-mysql:disconnect))) + +(defun mytest () + ;; connect + (cl-mysql:connect :host *myconn-host* + :user *myconn-user* + :password *myconn-pass*) + + (unwind-protect + (progn + ;; Ensure we're talking utf-8 and connect to DBNAME in MySQL + (cl-mysql:query "SET NAMES 'utf8'") + (cl-mysql:use "galaxya1") + + (let* ((sql "SELECT * FROM toto;") + (q (cl-mysql:query sql :store nil)) + (rs (cl-mysql:next-result-set q))) + (declare (ignore rs)) + + ;; "SELECT id, email, date_debut, date_redige, ip + ;; FROM commentaires + ;; WHERE date_redige = '0000-00-00' LIMIT 10;" + + ;; Now fetch MySQL rows + (loop + for row = (cl-mysql:next-row q :type-map (make-hash-table)) + while row collect row))) + + ;; free resources + (cl-mysql:disconnect))) + +(defun mytest-to-csv (dbname table-name filename) + "extract MySQL data to CSV file" + (cl-mysql:connect :host *myconn-host* + :user *myconn-user* + :password *myconn-pass*) + + (unwind-protect + (progn + ;; Ensure we're talking utf-8 and connect to DBNAME in MySQL + (cl-mysql:query "SET NAMES 'utf8'") + (cl-mysql:query "SET character_set_results = utf8;") + (cl-mysql:use dbname) + + (let* ((sql (format nil "SELECT * FROM ~a;" table-name)) + (q (cl-mysql:query sql :store nil)) + (rs (cl-mysql:next-result-set q))) + (declare (ignore rs)) + + ;; Now fetch MySQL rows directly in the stream + (with-open-file (csv filename + :direction :output + :if-exists :supersede + :external-format :utf8) + (loop + for count from 1 + for row = (cl-mysql:next-row q :type-map (make-hash-table)) + while row + do (cl-csv:write-csv-row (reformat-row row '(2 3)) + :stream csv + :separator #\; + :quote #\" + :newline '(#\Newline)) + finally (return count))))) + + ;; free resources + (cl-mysql:disconnect))) + +(defun reformat-row (row date-columns) + "cl-mysql returns universal date, we want PostgreSQL date strings" + (loop + for i from 1 + for col in row + when (member i date-columns) + collect (reformat-date col) + else collect col)) + +(defun pgtest (mydbname table-name &key (truncate t)) + (cl-mysql:connect :host *myconn-host* + :user *myconn-user* + :password *myconn-pass*) + + ;; Ensure we're talking utf-8 and connect to DBNAME in MySQL + (cl-mysql:query "SET NAMES 'utf8'") + (cl-mysql:query "SET character_set_results = utf8;") + (cl-mysql:use mydbname) + + ;; TRUNCATE the table in PostgreSQL + (when truncate + (format t "TRUNCATE ~a;~%" table-name) + (pomo:with-connection '("dim" "dim" "none" "localhost") + (pomo:execute (format nil "truncate ~a;" table-name)))) + + ;; Now fetch MySQL rows and feed them to our COPY stream + (let* ((my-sql (format nil + "SELECT * FROM ~a ORDER BY id LIMIT 1;" + table-name)) + (my-q (cl-mysql:query my-sql :store nil)) + (my-rs (cl-mysql:next-result-set my-q)) + + (pgconn '("dim" "dim" "none" "localhost")) + (pgstream + (cl-postgres:open-db-writer pgconn table-name nil))) + + (declare (ignore my-rs)) + + (unwind-protect + (loop + for count from 0 + + ;; read MySQL data + for row = (cl-mysql:next-row my-q :type-map (make-hash-table)) + while row + + ;; write it to PostgreSQL + do (cl-postgres:db-write-row pgstream + (mapcar (lambda (x) + (if (null x) :null x)) + row)) + + finally (return count)) + ;; in case of error, close copier and database + (cl-postgres:close-db-writer pgstream)))) diff --git a/package.lisp b/package.lisp new file mode 100644 index 0000000..0304727 --- /dev/null +++ b/package.lisp @@ -0,0 +1,21 @@ +;;;; package.lisp + +(defpackage #:galaxya-loader + (:use #:cl)) + +;;; +;;; Some package names are a little too long to my taste and don't ship with +;;; nicknames, so use `rename-package' here to give them some new nicknames. +;;; +(loop for (package . nicknames) + in '((lparallel lp) + (lparallel.queue lq) + (simple-date date) + (split-sequence sq)) + do (rename-package package package nicknames)) + +;;; +;;; and recompile. Now you can pre-allocate the queue by passing a size to +;;; MAKE-QUEUE. (You could pass a number before too, but it was ignored.) +;;; +(pushnew :lparallel.with-vector-queue *features*) diff --git a/run.lisp b/run.lisp new file mode 100644 index 0000000..97f7d17 --- /dev/null +++ b/run.lisp @@ -0,0 +1,14 @@ +;;; facility to easily run the program + +(ql:quickload :galaxya-loader) + +(in-package :galaxya-loader) + +(let ((*pgcon* '("galaxya" "none" "localhost" :port 5432)) + ((csv-path-root* "/home/cyb/csv"))) + + ;; when we're ready we do that + ;; (load-all-databases)) + + ;; meanwhile + (load-database-tables "weetix")) diff --git a/timing.lisp b/timing.lisp new file mode 100644 index 0000000..c143c92 --- /dev/null +++ b/timing.lisp @@ -0,0 +1,32 @@ +;;; +;;; Some little timing tools +;;; + +(in-package :galaxya-loader) + +;;; +;;; Timing Macros +;;; +(defun elapsed-time-since (start) + "Return how many seconds ticked between START and now" + (/ (- (get-internal-real-time) start) + internal-time-units-per-second)) + +(defmacro timing (&body forms) + "return both how much real time was spend in body and its result" + (let ((start (gensym)) + (end (gensym)) + (result (gensym))) + `(let* ((,start (get-internal-real-time)) + (,result (progn ,@forms)) + (,end (get-internal-real-time))) + (values ,result (/ (- ,end ,start) internal-time-units-per-second))))) + +(defun format-interval (seconds &optional (stream t)) + "Output the number of seconds in a human friendly way" + (multiple-value-bind (year month day hour minute second millisecond) + (date:decode-interval (date:encode-interval :second seconds)) + (declare (ignore year month)) + (when (< 0 day) (format stream "~d days " day)) + (when (< 0 hour) (format stream "~d hour " hour)) + (format stream "~dm~ds.~d" minute second millisecond))) \ No newline at end of file