mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-05 19:06:11 +02:00
First Import of the source tree for the Galaxya Loader.
This commit is contained in:
commit
1dfa170a49
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
csv
|
||||
142
README.md
Normal file
142
README.md
Normal file
@ -0,0 +1,142 @@
|
||||
# Galaxya Loader
|
||||
|
||||
Chargement de données CSV dans une base PostgreSQL. Les données sont
|
||||
proprement encodée en UTF8 et seuls les champs dates doivent recevoir un
|
||||
traitement particulier.
|
||||
|
||||
Les dates `0000-00-00` deviennent `NULL`. Les dates `0000-00-00 00:00:00`
|
||||
aussi.
|
||||
|
||||
## Installation et dependances
|
||||
|
||||
Programme Common Lisp utilisant [SBCL](http://sbcl.org/) et
|
||||
[Quicklisp](http://www.quicklisp.org/beta/).
|
||||
|
||||
apt-get install sbcl
|
||||
wget http://beta.quicklisp.org/quicklisp.lisp
|
||||
sbcl --load quicklisp.lisp
|
||||
* (quicklisp-quickstart:install)
|
||||
* (ql:add-to-init-file)
|
||||
|
||||
Ensuite il faut récupérer les sources du projet dans
|
||||
`~/quicklisp/local-projects/`, afin de pouvoir faire :
|
||||
|
||||
sbcl
|
||||
* (ql:quickload :galaxa-loader)
|
||||
* (in-package :galaxya-loader)
|
||||
* (stream-database-tables "weetix")
|
||||
|
||||
Ou bien directement :
|
||||
|
||||
* (load-all-databases)
|
||||
|
||||
## Example d'usage
|
||||
|
||||
Un chargement complet d'une base de données, en streaming directement depuis
|
||||
MySQL vers PostgreSQL :
|
||||
|
||||
~~~
|
||||
GALAXYA-LOADER> (stream-database-tables "weetix")
|
||||
table name rows time
|
||||
------------------------------ --------- ---------
|
||||
ab_tests skip, unknown table in MySQL database
|
||||
affiliations 1018 0m0s.429
|
||||
blacklist 352 0m0s.365
|
||||
blogtop 41451 0m3s.697
|
||||
ca_sms_boost 447 0m0s.515
|
||||
codes 3344498 4m42s.328
|
||||
codes_buffer 15 0m0s.312
|
||||
codes_distribues 366392 0m39s.493
|
||||
codes_distribues_demande 795 0m0s.787
|
||||
codes_temp 3 0m0s.394
|
||||
codes_vipplus 859 0m0s.956
|
||||
com_log 39703 0m2s.869
|
||||
com_partenaires 19 0m0s.436
|
||||
commandes_boutique 78717 0m5s.966
|
||||
commentaires 7009 0m0s.631
|
||||
communaute_amis 62948 0m5s.216
|
||||
communaute_blacklist 67 0m0s.519
|
||||
communaute_messagerie_id 4448 0m0s.606
|
||||
communaute_messagerie_messages 9855 0m1s.91
|
||||
communaute_no_show 9018 0m0s.481
|
||||
concours_weetix 773 0m0s.645
|
||||
coupons 71 0m0s.470
|
||||
css 163463 0m15s.919
|
||||
documents 21956 0m10s.608
|
||||
documents_lots 23118 0m2s.130
|
||||
documents_pays 31201 0m2s.278
|
||||
echange_votes 251 0m0s.447
|
||||
etransactions_id 1 0m0s.51
|
||||
gagnants 21457 0m2s.462
|
||||
gagnants_docs 321 0m0s.435
|
||||
gains_affiliations 17280 0m1s.902
|
||||
gains_concours 24728 0m3s.219
|
||||
gains_webmasters 581093 0m43s.599
|
||||
happy_hour 1 0m0s.155
|
||||
interco 120 0m0s.424
|
||||
ip 1603 0m0s.684
|
||||
litiges 172 0m0s.690
|
||||
livemessage 10719 0m0s.778
|
||||
log_actions 1116761 1m24s.826
|
||||
log_actions_vip 2912106 2m15s.702
|
||||
log_facebook_credits 475 0m2s.370
|
||||
log_flood_action 453 0m0s.350
|
||||
log_modifications 4450 0m0s.484
|
||||
log_mpme 34615 0m3s.92
|
||||
log_points 432229 0m28s.561
|
||||
log_sms 31766 0m2s.631
|
||||
log_vipplus 18603 0m1s.548
|
||||
log_votes 627462 0m26s.465
|
||||
log_votes_details 2873684 1m57s.461
|
||||
log_votes_manuels 4853 0m0s.446
|
||||
membres 26752 0m3s.261
|
||||
membres_referer 9628 0m0s.787
|
||||
membres_verification 13976 0m1s.113
|
||||
membres_vip 118192 0m8s.790
|
||||
membres_vip_interets 101571 0m2s.825
|
||||
muzictop 16686 0m1s.476
|
||||
newsletters_clients 387 0m0s.804
|
||||
numeros 133 0m0s.752
|
||||
oppositions 518 0m0s.340
|
||||
paiements 8231 0m0s.771
|
||||
peopletop 56295 0m5s.311
|
||||
plans 3 0m0s.347
|
||||
plans_reversements 114 0m0s.527
|
||||
prestataires 17 0m0s.343
|
||||
promo_boost 74946 0m4s.967
|
||||
reversements 21940 0m1s.600
|
||||
sav_questions 22 0m0s.353
|
||||
sav_themes 5 0m0s.303
|
||||
sav_vip_questions 40 0m0s.300
|
||||
sav_vip_themes 7 0m0s.308
|
||||
services 29 0m0s.275
|
||||
short_url 1820123 1m6s.423
|
||||
short_url_temp 97495 0m7s.341
|
||||
sms_boost 5552 0m0s.979
|
||||
sms_boost_log 1008752 1m5s.178
|
||||
startop 399326 0m19s.53
|
||||
temp_codes_annules 918 0m0s.391
|
||||
videotop 1352 0m0s.752
|
||||
vip_alertes_sms 124 0m0s.491
|
||||
vip_bonus 6816 0m0s.465
|
||||
vip_classements 44536 0m3s.834
|
||||
vip_classements_categories 17 0m0s.839
|
||||
vip_infos_classements 31 0m0s.509
|
||||
vip_profil 40384 0m3s.411
|
||||
vip_regularisation_codes 15535 0m1s.95
|
||||
vip_sollicitation 106368 0m6s.101
|
||||
vip_verification_tel 8116 0m0s.660
|
||||
webtop 21983 0m2s.74
|
||||
zarchives 16296 0m1s.158
|
||||
zarchives_blogtop 156067 0m10s.502
|
||||
zarchives_muzictop 210174 0m8s.201
|
||||
zarchives_peopletop 59026 0m5s.560
|
||||
zarchives_startop 360187 0m19s.334
|
||||
zarchives_videotop 6917 0m0s.742
|
||||
zarchives_webtop 146387 0m10s.117
|
||||
------------------------------ --------- ---------
|
||||
Total streaming time 17905373 18m28s.686
|
||||
17905373
|
||||
1108.686
|
||||
~~~
|
||||
|
||||
180
galaxya-export.lisp
Normal file
180
galaxya-export.lisp
Normal file
@ -0,0 +1,180 @@
|
||||
(in-package :galaxya-loader)
|
||||
|
||||
;;;
|
||||
;;; Reformating Tools, because MySQL has not the same idea about its data
|
||||
;;; than PostgreSQL has. Ever heard of year 0000? MySQL did...
|
||||
;;;
|
||||
(defun mysql-fix-date (datestr)
|
||||
(cond
|
||||
((null datestr) nil)
|
||||
((string= datestr "") nil)
|
||||
((string= datestr "0000-00-00") nil)
|
||||
((string= datestr "0000-00-00 00:00:00") nil)
|
||||
(t datestr)))
|
||||
|
||||
(defun pgsql-reformat-null-value (value)
|
||||
"cl-mysql returns nil for NULL and cl-postgres wants :NULL"
|
||||
(if (null value) :NULL value))
|
||||
|
||||
(defun pgsql-reformat-row (row &key date-columns)
|
||||
"Reformat row as given by MySQL in a format compatible with cl-postgres"
|
||||
(loop
|
||||
for i from 1
|
||||
for col in row
|
||||
for no-zero-date-col = (if (member i date-columns)
|
||||
(mysql-fix-date col)
|
||||
col)
|
||||
collect (pgsql-reformat-null-value no-zero-date-col)))
|
||||
|
||||
;;;
|
||||
;;; Implement PostgreSQL COPY format, the TEXT variant.
|
||||
;;;
|
||||
(defun pgsql-text-copy-format (stream row &key date-columns)
|
||||
"Add a csv row in the stream"
|
||||
(let* (*print-circle* *print-pretty*)
|
||||
(loop
|
||||
for i from 1
|
||||
for (col . more?) on row
|
||||
for preprocessed-col = (if (member i date-columns)
|
||||
(mysql-fix-date col)
|
||||
col)
|
||||
do (if (null preprocessed-col)
|
||||
(format stream "~a~:[~;~c~]" "\\N" more? #\Tab)
|
||||
(progn
|
||||
;; In particular, the following characters must be preceded
|
||||
;; by a backslash if they appear as part of a column value:
|
||||
;; backslash itself, newline, carriage return, and the
|
||||
;; current delimiter character.
|
||||
(loop
|
||||
for char across preprocessed-col
|
||||
do (case char
|
||||
(#\\ (format stream "\\\\")) ; 2 chars here
|
||||
(#\Space (princ #\Space stream))
|
||||
(#\Newline (format stream "\\n")) ; 2 chars here
|
||||
(#\Return (format stream "\\r")) ; 2 chars here
|
||||
(#\Tab (format stream "\\t")) ; 2 chars here
|
||||
(#\Backspace (format stream "\\b")) ; 2 chars here
|
||||
(#\Page (format stream "\\f")) ; 2 chars here
|
||||
(t (format stream "~c" char))))
|
||||
(format stream "~:[~;~c~]" more? #\Tab))))
|
||||
(format stream "~%")))
|
||||
|
||||
;;;
|
||||
;;; Map a function to each row extracted from MySQL
|
||||
;;;
|
||||
(defun mysql-map-rows (dbname table-name process-row-fn
|
||||
&key
|
||||
(host *myconn-host*)
|
||||
(user *myconn-user*)
|
||||
(pass *myconn-pass*))
|
||||
"Extract MySQL data and call PROCESS-ROW-FN function with a single
|
||||
argument (a list of column values) for each row."
|
||||
(cl-mysql:connect :host host :user user :password pass)
|
||||
|
||||
(unwind-protect
|
||||
(progn
|
||||
;; Ensure we're talking utf-8 and connect to DBNAME in MySQL
|
||||
(cl-mysql:query "SET NAMES 'utf8'")
|
||||
(cl-mysql:query "SET character_set_results = utf8;")
|
||||
(cl-mysql:use dbname)
|
||||
|
||||
(let* ((sql (format nil "SELECT * FROM ~a;" table-name))
|
||||
(q (cl-mysql:query sql :store nil))
|
||||
(rs (cl-mysql:next-result-set q)))
|
||||
(declare (ignore rs))
|
||||
|
||||
;; Now fetch MySQL rows directly in the stream
|
||||
(loop
|
||||
for row = (cl-mysql:next-row q :type-map (make-hash-table))
|
||||
while row
|
||||
counting row into count
|
||||
do (funcall process-row-fn row)
|
||||
finally (return count))))
|
||||
|
||||
;; free resources
|
||||
(cl-mysql:disconnect)))
|
||||
|
||||
;;;
|
||||
;;; Use mysql-map-rows and pgsql-text-copy-format to fill in a CSV file on
|
||||
;;; disk with MySQL data in there.
|
||||
;;;
|
||||
(defun mysql-copy-text-format (dbname table-name filename
|
||||
&key
|
||||
date-columns
|
||||
(host *myconn-host*)
|
||||
(user *myconn-user*)
|
||||
(pass *myconn-pass*))
|
||||
"Extrat data from MySQL in PostgreSQL COPY TEXT format"
|
||||
(with-open-file (text-file filename
|
||||
:direction :output
|
||||
:if-exists :supersede
|
||||
:external-format :utf8)
|
||||
(mysql-map-rows dbname table-name
|
||||
(lambda (row)
|
||||
(pgsql-text-copy-format
|
||||
text-file
|
||||
row
|
||||
:date-columns date-columns))
|
||||
:host host
|
||||
:user user
|
||||
:pass pass)))
|
||||
|
||||
;;;
|
||||
;;; Some MySQL other tools
|
||||
;;;
|
||||
(defun list-tables-in-mysql-db (dbname
|
||||
&key
|
||||
(host *myconn-host*)
|
||||
(user *myconn-user*)
|
||||
(pass *myconn-pass*))
|
||||
"As the name says"
|
||||
(cl-mysql:connect :host host :user user :password pass)
|
||||
|
||||
(unwind-protect
|
||||
(progn
|
||||
(cl-mysql:use dbname)
|
||||
;; that returns a pretty weird format, process it
|
||||
(mapcan #'identity (caar (cl-mysql:list-tables))))
|
||||
;; free resources
|
||||
(cl-mysql:disconnect)))
|
||||
|
||||
|
||||
(defun mysql-export-all-tables (dbname
|
||||
&key
|
||||
only-tables
|
||||
(host *myconn-host*)
|
||||
(user *myconn-user*)
|
||||
(pass *myconn-pass*))
|
||||
"Export MySQL tables into as many TEXT files, in the PostgreSQL COPY format"
|
||||
(let ((pgsql-date-columns-alist (get-table-list dbname))
|
||||
(total-count 0)
|
||||
(total-seconds 0))
|
||||
(format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time")
|
||||
(format t "~&------------------------------ --------- ---------")
|
||||
(loop
|
||||
for table-name in (list-tables-in-mysql-db dbname
|
||||
:host host
|
||||
:user user
|
||||
:pass pass)
|
||||
for filename = (get-csv-pathname dbname table-name)
|
||||
when (or (null only-tables)
|
||||
(member table-name only-tables :test #'equal))
|
||||
do
|
||||
(format t "~&~30@a " table-name)
|
||||
(multiple-value-bind (result seconds)
|
||||
(timing
|
||||
(mysql-copy-text-format
|
||||
dbname
|
||||
table-name
|
||||
(get-csv-pathname dbname table-name)
|
||||
:date-columns (cdr (assoc table-name pgsql-date-columns-alist))))
|
||||
(when result
|
||||
(incf total-count result)
|
||||
(incf total-seconds seconds)
|
||||
(format t "~9@a ~9@a"
|
||||
result (format-interval seconds nil))))
|
||||
finally
|
||||
(format t "~&------------------------------ --------- ---------")
|
||||
(format t "~&~30@a ~9@a ~9@a" "Total export time"
|
||||
total-count (format-interval total-seconds nil))
|
||||
(return (values total-count (float total-seconds))))))
|
||||
20
galaxya-loader.asd
Normal file
20
galaxya-loader.asd
Normal file
@ -0,0 +1,20 @@
|
||||
;;;; galaxya-loader.asd
|
||||
|
||||
(asdf:defsystem #:galaxya-loader
|
||||
:serial t
|
||||
:description "Load data from MySQL directly to PostgreSQL"
|
||||
:author "Dimitri Fontaine <dimitri@2ndQuadrant.fr>"
|
||||
:license "The PostgreSQL Licence"
|
||||
:depends-on (#:postmodern
|
||||
#:cl-postgres
|
||||
#:simple-date
|
||||
#:cl-mysql
|
||||
#:split-sequence
|
||||
#:cl-csv
|
||||
#:lparallel)
|
||||
:components ((:file "package")
|
||||
(:file "timing" :depends-on ("package"))
|
||||
(:file "galaxya-export" :depends-on ("package"))
|
||||
(:file "galaxya-loader" :depends-on ("package"
|
||||
"galaxya-export"))))
|
||||
|
||||
270
galaxya-loader.lisp
Normal file
270
galaxya-loader.lisp
Normal file
@ -0,0 +1,270 @@
|
||||
;;;; galaxya-loader.lisp
|
||||
|
||||
(in-package #:galaxya-loader)
|
||||
|
||||
;;;
|
||||
;;; Parameters you might want to change
|
||||
;;;
|
||||
(defparameter *loader-kernel* (lp:make-kernel 2)
|
||||
"lparallel kernel to use for loading data in parallel")
|
||||
|
||||
(defparameter *myconn-host* "localhost")
|
||||
(defparameter *myconn-user* "debian-sys-maint")
|
||||
(defparameter *myconn-pass* "vtmMI04yBZlFprYm")
|
||||
|
||||
(defparameter *pgconn*
|
||||
'("gdb" "none" "localhost" :port 5432)
|
||||
"Connection string to the local database")
|
||||
|
||||
(defparameter *csv-path-root*
|
||||
(merge-pathnames "csv/" (user-homedir-pathname)))
|
||||
|
||||
(defun get-csv-pathname (dbname table-name)
|
||||
"return where to find the file"
|
||||
(make-pathname
|
||||
:directory (pathname-directory
|
||||
(merge-pathnames (format nil "~a/" dbname) *csv-path-root*))
|
||||
:name table-name
|
||||
:type "csv"))
|
||||
|
||||
;;;
|
||||
;;; PostgreSQL Utilities
|
||||
;;;
|
||||
(defun get-connection-string (dbname)
|
||||
(cons dbname *pgconn*))
|
||||
|
||||
(defun get-database-list ()
|
||||
"connect to a local database and get the database list"
|
||||
(pomo:with-connection
|
||||
(get-connection-string "postgres")
|
||||
(loop for (dbname) in (pomo:query
|
||||
"select datname
|
||||
from pg_database
|
||||
where datname !~ 'postgres|template'")
|
||||
collect dbname)))
|
||||
|
||||
(defun get-table-list (dbname)
|
||||
"Return an alist of tables names and list of columns to pay attention to."
|
||||
(pomo:with-connection
|
||||
(get-connection-string dbname)
|
||||
|
||||
(loop for (relname colarray) in (pomo:query "
|
||||
select relname, array_agg(case when typname in ('date', 'timestamptz')
|
||||
then attnum end
|
||||
order by attnum)
|
||||
from pg_class c
|
||||
join pg_namespace n on n.oid = c.relnamespace
|
||||
left join pg_attribute a on c.oid = a.attrelid
|
||||
join pg_type t on t.oid = a.atttypid
|
||||
where c.relkind = 'r'
|
||||
and attnum > 0
|
||||
and n.nspname = 'public'
|
||||
group by relname
|
||||
")
|
||||
collect (cons relname (loop
|
||||
for attnum across colarray
|
||||
unless (eq attnum :NULL)
|
||||
collect attnum)))))
|
||||
|
||||
(defun pgsql-truncate-table (dbname table-name)
|
||||
"Truncate given TABLE-NAME in database DBNAME"
|
||||
(pomo:with-connection (get-connection-string dbname)
|
||||
(pomo:execute (format nil "truncate ~a;" table-name))))
|
||||
|
||||
(defun load-data-in-pgsql-text-format (dbname table-name filename
|
||||
&key
|
||||
(truncate t))
|
||||
"Load data from clean CSV file to PostgreSQL"
|
||||
(with-open-file
|
||||
;; we just ignore files that don't exist
|
||||
(input filename
|
||||
:direction :input
|
||||
:if-does-not-exist nil)
|
||||
(when input
|
||||
(when truncate
|
||||
(pgsql-truncate-table dbname table-name))
|
||||
|
||||
;; read csv in the file and push it directly through the db writer
|
||||
;; in COPY streaming mode
|
||||
(let* ((conspec (remove :port (get-connection-string dbname)))
|
||||
(stream
|
||||
(cl-postgres:open-db-writer conspec table-name nil)))
|
||||
|
||||
(unwind-protect
|
||||
(loop
|
||||
for line = (read-line input nil)
|
||||
for row = (mapcar (lambda (x)
|
||||
(if (string= "\\N" x) :null x))
|
||||
(sq:split-sequence #\Tab line))
|
||||
while line
|
||||
counting line into count
|
||||
do (cl-postgres:db-write-row stream row)
|
||||
finally (return count))
|
||||
(cl-postgres:close-db-writer stream))))))
|
||||
|
||||
;;;
|
||||
;;; Export data from MySQL as a COPY TEXT file then import that file into
|
||||
;;; the destination PostgreSQL table.
|
||||
;;;
|
||||
(defun load-single-table-using-file (dbname table-name
|
||||
&key
|
||||
(truncate t)
|
||||
date-columns)
|
||||
"Load a single table: export data from MySQL to CSV file then load that in PG"
|
||||
(let ((filename (get-csv-pathname dbname table-name)))
|
||||
(mysql-copy-text-format dbname table-name
|
||||
filename
|
||||
:date-columns date-columns)
|
||||
(load-data-in-pgsql-text-format dbname table-name filename
|
||||
:truncate truncate)))
|
||||
|
||||
;;;
|
||||
;;; Let's go parallel, with a queue to communicate data
|
||||
;;;
|
||||
(defun load-data-from-queue-to-pgsql (dbname table-name dataq
|
||||
&key
|
||||
(truncate t)
|
||||
date-columns)
|
||||
"Fetch data from the QUEUE until we see :end-of-data"
|
||||
(when truncate (pgsql-truncate-table dbname table-name))
|
||||
|
||||
(let* ((conspec (remove :port (get-connection-string dbname)))
|
||||
(stream
|
||||
(cl-postgres:open-db-writer conspec table-name nil)))
|
||||
(unwind-protect
|
||||
(loop
|
||||
for row = (lq:pop-queue dataq)
|
||||
until (eq row :end-of-data)
|
||||
counting row into count
|
||||
do (let ((pgrow
|
||||
(pgsql-reformat-row row :date-columns date-columns)))
|
||||
(cl-postgres:db-write-row stream pgrow))
|
||||
finally (return (list :pgsql count)))
|
||||
(cl-postgres:close-db-writer stream))))
|
||||
|
||||
;;;
|
||||
;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY
|
||||
;;; protocol
|
||||
;;;
|
||||
(defun stream-mysql-table-in-pgsql (dbname table-name
|
||||
&key
|
||||
truncate
|
||||
date-columns)
|
||||
"Connect in parallel to MySQL and PostgreSQL and stream the data."
|
||||
(let* ((lp:*kernel* *loader-kernel*)
|
||||
(channel (lp:make-channel))
|
||||
(dataq (lq:make-queue)))
|
||||
;; have a task fill MySQL data in the queue
|
||||
(lp:submit-task channel
|
||||
(lambda ()
|
||||
(prog1
|
||||
(list :mysql
|
||||
(mysql-map-rows
|
||||
dbname table-name
|
||||
(lambda (row)
|
||||
;; buzy loop leave headroom to the consumer
|
||||
(loop
|
||||
for cycles in '(5000 10000 25000 15000)
|
||||
while (< 15000 (lq:queue-count dataq))
|
||||
do
|
||||
(loop repeat cycles))
|
||||
(lq:push-queue row dataq)))))
|
||||
(lq:push-queue :end-of-data dataq)))
|
||||
|
||||
;; and start another task to push that data from the queue to PostgreSQL
|
||||
(lp:submit-task channel
|
||||
(lambda ()
|
||||
(load-data-from-queue-to-pgsql dbname table-name dataq
|
||||
:truncate truncate
|
||||
:date-columns date-columns)))
|
||||
|
||||
;; now wait until both the tasks are over
|
||||
(loop
|
||||
for tasks below 2
|
||||
collect (lp:receive-result channel) into counts
|
||||
finally (return (cadr (assoc :pgsql counts))))))
|
||||
|
||||
;;;
|
||||
;;; Work on all tables for given database
|
||||
;;;
|
||||
(defun stream-database-tables (dbname &key (truncate t) only-tables)
|
||||
"Export MySQL data and Import it into PostgreSQL"
|
||||
;; get the list of tables and have at it
|
||||
(let ((mysql-tables (list-tables-in-mysql-db dbname))
|
||||
(total-count 0)
|
||||
(total-seconds 0))
|
||||
(format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time")
|
||||
(format t "~&------------------------------ --------- ---------")
|
||||
(loop
|
||||
for (table-name . date-columns) in (get-table-list dbname)
|
||||
when (or (null only-tables)
|
||||
(member table-name only-tables :test #'equal))
|
||||
do
|
||||
(format t "~&~30@a " table-name)
|
||||
|
||||
(if (member table-name mysql-tables :test #'equal)
|
||||
(multiple-value-bind (result seconds)
|
||||
(timing
|
||||
(stream-mysql-table-in-pgsql dbname table-name
|
||||
:truncate truncate
|
||||
:date-columns date-columns))
|
||||
(when result
|
||||
(incf total-count result)
|
||||
(incf total-seconds seconds)
|
||||
(format t "~9@a ~9@a"
|
||||
result (format-interval seconds nil))))
|
||||
;; not a known mysql table
|
||||
(format t "skip, unknown table in MySQL database~%"))
|
||||
finally
|
||||
(format t "~&------------------------------ --------- ---------")
|
||||
(format t "~&~30@a ~9@a ~9@a" "Total streaming time"
|
||||
total-count (format-interval total-seconds nil))
|
||||
(return (values total-count (float total-seconds))))))
|
||||
|
||||
(defun load-database-tables-from-file (dbname &key (truncate t) only-tables)
|
||||
"Export MySQL data and Import it into PostgreSQL"
|
||||
;; get the list of tables and have at it
|
||||
(let ((mysql-tables (list-tables-in-mysql-db dbname))
|
||||
(total-count 0)
|
||||
(total-seconds 0))
|
||||
(format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time")
|
||||
(format t "~&------------------------------ --------- ---------")
|
||||
(loop
|
||||
for (table-name . date-columns) in (get-table-list dbname)
|
||||
when (or (null only-tables)
|
||||
(member table-name only-tables :test #'equal))
|
||||
do
|
||||
(format t "~&~30@a " table-name)
|
||||
|
||||
(if (member table-name mysql-tables :test #'equal)
|
||||
(multiple-value-bind (result seconds)
|
||||
(timing
|
||||
(load-single-table-using-file dbname table-name
|
||||
:truncate truncate
|
||||
:date-columns date-columns))
|
||||
(when result
|
||||
(incf total-count result)
|
||||
(incf total-seconds seconds)
|
||||
(format t "~9@a ~9@a"
|
||||
result (format-interval seconds nil))))
|
||||
;; not a known mysql table
|
||||
(format t " skip, unknown table in MySQL database~%"))
|
||||
finally
|
||||
(format t "~&------------------------------ --------- ---------")
|
||||
(format t "~&~30@a ~9@a ~9@a" "Total export+import time"
|
||||
total-count (format-interval total-seconds nil))
|
||||
(return (values total-count (float total-seconds))))))
|
||||
|
||||
(defun load-all-databases ()
|
||||
(pomo:with-connection
|
||||
(get-connection-string "postgres")
|
||||
|
||||
;; get the list of databases and have at it
|
||||
(loop
|
||||
for dbname in (get-database-list)
|
||||
do
|
||||
(format t "~&DATABASE: ~a ..." dbname)
|
||||
(multiple-value-bind (result seconds)
|
||||
(timing
|
||||
(load-database-tables dbname))
|
||||
(format t " ~d rows in ~f secs~%" result seconds)))))
|
||||
138
mytest.lisp
Normal file
138
mytest.lisp
Normal file
@ -0,0 +1,138 @@
|
||||
(in-package :galaxya-loader)
|
||||
|
||||
(defparameter *myconn-host* "localhost")
|
||||
(defparameter *myconn-user* "debian-sys-maint")
|
||||
(defparameter *myconn-pass* "vtmMI04yBZlFprYm")
|
||||
|
||||
(defun toto ()
|
||||
(cl-mysql:connect :host *myconn-host*
|
||||
:user *myconn-user*
|
||||
:password *myconn-pass*)
|
||||
|
||||
(cl-mysql:query "SET NAMES 'utf8'")
|
||||
(cl-mysql:use "galaxya1")
|
||||
(prog1
|
||||
(cl-mysql:query "SELECT * FROM toto;" :type-map nil)
|
||||
(cl-mysql:disconnect)))
|
||||
|
||||
(defun mytest ()
|
||||
;; connect
|
||||
(cl-mysql:connect :host *myconn-host*
|
||||
:user *myconn-user*
|
||||
:password *myconn-pass*)
|
||||
|
||||
(unwind-protect
|
||||
(progn
|
||||
;; Ensure we're talking utf-8 and connect to DBNAME in MySQL
|
||||
(cl-mysql:query "SET NAMES 'utf8'")
|
||||
(cl-mysql:use "galaxya1")
|
||||
|
||||
(let* ((sql "SELECT * FROM toto;")
|
||||
(q (cl-mysql:query sql :store nil))
|
||||
(rs (cl-mysql:next-result-set q)))
|
||||
(declare (ignore rs))
|
||||
|
||||
;; "SELECT id, email, date_debut, date_redige, ip
|
||||
;; FROM commentaires
|
||||
;; WHERE date_redige = '0000-00-00' LIMIT 10;"
|
||||
|
||||
;; Now fetch MySQL rows
|
||||
(loop
|
||||
for row = (cl-mysql:next-row q :type-map (make-hash-table))
|
||||
while row collect row)))
|
||||
|
||||
;; free resources
|
||||
(cl-mysql:disconnect)))
|
||||
|
||||
(defun mytest-to-csv (dbname table-name filename)
|
||||
"extract MySQL data to CSV file"
|
||||
(cl-mysql:connect :host *myconn-host*
|
||||
:user *myconn-user*
|
||||
:password *myconn-pass*)
|
||||
|
||||
(unwind-protect
|
||||
(progn
|
||||
;; Ensure we're talking utf-8 and connect to DBNAME in MySQL
|
||||
(cl-mysql:query "SET NAMES 'utf8'")
|
||||
(cl-mysql:query "SET character_set_results = utf8;")
|
||||
(cl-mysql:use dbname)
|
||||
|
||||
(let* ((sql (format nil "SELECT * FROM ~a;" table-name))
|
||||
(q (cl-mysql:query sql :store nil))
|
||||
(rs (cl-mysql:next-result-set q)))
|
||||
(declare (ignore rs))
|
||||
|
||||
;; Now fetch MySQL rows directly in the stream
|
||||
(with-open-file (csv filename
|
||||
:direction :output
|
||||
:if-exists :supersede
|
||||
:external-format :utf8)
|
||||
(loop
|
||||
for count from 1
|
||||
for row = (cl-mysql:next-row q :type-map (make-hash-table))
|
||||
while row
|
||||
do (cl-csv:write-csv-row (reformat-row row '(2 3))
|
||||
:stream csv
|
||||
:separator #\;
|
||||
:quote #\"
|
||||
:newline '(#\Newline))
|
||||
finally (return count)))))
|
||||
|
||||
;; free resources
|
||||
(cl-mysql:disconnect)))
|
||||
|
||||
(defun reformat-row (row date-columns)
|
||||
"cl-mysql returns universal date, we want PostgreSQL date strings"
|
||||
(loop
|
||||
for i from 1
|
||||
for col in row
|
||||
when (member i date-columns)
|
||||
collect (reformat-date col)
|
||||
else collect col))
|
||||
|
||||
(defun pgtest (mydbname table-name &key (truncate t))
|
||||
(cl-mysql:connect :host *myconn-host*
|
||||
:user *myconn-user*
|
||||
:password *myconn-pass*)
|
||||
|
||||
;; Ensure we're talking utf-8 and connect to DBNAME in MySQL
|
||||
(cl-mysql:query "SET NAMES 'utf8'")
|
||||
(cl-mysql:query "SET character_set_results = utf8;")
|
||||
(cl-mysql:use mydbname)
|
||||
|
||||
;; TRUNCATE the table in PostgreSQL
|
||||
(when truncate
|
||||
(format t "TRUNCATE ~a;~%" table-name)
|
||||
(pomo:with-connection '("dim" "dim" "none" "localhost")
|
||||
(pomo:execute (format nil "truncate ~a;" table-name))))
|
||||
|
||||
;; Now fetch MySQL rows and feed them to our COPY stream
|
||||
(let* ((my-sql (format nil
|
||||
"SELECT * FROM ~a ORDER BY id LIMIT 1;"
|
||||
table-name))
|
||||
(my-q (cl-mysql:query my-sql :store nil))
|
||||
(my-rs (cl-mysql:next-result-set my-q))
|
||||
|
||||
(pgconn '("dim" "dim" "none" "localhost"))
|
||||
(pgstream
|
||||
(cl-postgres:open-db-writer pgconn table-name nil)))
|
||||
|
||||
(declare (ignore my-rs))
|
||||
|
||||
(unwind-protect
|
||||
(loop
|
||||
for count from 0
|
||||
|
||||
;; read MySQL data
|
||||
for row = (cl-mysql:next-row my-q :type-map (make-hash-table))
|
||||
while row
|
||||
|
||||
;; write it to PostgreSQL
|
||||
do (cl-postgres:db-write-row pgstream
|
||||
(mapcar (lambda (x)
|
||||
(if (null x) :null x))
|
||||
row))
|
||||
|
||||
finally (return count))
|
||||
;; in case of error, close copier and database
|
||||
(cl-postgres:close-db-writer pgstream))))
|
||||
21
package.lisp
Normal file
21
package.lisp
Normal file
@ -0,0 +1,21 @@
|
||||
;;;; package.lisp
|
||||
|
||||
(defpackage #:galaxya-loader
|
||||
(:use #:cl))
|
||||
|
||||
;;;
|
||||
;;; Some package names are a little too long to my taste and don't ship with
|
||||
;;; nicknames, so use `rename-package' here to give them some new nicknames.
|
||||
;;;
|
||||
(loop for (package . nicknames)
|
||||
in '((lparallel lp)
|
||||
(lparallel.queue lq)
|
||||
(simple-date date)
|
||||
(split-sequence sq))
|
||||
do (rename-package package package nicknames))
|
||||
|
||||
;;;
|
||||
;;; and recompile. Now you can pre-allocate the queue by passing a size to
|
||||
;;; MAKE-QUEUE. (You could pass a number before too, but it was ignored.)
|
||||
;;;
|
||||
(pushnew :lparallel.with-vector-queue *features*)
|
||||
14
run.lisp
Normal file
14
run.lisp
Normal file
@ -0,0 +1,14 @@
|
||||
;;; facility to easily run the program
|
||||
|
||||
(ql:quickload :galaxya-loader)
|
||||
|
||||
(in-package :galaxya-loader)
|
||||
|
||||
(let ((*pgcon* '("galaxya" "none" "localhost" :port 5432))
|
||||
((csv-path-root* "/home/cyb/csv")))
|
||||
|
||||
;; when we're ready we do that
|
||||
;; (load-all-databases))
|
||||
|
||||
;; meanwhile
|
||||
(load-database-tables "weetix"))
|
||||
32
timing.lisp
Normal file
32
timing.lisp
Normal file
@ -0,0 +1,32 @@
|
||||
;;;
|
||||
;;; Some little timing tools
|
||||
;;;
|
||||
|
||||
(in-package :galaxya-loader)
|
||||
|
||||
;;;
|
||||
;;; Timing Macros
|
||||
;;;
|
||||
(defun elapsed-time-since (start)
|
||||
"Return how many seconds ticked between START and now"
|
||||
(/ (- (get-internal-real-time) start)
|
||||
internal-time-units-per-second))
|
||||
|
||||
(defmacro timing (&body forms)
|
||||
"return both how much real time was spend in body and its result"
|
||||
(let ((start (gensym))
|
||||
(end (gensym))
|
||||
(result (gensym)))
|
||||
`(let* ((,start (get-internal-real-time))
|
||||
(,result (progn ,@forms))
|
||||
(,end (get-internal-real-time)))
|
||||
(values ,result (/ (- ,end ,start) internal-time-units-per-second)))))
|
||||
|
||||
(defun format-interval (seconds &optional (stream t))
|
||||
"Output the number of seconds in a human friendly way"
|
||||
(multiple-value-bind (year month day hour minute second millisecond)
|
||||
(date:decode-interval (date:encode-interval :second seconds))
|
||||
(declare (ignore year month))
|
||||
(when (< 0 day) (format stream "~d days " day))
|
||||
(when (< 0 hour) (format stream "~d hour " hour))
|
||||
(format stream "~dm~ds.~d" minute second millisecond)))
|
||||
Loading…
x
Reference in New Issue
Block a user