Massive Refactoring, towards pgloader.

This commit is contained in:
Dimitri Fontaine 2013-02-07 00:04:12 +01:00
parent b9c7dbb77b
commit 623e2d4ff7
15 changed files with 988 additions and 715 deletions

README.galaxya.md Normal file

@@ -0,0 +1,248 @@
# Galaxya Loader
Loads CSV data into a PostgreSQL database. The data is cleanly encoded in
UTF-8, and only the date fields need special handling:
the dates `0000-00-00` and `0000-00-00 00:00:00` both become `NULL`.
## Installation and dependencies
A Common Lisp program using [SBCL](http://sbcl.org/) and
[Quicklisp](http://www.quicklisp.org/beta/).
    apt-get install sbcl
    wget http://beta.quicklisp.org/quicklisp.lisp
    sbcl --load quicklisp.lisp
    * (quicklisp-quickstart:install)
    * (ql:add-to-init-file)
Then fetch the project sources into `~/quicklisp/local-projects/`,
so that you can do:
    sbcl
    * (ql:quickload :galaxya-loader)
    * (in-package :galaxya-loader)
    * (stream-database-tables "weetix")
Or directly:
    * (load-all-databases)
## Usage example
A full load of one database, streaming directly from
MySQL to PostgreSQL:
~~~
table name rows time
------------------------------ --------- ---------
ab_tests skip, unknown table in MySQL database
affiliations 1018 0m0s.440
blacklist 352 0m0s.348
blogtop 41451 0m1s.564
ca_sms_boost 447 0m0s.520
codes 3344498 4m8s.535
codes_buffer 15 0m8s.638
codes_distribues 366392 0m24s.908
codes_distribues_demande 795 0m1s.3
codes_temp 3 0m17s.195
codes_vipplus 859 0m8s.65
com_log 39703 0m2s.122
com_partenaires 19 0m0s.360
commandes_boutique 78717 0m2s.363
commentaires 7009 0m0s.770
communaute_amis 62948 0m1s.452
communaute_blacklist 67 0m0s.619
communaute_messagerie_id 4448 0m0s.606
communaute_messagerie_messages 9855 0m0s.907
communaute_no_show 9018 0m0s.437
concours_weetix 773 0m0s.609
coupons 71 0m0s.833
css 163463 0m3s.724
documents 21956 0m2s.148
documents_lots 23118 0m0s.909
documents_pays 31201 0m0s.627
echange_votes 251 0m0s.436
etransactions_id 1 0m0s.54
gagnants 21457 0m1s.132
gagnants_docs 321 0m0s.543
gains_affiliations 17280 0m1s.494
gains_concours 24728 0m1s.547
gains_webmasters 581093 0m12s.612
happy_hour 1 0m0s.501
interco 120 0m0s.834
ip 1603 0m1s.290
litiges 172 0m1s.487
livemessage 10719 0m1s.428
log_actions 1116761 0m31s.882
log_actions_vip 2912106 1m31s.668
log_facebook_credits 475 0m0s.908
log_flood_action 453 0m0s.438
log_modifications 4450 0m0s.458
log_mpme 34615 0m1s.890
log_points 432229 0m4s.337
log_sms 31766 0m1s.285
log_vipplus 18603 0m0s.714
log_votes 627462 0m12s.462
log_votes_details 2873684 1m24s.824
log_votes_manuels 4853 0m0s.530
membres 26752 0m2s.990
membres_referer 9628 0m0s.564
membres_verification 13976 0m0s.573
membres_vip 118192 0m8s.420
membres_vip_interets 101571 0m0s.669
muzictop 16686 0m1s.18
newsletters_clients 387 0m2s.914
numeros 133 0m0s.772
oppositions 518 0m0s.356
paiements 8231 0m0s.612
peopletop 56295 0m1s.781
plans 3 0m0s.401
plans_reversements 114 0m0s.544
prestataires 17 0m0s.315
promo_boost 74946 0m1s.769
reversements 21940 0m0s.744
sav_questions 22 0m0s.573
sav_themes 5 0m0s.407
sav_vip_questions 40 0m0s.381
sav_vip_themes 7 0m0s.319
services 29 0m0s.284
short_url 1820123 0m39s.355
short_url_temp 97495 0m1s.174
sms_boost 5552 0m1s.871
sms_boost_log 1008752 0m44s.848
startop 399326 0m7s.343
temp_codes_annules 918 0m0s.342
videotop 1352 0m0s.805
vip_alertes_sms 124 0m0s.502
vip_bonus 6816 0m0s.449
vip_classements 44536 0m2s.592
vip_classements_categories 17 0m0s.348
vip_infos_classements 31 0m0s.500
vip_profil 40384 0m1s.9
vip_regularisation_codes 15535 0m0s.735
vip_sollicitation 106368 0m1s.224
vip_verification_tel 8116 0m0s.637
webtop 21983 0m1s.195
zarchives 16296 0m0s.797
zarchives_blogtop 156067 0m6s.455
zarchives_muzictop 210174 0m4s.553
zarchives_peopletop 59026 0m2s.494
zarchives_startop 360187 0m7s.223
zarchives_videotop 6917 0m0s.787
zarchives_webtop 146387 0m3s.196
------------------------------ --------- ---------
Total streaming time 17905373 12m21s.296
17905373
741.296
~~~
And the same load, going through exports to temporary files in the
PostgreSQL COPY TEXT format:
~~~
GALAXYA-LOADER> (load-database-tables-from-file "weetix")
table name rows time
------------------------------ --------- ---------
ab_tests skip, unknown table in MySQL database
affiliations 1018 0m0s.751
blacklist 352 0m0s.416
blogtop 41451 0m2s.745
ca_sms_boost 447 0m0s.563
codes 3344498 5m45s.511
codes_buffer 15 0m0s.425
codes_distribues 366392 0m32s.598
codes_distribues_demande 795 0m0s.562
codes_temp 3 0m0s.453
codes_vipplus 859 0m0s.994
com_log 39703 0m2s.388
com_partenaires 19 0m0s.420
commandes_boutique 78717 0m4s.919
commentaires 7009 0m2s.881
communaute_amis 62948 0m8s.724
communaute_blacklist 67 0m1s.236
communaute_messagerie_id 4448 0m1s.522
communaute_messagerie_messages 9855 0m1s.614
communaute_no_show 9018 0m0s.437
concours_weetix 773 0m1s.305
coupons 71 0m0s.525
css 163463 0m5s.604
documents 21956 0m3s.788
documents_lots 23118 0m1s.333
documents_pays 31201 0m0s.930
echange_votes 251 0m0s.473
etransactions_id 1 0m0s.99
gagnants 21457 0m2s.85
gagnants_docs 321 0m0s.489
gains_affiliations 17280 0m1s.736
gains_concours 24728 0m1s.889
gains_webmasters 581093 0m20s.38
happy_hour 1 0m0s.147
interco 120 0m0s.457
ip 1603 0m0s.787
litiges 172 0m0s.750
livemessage 10719 0m1s.632
log_actions 1116761 1m23s.781
log_actions_vip 2912106 3m48s.206
log_facebook_credits 475 0m1s.178
log_flood_action 453 0m0s.434
log_modifications 4450 0m0s.952
log_mpme 34615 0m2s.859
log_points 432229 0m10s.778
log_sms 31766 0m2s.666
log_vipplus 18603 0m1s.145
log_votes 627462 0m25s.411
log_votes_details 2873684 2m54s.984
log_votes_manuels 4853 0m0s.600
membres 26752 0m6s.852
membres_referer 9628 0m0s.832
membres_verification 13976 0m0s.815
membres_vip 118192 0m18s.340
membres_vip_interets 101571 0m3s.513
muzictop 16686 0m1s.367
newsletters_clients 387 0m0s.941
numeros 133 0m0s.832
oppositions 518 0m0s.364
paiements 8231 0m0s.984
peopletop 56295 0m3s.730
plans 3 0m0s.375
plans_reversements 114 0m0s.590
prestataires 17 0m0s.334
promo_boost 74946 0m3s.559
reversements 21940 0m1s.25
sav_questions 22 0m0s.541
sav_themes 5 0m0s.346
sav_vip_questions 40 0m0s.382
sav_vip_themes 7 0m0s.360
services 29 0m0s.317
short_url 1820123 0m55s.47
short_url_temp 97495 0m4s.735
sms_boost 5552 0m1s.699
sms_boost_log 1008752 1m30s.472
startop 399326 0m16s.981
temp_codes_annules 918 0m0s.532
videotop 1352 0m0s.822
vip_alertes_sms 124 0m0s.577
vip_bonus 6816 0m0s.525
vip_classements 44536 0m6s.669
vip_classements_categories 17 0m1s.2
vip_infos_classements 31 0m0s.658
vip_profil 40384 0m2s.545
vip_regularisation_codes 15535 0m0s.809
vip_sollicitation 106368 0m2s.858
vip_verification_tel 8116 0m0s.791
webtop 21983 0m1s.766
zarchives 16296 0m1s.26
zarchives_blogtop 156067 0m7s.121
zarchives_muzictop 210174 0m8s.436
zarchives_peopletop 59026 0m3s.53
zarchives_startop 360187 0m13s.473
zarchives_videotop 6917 0m1s.187
zarchives_webtop 146387 0m7s.484
------------------------------ --------- ---------
Total export+import time 17905373 21m2s.887
17905373
1262.887
~~~

README.md

@@ -1,16 +1,25 @@
# PGLoader
pgloader is a data loading tool for PostgreSQL, using the `COPY` command.
Its main advantage over just using `COPY` or `\copy`, and over using a
*Foreign Data Wrapper*, is its transactional behaviour: *pgloader* keeps a
separate file of rejected data and continues trying to `COPY` good
data into your database.
The default PostgreSQL behaviour is transactional, which means that any
erroneous line in the input data (file or remote database) will stop the
bulk load for the whole table.
pgloader also implements data reformatting; the main example of that is a
transformation of the MySQL dates `0000-00-00` and `0000-00-00 00:00:00` to
the PostgreSQL `NULL` value (because our calendar never had a *year zero*).
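That transform already ships in this commit: `mysql-fix-date`, carried in
`galaxya-export.lisp`, reduces to the sketch below (REPL results shown as
comments are illustrative):

```lisp
;; mysql-fix-date as in this commit: zero dates collapse to NIL,
;; which the loader later turns into a PostgreSQL NULL.
(defun mysql-fix-date (datestr)
  (cond
    ((null datestr) nil)
    ((string= datestr "") nil)
    ((string= datestr "0000-00-00") nil)
    ((string= datestr "0000-00-00 00:00:00") nil)
    (t datestr)))

(mysql-fix-date "0000-00-00")           ; => NIL
(mysql-fix-date "0000-00-00 00:00:00")  ; => NIL
(mysql-fix-date "2013-02-07")           ; => "2013-02-07"
```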
## INSTALL
pgloader is now a Common Lisp program, tested with the
[SBCL](http://sbcl.org/) and [CCL](http://ccl.clozure.com/) implementations,
using [Quicklisp](http://www.quicklisp.org/beta/).
    apt-get install sbcl
    wget http://beta.quicklisp.org/quicklisp.lisp
@@ -18,231 +27,63 @@ Programme Common Lisp utilisant [SBCL](http://sbcl.org/) et
    * (quicklisp-quickstart:install)
    * (ql:add-to-init-file)
Now fetch pgloader sources into `~/quicklisp/local-projects/` so that you
can do:
    sbcl
    * (ql:quickload :pgloader)
    * (in-package :pgloader)
    * (stream-database-tables "weetix")
Or directly:
## Usage
    * (load-all-databases)
## TODO
Some notes about what I intend to be working on next.
### internals & refactoring
- review pgloader.pgsql:reformat-row date-columns arguments
- review connection string handling for both PostgreSQL and MySQL
- provide a better toplevel API
- implement tests
### user features
- commands: `LOAD` and `INI` file formats
- compatibility with the `SQL*Loader` format
#### data loading
- dichotomy, which needs a local buffer
- general CSV and Text source formats
- compressed input (gzip, other algos)
- fetch data from S3
#### convenience
- automatic creation of schema (from MySQL schema, or from CSV header)
- pre-fetch some rows to guesstimate data types?
#### performances
- some more parallelizing options
- support for partitioning in pgloader itself
#### reformatting
Data reformatting is now mostly going to have to happen in Common Lisp;
maybe offer some other languages too (cl-awk etc.).
- raw reformatting, before rows are split
- per-column reformatting
- user-defined columns (constants, functions of other rows)
#### UI
- add a web controller with pretty monitoring
- launch new jobs from the web controller
#### crazy ideas
- MySQL replication, reading from the binlog directly

csv.lisp Normal file

@@ -0,0 +1,21 @@
;;;
;;; Tools to handle CSV file paths
;;;
(defpackage #:pgloader.csv
(:use #:cl)
(:export #:*csv-path-root*
#:get-pathname))
(in-package :pgloader.csv)
(defparameter *csv-path-root*
(merge-pathnames "csv/" (user-homedir-pathname)))
(defun get-pathname (dbname table-name)
"Return a pathname where to read or write the file data"
(make-pathname
:directory (pathname-directory
(merge-pathnames (format nil "~a/" dbname) *csv-path-root*))
:name table-name
:type "csv"))
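An illustrative call (not part of the commit), assuming a hypothetical home
directory of `/home/dim/`:

```lisp
;; With the default *csv-path-root* above, per-table data files land in a
;; per-database subdirectory of ~/csv/ (the path shown is hypothetical):
(pgloader.csv:get-pathname "weetix" "membres")
;; => #P"/home/dim/csv/weetix/membres.csv"
```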


@@ -1,180 +0,0 @@
(in-package :galaxya-loader)
;;;
;;; Reformatting tools, because MySQL does not have the same idea about its
;;; data as PostgreSQL does. Ever heard of year 0000? MySQL did...
;;;
(defun mysql-fix-date (datestr)
(cond
((null datestr) nil)
((string= datestr "") nil)
((string= datestr "0000-00-00") nil)
((string= datestr "0000-00-00 00:00:00") nil)
(t datestr)))
(defun pgsql-reformat-null-value (value)
"cl-mysql returns nil for NULL and cl-postgres wants :NULL"
(if (null value) :NULL value))
(defun pgsql-reformat-row (row &key date-columns)
"Reformat row as given by MySQL in a format compatible with cl-postgres"
(loop
for i from 1
for col in row
for no-zero-date-col = (if (member i date-columns)
(mysql-fix-date col)
col)
collect (pgsql-reformat-null-value no-zero-date-col)))
;;;
;;; Implement PostgreSQL COPY format, the TEXT variant.
;;;
(defun pgsql-text-copy-format (stream row &key date-columns)
  "Add a row to the stream, in PostgreSQL COPY TEXT format"
(let* (*print-circle* *print-pretty*)
(loop
for i from 1
for (col . more?) on row
for preprocessed-col = (if (member i date-columns)
(mysql-fix-date col)
col)
do (if (null preprocessed-col)
(format stream "~a~:[~;~c~]" "\\N" more? #\Tab)
(progn
;; In particular, the following characters must be preceded
;; by a backslash if they appear as part of a column value:
;; backslash itself, newline, carriage return, and the
;; current delimiter character.
(loop
for char across preprocessed-col
do (case char
(#\\ (format stream "\\\\")) ; 2 chars here
(#\Space (princ #\Space stream))
(#\Newline (format stream "\\n")) ; 2 chars here
(#\Return (format stream "\\r")) ; 2 chars here
(#\Tab (format stream "\\t")) ; 2 chars here
(#\Backspace (format stream "\\b")) ; 2 chars here
(#\Page (format stream "\\f")) ; 2 chars here
(t (format stream "~c" char))))
(format stream "~:[~;~c~]" more? #\Tab))))
(format stream "~%")))
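An illustrative call, not part of the diff, assuming the definitions above
are loaded: a tab inside a value gets escaped, a NIL becomes `\N`, and a
zero date in a declared date column is emitted as `\N` too:

```lisp
;; Column 3 is declared a date column, so "0000-00-00" is NULLed out;
;; the embedded tab in column 1 becomes the two characters \t.
(pgsql-text-copy-format *standard-output*
                        (list (format nil "a~cb" #\Tab) nil "0000-00-00")
                        :date-columns '(3))
;; prints one line, with literal tab characters between the three columns:
;; a\tb	\N	\N
```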
;;;
;;; Map a function to each row extracted from MySQL
;;;
(defun mysql-map-rows (dbname table-name process-row-fn
&key
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"Extract MySQL data and call PROCESS-ROW-FN function with a single
argument (a list of column values) for each row."
(cl-mysql:connect :host host :user user :password pass)
(unwind-protect
(progn
;; Ensure we're talking utf-8 and connect to DBNAME in MySQL
(cl-mysql:query "SET NAMES 'utf8'")
(cl-mysql:query "SET character_set_results = utf8;")
(cl-mysql:use dbname)
(let* ((sql (format nil "SELECT * FROM ~a;" table-name))
(q (cl-mysql:query sql :store nil))
(rs (cl-mysql:next-result-set q)))
(declare (ignore rs))
;; Now fetch MySQL rows directly in the stream
(loop
for row = (cl-mysql:next-row q :type-map (make-hash-table))
while row
counting row into count
do (funcall process-row-fn row)
finally (return count))))
;; free resources
(cl-mysql:disconnect)))
;;;
;;; Use mysql-map-rows and pgsql-text-copy-format to fill in a CSV file on
;;; disk with MySQL data in there.
;;;
(defun mysql-copy-text-format (dbname table-name filename
&key
date-columns
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
  "Extract data from MySQL in PostgreSQL COPY TEXT format"
(with-open-file (text-file filename
:direction :output
:if-exists :supersede
:external-format :utf8)
(mysql-map-rows dbname table-name
(lambda (row)
(pgsql-text-copy-format
text-file
row
:date-columns date-columns))
:host host
:user user
:pass pass)))
;;;
;;; Some MySQL other tools
;;;
(defun list-tables-in-mysql-db (dbname
&key
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"As the name says"
(cl-mysql:connect :host host :user user :password pass)
(unwind-protect
(progn
(cl-mysql:use dbname)
;; that returns a pretty weird format, process it
(mapcan #'identity (caar (cl-mysql:list-tables))))
;; free resources
(cl-mysql:disconnect)))
(defun mysql-export-all-tables (dbname
&key
only-tables
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"Export MySQL tables into as many TEXT files, in the PostgreSQL COPY format"
(let ((pgsql-date-columns-alist (get-table-list dbname))
(total-count 0)
(total-seconds 0))
(format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time")
(format t "~&------------------------------ --------- ---------")
(loop
for table-name in (list-tables-in-mysql-db dbname
:host host
:user user
:pass pass)
for filename = (get-csv-pathname dbname table-name)
when (or (null only-tables)
(member table-name only-tables :test #'equal))
do
(format t "~&~30@a " table-name)
(multiple-value-bind (result seconds)
(timing
(mysql-copy-text-format
dbname
table-name
(get-csv-pathname dbname table-name)
:date-columns (cdr (assoc table-name pgsql-date-columns-alist))))
(when result
(incf total-count result)
(incf total-seconds seconds)
(format t "~9@a ~9@a"
result (format-interval seconds nil))))
finally
(format t "~&------------------------------ --------- ---------")
(format t "~&~30@a ~9@a ~9@a" "Total export time"
total-count (format-interval total-seconds nil))
(return (values total-count (float total-seconds))))))


@@ -1,20 +0,0 @@
;;;; galaxya-loader.asd
(asdf:defsystem #:galaxya-loader
:serial t
:description "Export data from MySQL and load it into PostgreSQL"
:author "Dimitri Fontaine <dimitri@2ndQuadrant.fr>"
:license "The PostgreSQL Licence"
:depends-on (#:postmodern
#:cl-postgres
#:simple-date
#:cl-mysql
#:split-sequence
#:cl-csv
#:lparallel)
:components ((:file "package")
(:file "timing" :depends-on ("package"))
(:file "galaxya-export" :depends-on ("package"))
(:file "galaxya-loader" :depends-on ("package"
"galaxya-export"))))


@@ -1,268 +0,0 @@
;;;; galaxya-loader.lisp
(in-package #:galaxya-loader)
;;;
;;; Parameters you might want to change
;;;
(defparameter *loader-kernel* (lp:make-kernel 2)
"lparallel kernel to use for loading data in parallel")
(defparameter *myconn-host* "gala3.galaxya.fr")
(defparameter *myconn-user* "pg1")
(defparameter *myconn-pass* "AFmhKERxD9PVjgQD")
(setq *myconn-host* "localhost"
*myconn-user* "debian-sys-maint"
*myconn-pass* "vtmMI04yBZlFprYm")
(defparameter *pgconn*
'("gdb" "none" "localhost" :port 5432)
"Connection string to the local database")
(defparameter *csv-path-root*
(merge-pathnames "csv/" (user-homedir-pathname)))
(defun get-csv-pathname (dbname table-name)
"return where to find the file"
(make-pathname
:directory (pathname-directory
(merge-pathnames (format nil "~a/" dbname) *csv-path-root*))
:name table-name
:type "csv"))
;;;
;;; PostgreSQL Utilities
;;;
(defun get-connection-string (dbname)
(cons dbname *pgconn*))
(defun get-database-list ()
"connect to a local database and get the database list"
(pomo:with-connection
(get-connection-string "postgres")
(loop for (dbname) in (pomo:query
"select datname
from pg_database
where datname !~ 'postgres|template'")
collect dbname)))
(defun get-table-list (dbname)
"Return an alist of tables names and list of columns to pay attention to."
(pomo:with-connection
(get-connection-string dbname)
(loop for (relname colarray) in (pomo:query "
select relname, array_agg(case when typname in ('date', 'timestamptz')
then attnum end
order by attnum)
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid
join pg_type t on t.oid = a.atttypid
where c.relkind = 'r'
and attnum > 0
and n.nspname = 'public'
group by relname
")
collect (cons relname (loop
for attnum across colarray
unless (eq attnum :NULL)
collect attnum)))))
(defun pgsql-truncate-table (dbname table-name)
"Truncate given TABLE-NAME in database DBNAME"
(pomo:with-connection (get-connection-string dbname)
(pomo:execute (format nil "truncate ~a;" table-name))))
(defun load-data-in-pgsql-text-format (dbname table-name filename
&key
(truncate t))
"Load data from clean CSV file to PostgreSQL"
(with-open-file
;; we just ignore files that don't exist
(input filename
:direction :input
:if-does-not-exist nil)
(when input
(when truncate
(pgsql-truncate-table dbname table-name))
;; read csv in the file and push it directly through the db writer
;; in COPY streaming mode
(let* ((conspec (remove :port (get-connection-string dbname)))
(stream
(cl-postgres:open-db-writer conspec table-name nil)))
(unwind-protect
(loop
for line = (read-line input nil)
for row = (mapcar (lambda (x)
(if (string= "\\N" x) :null x))
(sq:split-sequence #\Tab line))
while line
counting line into count
do (cl-postgres:db-write-row stream row)
finally (return count))
(cl-postgres:close-db-writer stream))))))
;;;
;;; Export data from MySQL as a COPY TEXT file then import that file into
;;; the destination PostgreSQL table.
;;;
(defun load-single-table-using-file (dbname table-name
&key
(truncate t)
date-columns)
"Load a single table: export data from MySQL to CSV file then load that in PG"
(let ((filename (get-csv-pathname dbname table-name)))
(mysql-copy-text-format dbname table-name
filename
:date-columns date-columns)
(load-data-in-pgsql-text-format dbname table-name filename
:truncate truncate)))
;;;
;;; Let's go parallel, with a queue to communicate data
;;;
(defun load-data-from-queue-to-pgsql (dbname table-name dataq
&key
(truncate t)
date-columns)
"Fetch data from the QUEUE until we see :end-of-data"
(when truncate (pgsql-truncate-table dbname table-name))
(let* ((conspec (remove :port (get-connection-string dbname)))
(stream
(cl-postgres:open-db-writer conspec table-name nil)))
(unwind-protect
(loop
for row = (lq:pop-queue dataq)
until (eq row :end-of-data)
counting row into count
do (let ((pgrow
(pgsql-reformat-row row :date-columns date-columns)))
(cl-postgres:db-write-row stream pgrow))
finally (return (list :pgsql count)))
(cl-postgres:close-db-writer stream))))
;;;
;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY
;;; protocol
;;;
(defun stream-mysql-table-in-pgsql (dbname table-name
&key
truncate
date-columns)
"Connect in parallel to MySQL and PostgreSQL and stream the data."
(let* ((lp:*kernel* *loader-kernel*)
(channel (lp:make-channel))
(dataq (lq:make-queue 4096)))
;; have a task fill MySQL data in the queue
(lp:submit-task channel
(lambda ()
(prog1
(list :mysql
(mysql-map-rows
dbname table-name
(lambda (row)
(lq:push-queue row dataq)))))
(lq:push-queue :end-of-data dataq)))
;; and start another task to push that data from the queue to PostgreSQL
(lp:submit-task channel
(lambda ()
(load-data-from-queue-to-pgsql dbname table-name dataq
:truncate truncate
:date-columns date-columns)))
;; now wait until both the tasks are over
(loop
for tasks below 2
collect (lp:receive-result channel) into counts
finally (return (cadr (assoc :pgsql counts))))))
;;;
;;; Work on all tables for given database
;;;
(defun stream-database-tables (dbname &key (truncate t) only-tables)
"Export MySQL data and Import it into PostgreSQL"
;; get the list of tables and have at it
(let ((mysql-tables (list-tables-in-mysql-db dbname))
(total-count 0)
(total-seconds 0))
(format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time")
(format t "~&------------------------------ --------- ---------")
(loop
for (table-name . date-columns) in (get-table-list dbname)
when (or (null only-tables)
(member table-name only-tables :test #'equal))
do
(format t "~&~30@a " table-name)
(if (member table-name mysql-tables :test #'equal)
(multiple-value-bind (result seconds)
(timing
(stream-mysql-table-in-pgsql dbname table-name
:truncate truncate
:date-columns date-columns))
(when result
(incf total-count result)
(incf total-seconds seconds)
(format t "~9@a ~9@a"
result (format-interval seconds nil))))
;; not a known mysql table
(format t "skip, unknown table in MySQL database~%"))
finally
(format t "~&------------------------------ --------- ---------")
(format t "~&~30@a ~9@a ~9@a" "Total streaming time"
total-count (format-interval total-seconds nil))
(return (values total-count (float total-seconds))))))
(defun load-database-tables-from-file (dbname &key (truncate t) only-tables)
"Export MySQL data and Import it into PostgreSQL"
;; get the list of tables and have at it
(let ((mysql-tables (list-tables-in-mysql-db dbname))
(total-count 0)
(total-seconds 0))
(format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time")
(format t "~&------------------------------ --------- ---------")
(loop
for (table-name . date-columns) in (get-table-list dbname)
when (or (null only-tables)
(member table-name only-tables :test #'equal))
do
(format t "~&~30@a " table-name)
(if (member table-name mysql-tables :test #'equal)
(multiple-value-bind (result seconds)
(timing
(load-single-table-using-file dbname table-name
:truncate truncate
:date-columns date-columns))
(when result
(incf total-count result)
(incf total-seconds seconds)
(format t "~9@a ~9@a"
result (format-interval seconds nil))))
;; not a known mysql table
(format t " skip, unknown table in MySQL database~%"))
finally
(format t "~&------------------------------ --------- ---------")
(format t "~&~30@a ~9@a ~9@a" "Total export+import time"
total-count (format-interval total-seconds nil))
(return (values total-count (float total-seconds))))))
(defun load-all-databases ()
(pomo:with-connection
(get-connection-string "postgres")
;; get the list of databases and have at it
(loop
for dbname in (get-database-list)
do
(format t "~&DATABASE: ~a ..." dbname)
(multiple-value-bind (result seconds)
(timing
(load-database-tables dbname))
(format t " ~d rows in ~f secs~%" result seconds)))))

mysql.lisp Normal file

@@ -0,0 +1,261 @@
;;;
;;; Tools to handle MySQL data fetching
;;;
(defpackage #:pgloader.mysql
(:use #:cl)
(:import-from #:pgloader
#:*loader-kernel*
#:*myconn-host*
#:*myconn-user*
#:*myconn-pass*)
(:export #:map-rows
#:copy-from
#:list-databases
#:list-tables
#:export-all-tables
#:export-import-database
#:stream-mysql-table-in-pgsql
#:stream-database-tables))
(in-package :pgloader.mysql)
;;;
;;; MySQL tools connecting to a database
;;;
(defun list-databases (&key
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"Connect to a local database and get the database list"
(cl-mysql:connect :host host :user user :password pass)
(unwind-protect
(mapcan #'identity (caar (cl-mysql:query "show databases")))
(cl-mysql:disconnect)))
(defun list-tables (dbname
&key
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"Return a flat list of all the tables names known in given DATABASE"
(cl-mysql:connect :host host :user user :password pass)
(unwind-protect
(progn
(cl-mysql:use dbname)
;; that returns a pretty weird format, process it
(mapcan #'identity (caar (cl-mysql:list-tables))))
;; free resources
(cl-mysql:disconnect)))
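Illustrative usage (results depend on the MySQL server; the names shown are
hypothetical, shaped like the table list in the README timings):

```lisp
;; Both helpers return flat lists of names:
(pgloader.mysql:list-databases)        ; e.g. ("information_schema" "weetix")
(pgloader.mysql:list-tables "weetix")  ; e.g. ("affiliations" "blacklist" ...)
```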
;;;
;;; Map a function to each row extracted from MySQL
;;;
(defun map-rows (dbname table-name process-row-fn
&key
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"Extract MySQL data and call PROCESS-ROW-FN function with a single
argument (a list of column values) for each row."
(cl-mysql:connect :host host :user user :password pass)
(unwind-protect
(progn
;; Ensure we're talking utf-8 and connect to DBNAME in MySQL
(cl-mysql:query "SET NAMES 'utf8'")
(cl-mysql:query "SET character_set_results = utf8;")
(cl-mysql:use dbname)
(let* ((sql (format nil "SELECT * FROM ~a;" table-name))
(q (cl-mysql:query sql :store nil))
(rs (cl-mysql:next-result-set q)))
(declare (ignore rs))
;; Now fetch MySQL rows directly in the stream
(loop
for row = (cl-mysql:next-row q :type-map (make-hash-table))
while row
counting row into count
do (funcall process-row-fn row)
finally (return count))))
;; free resources
(cl-mysql:disconnect)))
;;;
;;; Use mysql-map-rows and pgsql-text-copy-format to fill in a CSV file on
;;; disk with MySQL data in there.
;;;
(defun copy-from (dbname table-name filename
&key
date-columns
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
  "Extract data from MySQL in PostgreSQL COPY TEXT format"
(with-open-file (text-file filename
:direction :output
:if-exists :supersede
:external-format :utf8)
(map-rows dbname table-name
(lambda (row)
(pgloader.pgsql:format-row text-file
row
:date-columns date-columns))
:host host
:user user
:pass pass)))
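;;
;; Example usage (hypothetical file name; "weetix" and "blacklist" are a
;; database and a table from the README, and '(3) marks a date column to
;; reformat):
;;
;;   (copy-from "weetix" "blacklist" "/tmp/weetix/blacklist.copy"
;;              :date-columns '(3))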
;;;
;;; MySQL bulk export to file, in PostgreSQL COPY TEXT format
;;;
(defun export-all-tables (dbname
&key
only-tables
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
  "Export each MySQL table to its own TEXT file, in the PostgreSQL COPY format"
(let ((pgtables (pgloader.pgsql:list-tables dbname))
(total-count 0)
(total-seconds 0))
(pgloader.utils:report-header)
(loop
for table-name in (list-tables dbname
:host host
:user user
:pass pass)
for filename = (pgloader.csv:get-pathname dbname table-name)
when (or (null only-tables)
(member table-name only-tables :test #'equal))
do
(pgloader.utils:report-table-name table-name)
(multiple-value-bind (result seconds)
(pgloader.utils:timing
(copy-from dbname table-name filename
:date-columns (pgloader.pgsql:get-date-columns
table-name pgtables)))
(when result
(incf total-count result)
(incf total-seconds seconds)
(pgloader.utils:report-results result seconds)))
finally
(pgloader.utils:report-footer "Total export time"
total-count total-seconds)
(return (values total-count (float total-seconds))))))
;;;
;;; Copy data for a target database from files in the PostgreSQL COPY TEXT
;;; format
;;;
(defun export-import-database (dbname
&key
(truncate t)
only-tables)
  "Export MySQL data and import it into PostgreSQL"
;; get the list of tables and have at it
(let ((mysql-tables (list-tables dbname))
(total-count 0)
(total-seconds 0))
(pgloader.utils:report-header)
(loop
for (table-name . date-columns) in (pgloader.pgsql:list-tables dbname)
when (or (null only-tables)
(member table-name only-tables :test #'equal))
do
(pgloader.utils:report-table-name table-name)
(if (member table-name mysql-tables :test #'equal)
(multiple-value-bind (result seconds)
(pgloader.utils:timing
(let ((filename
(pgloader.csv:get-pathname dbname table-name)))
;; export from MySQL to file
(copy-from dbname table-name filename
:date-columns date-columns)
;; import the file to PostgreSQL
(pgloader.pgsql:copy-from-file dbname table-name filename
:truncate truncate)))
(when result
(incf total-count result)
(incf total-seconds seconds)
(pgloader.utils:report-results result seconds)))
;; not a known mysql table
(format t " skip, unknown table in MySQL database~%"))
finally
(pgloader.utils:report-footer "Total export+import time"
total-count total-seconds)
(return (values total-count (float total-seconds))))))
;;;
;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY
;;; protocol
;;;
(defun stream-mysql-table-in-pgsql (dbname table-name
&key
truncate
date-columns)
"Connect in parallel to MySQL and PostgreSQL and stream the data."
(let* ((lp:*kernel* *loader-kernel*)
(channel (lp:make-channel))
(dataq (lq:make-queue 4096)))
;; have a task fill MySQL data in the queue
(lp:submit-task
channel (lambda ()
(list :mysql
(pgloader.queue:map-push-queue
dataq
#'map-rows dbname table-name))))
;; and start another task to push that data from the queue to PostgreSQL
(lp:submit-task
channel (lambda ()
(list :pgsql
(pgloader.pgsql:copy-from-queue dbname table-name dataq
:truncate truncate
:date-columns date-columns))))
;; now wait until both the tasks are over
(loop
for tasks below 2
collect (lp:receive-result channel) into counts
finally (return (cadr (assoc :pgsql counts))))))
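;;
;; Example usage (hypothetical table, assuming *myconn-host* and friends
;; are already set, see pgloader.lisp):
;;
;;   (stream-mysql-table-in-pgsql "weetix" "blacklist"
;;                                :truncate t
;;                                :date-columns '(3))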
;;;
;;; Work on all tables for given database
;;;
(defun stream-database-tables (dbname &key (truncate t) only-tables)
  "Stream data from MySQL directly into PostgreSQL"
;; get the list of tables and have at it
(let ((mysql-tables (list-tables dbname))
(total-count 0)
(total-seconds 0))
(pgloader.utils:report-header)
(loop
for (table-name . date-columns) in (pgloader.pgsql:list-tables dbname)
when (or (null only-tables)
(member table-name only-tables :test #'equal))
do
(pgloader.utils:report-table-name table-name)
(if (member table-name mysql-tables :test #'equal)
(multiple-value-bind (result seconds)
(pgloader.utils:timing
(stream-mysql-table-in-pgsql dbname table-name
:truncate truncate
:date-columns date-columns))
(when result
(incf total-count result)
(incf total-seconds seconds)
(pgloader.utils:report-results result seconds)))
;; not a known mysql table
(format t "skip, unknown table in MySQL database~%"))
finally
(pgloader.utils:report-footer "Total streaming time"
total-count total-seconds)
(return (values total-count (float total-seconds))))))
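;;
;; Example usage, as in the README; the :only-tables form is a
;; hypothetical restriction to a subset of the tables:
;;
;;   (stream-database-tables "weetix")
;;   (stream-database-tables "weetix" :only-tables '("blacklist"))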

;;;; package.lisp
(defpackage #:galaxya-loader
(:use #:cl))
(defpackage #:pgloader.pgsql
(:use #:cl)
(:export #:truncate-table
#:copy-from-file
#:copy-from-queue
#:list-databases
#:list-tables
#:get-date-columns
#:format-row))
(defpackage #:pgloader
(:use #:cl)
(:import-from #:pgloader.pgsql
#:copy-from-file
#:list-databases
#:list-tables)
(:export #:copy-from-file
#:list-databases
#:list-tables))
(in-package #:pgloader)
;;;
;;; Some package names are a little too long to my taste and don't ship with
;;; nicknames, so use `rename-package' here to give them some new nicknames.
;;;
(loop
for (package . nicknames)
in '((lparallel lp)
(lparallel.queue lq)
(simple-date date)
(split-sequence sq))
do (rename-package package package nicknames))
;;;
;;; and recompile. Now you can pre-allocate the queue by passing a size to

;;;; pgloader.asd
(asdf:defsystem #:pgloader
:serial t
:description "Load data into PostgreSQL"
:author "Dimitri Fontaine <dimitri@2ndQuadrant.fr>"
:license "The PostgreSQL Licence"
:depends-on (#:postmodern
#:cl-postgres
#:simple-date
#:cl-mysql
#:split-sequence
#:cl-csv
#:lparallel)
:components ((:file "package")
(:file "utils" :depends-on ("package"))
(:file "pgloader" :depends-on ("package"))
;; those are one-package-per-file
(:file "queue") ; package pgloader.queue
(:file "csv") ; package pgloader.csv
;; package pgloader.pgsql
(:file "pgsql" :depends-on ("queue" "utils"))
;; mysql.lisp depends on pgsql.lisp to be able to export data
;; from MySQL in the PostgreSQL format.
;;
;; package pgloader.mysql
(:file "mysql" :depends-on ("pgsql" "queue" "utils"))))

;;;; pgloader
;;;
;;; PostgreSQL data loading tool.
;;;
(in-package #:pgloader)
;;;
;;; Parameters you might want to change
;;;
(defparameter *loader-kernel* (lp:make-kernel 2)
"lparallel kernel to use for loading data in parallel")
(defparameter *myconn-host* "myhost")
(defparameter *myconn-user* "myuser")
(defparameter *myconn-pass* "mypass")
;;;
;;; TODO: define a top level API
;;;

;;;; pgsql.lisp
;;;
;;; Tools to handle PostgreSQL data format
;;;
(in-package :pgloader.pgsql)
;;;
;;; Quick utilities to get rid of later.
;;;
(defparameter *pgconn*
'("gdb" "none" "localhost" :port 5432)
"Connection string to the local database")
(defun get-connection-string (dbname)
(cons dbname *pgconn*))
;;;
;;; PostgreSQL Tools connecting to a database
;;;
(defun truncate-table (dbname table-name)
"Truncate given TABLE-NAME in database DBNAME"
(pomo:with-connection (get-connection-string dbname)
(pomo:execute (format nil "truncate ~a;" table-name))))
(defun list-databases (&optional (username "postgres"))
"Connect to a local database and get the database list"
(pomo:with-connection
(list "postgres" username "none" "localhost" :port 5432)
(loop for (dbname) in (pomo:query
"select datname
from pg_database
where datname !~ 'postgres|template'")
collect dbname)))
(defun list-tables (dbname)
  "Return an alist of table names and list of columns to pay attention to."
(pomo:with-connection
(get-connection-string dbname)
(loop for (relname colarray) in (pomo:query "
select relname, array_agg(case when typname in ('date', 'timestamptz')
then attnum end
order by attnum)
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid
join pg_type t on t.oid = a.atttypid
where c.relkind = 'r'
and attnum > 0
and n.nspname = 'public'
group by relname
")
collect (cons relname (loop
for attnum across colarray
unless (eq attnum :NULL)
collect attnum)))))
(defun get-date-columns (table-name pgsql-table-list)
  "Given a PGSQL-TABLE-LIST as per function list-tables, return the list of
   column numbers containing dates (those have to be reformatted)"
(cdr (assoc table-name pgsql-table-list)))
;;;
;;; PostgreSQL formatting tools
;;;
(defun fix-zero-date (datestr)
(cond
((null datestr) nil)
((string= datestr "") nil)
((string= datestr "0000-00-00") nil)
((string= datestr "0000-00-00 00:00:00") nil)
(t datestr)))
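;; Examples:
;;   (fix-zero-date "0000-00-00")          ; => NIL
;;   (fix-zero-date "0000-00-00 00:00:00") ; => NIL
;;   (fix-zero-date "2013-02-07")          ; => "2013-02-07"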
(defun reformat-null-value (value)
"cl-mysql returns nil for NULL and cl-postgres wants :NULL"
(if (null value) :NULL value))
(defun reformat-row (row &key date-columns)
"Reformat row as given by MySQL in a format compatible with cl-postgres"
(loop
for i from 1
for col in row
for no-zero-date-col = (if (member i date-columns)
(fix-zero-date col)
col)
collect (reformat-null-value no-zero-date-col)))
;;;
;;; Format row to PostgreSQL COPY format, the TEXT variant.
;;;
(defun format-row (stream row &key date-columns)
  "Add a ROW in the STREAM, formatting ROW in PostgreSQL COPY TEXT format.
See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609 for
details about the format, and format specs."
(let* (*print-circle* *print-pretty*)
(loop
for i from 1
for (col . more?) on row
for preprocessed-col = (if (member i date-columns)
(fix-zero-date col)
col)
do (if (null preprocessed-col)
(format stream "~a~:[~;~c~]" "\\N" more? #\Tab)
(progn
;; From PostgreSQL docs:
;;
;; In particular, the following characters must be preceded
;; by a backslash if they appear as part of a column value:
;; backslash itself, newline, carriage return, and the
;; current delimiter character.
(loop
for char across preprocessed-col
do (case char
(#\\ (format stream "\\\\")) ; 2 chars here
(#\Space (princ #\Space stream))
(#\Newline (format stream "\\n")) ; 2 chars here
(#\Return (format stream "\\r")) ; 2 chars here
(#\Tab (format stream "\\t")) ; 2 chars here
(#\Backspace (format stream "\\b")) ; 2 chars here
(#\Page (format stream "\\f")) ; 2 chars here
(t (format stream "~c" char))))
(format stream "~:[~;~c~]" more? #\Tab))))
(format stream "~%")))
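;;
;; Example: a row with a NULL in column 2 and a zero date in column 3
;; comes out as one tab-separated COPY TEXT line, with \N for NULLs:
;;
;;   (format-row *standard-output* '("foo" nil "0000-00-00")
;;               :date-columns '(3))
;;   ;; prints: foo<Tab>\N<Tab>\N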
;;;
;;; Read a file format in PostgreSQL COPY TEXT format, and call given
;;; function on each line.
;;;
(defun map-rows (dbname table-name filename process-row-fn)
"Load data from a text file in PostgreSQL COPY TEXT format.
Each row is pre-processed then PROCESS-ROW-FN is called with the row as a
list as its only parameter.
Finally returns how many rows were read and processed.
(with-open-file
;; we just ignore files that don't exist
(input filename
:direction :input
:if-does-not-exist nil)
(when input
;; read in the text file, split it into columns, process NULL columns
;; the way postmodern expects them, and call PROCESS-ROW-FN on them
(loop
for line = (read-line input nil)
for row = (mapcar (lambda (x)
;; we want Postmodern compliant NULLs
(if (string= "\\N" x) :null x))
;; splitting is easy, it's always on #\Tab
;; see format-row-for-copy for details
(sq:split-sequence #\Tab line))
while line
counting line into count
do (funcall process-row-fn row)
finally (return count)))))
;;;
;;; Read a file in PostgreSQL COPY TEXT format and load it into a PostgreSQL
;;; table using the COPY protocol.
;;;
(defun copy-from-file (dbname table-name filename
&key
(truncate t))
"Load data from clean COPY TEXT file to PostgreSQL"
(when truncate (truncate-table dbname table-name))
(let* ((conspec (remove :port (get-connection-string dbname)))
(stream
(cl-postgres:open-db-writer conspec table-name nil)))
(unwind-protect
;; read csv in the file and push it directly through the db writer
;; in COPY streaming mode
(map-rows dbname table-name filename
(lambda (row)
(cl-postgres:db-write-row stream row)))
(cl-postgres:close-db-writer stream))))
;;;
;;; Pop data from a lparallel.queue queue instance, reformat it assuming
;;; data in there are from cl-mysql, and copy it to a PostgreSQL table.
;;;
(defun copy-from-queue (dbname table-name dataq
&key
(truncate t)
date-columns)
"Fetch data from the QUEUE until we see :end-of-data"
(when truncate (truncate-table dbname table-name))
(let* ((conspec (remove :port (get-connection-string dbname)))
(stream
(cl-postgres:open-db-writer conspec table-name nil)))
(unwind-protect
(pgloader.queue:map-pop-queue
dataq (lambda (row)
(cl-postgres:db-write-row
stream
(reformat-row row :date-columns date-columns))))
(cl-postgres:close-db-writer stream))))

;;;; queue.lisp
;;;
;;; Tools to handle internal queueing, using lparallel.queue
;;;
(defpackage #:pgloader.queue
(:use #:cl)
(:export #:map-pop-queue
#:map-push-queue))
;; no nickname for that package, queue is far too generic
(in-package :pgloader.queue)
(defun map-pop-queue (queue process-row-fn)
"Consume the whole of QUEUE, calling PROCESS-ROW-FN on each row. The QUEUE
must signal end of data by the element :end-of-data.
Returns how many rows were processed from the queue.
(loop
for row = (lq:pop-queue queue)
until (eq row :end-of-data)
counting row into count
do (funcall process-row-fn row)
finally (return count)))
(defun map-push-queue (queue map-row-fn &rest initial-args)
"Apply MAP-ROW-FN on INITIAL-ARGS and a function of ROW that will push the
row into the queue. When MAP-ROW-FN returns, push :end-of-data in the queue.
Returns whatever MAP-ROW-FN returned.
(prog1
(apply map-row-fn (append initial-args
(list
(lambda (row)
(lq:push-queue row queue)))))
(lq:push-queue :end-of-data queue)))
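;;
;; The two halves are meant to run in separate tasks, as in
;; stream-mysql-table-in-pgsql (mysql.lisp); sketch, where PROCESS-ROW
;; is a placeholder consumer function:
;;
;;   producer task: (map-push-queue dataq #'pgloader.mysql:map-rows
;;                                  dbname table-name)
;;   consumer task: (map-pop-queue dataq #'process-row)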

;;; facility to easily run the program
(ql:quickload :pgloader)
(in-package :pgloader)
(setq *myconn-host* "localhost"
*myconn-user* "debian-sys-maint"
*myconn-pass* "vtmMI04yBZlFprYm")

;;; Some little timing tools
;;;
(in-package :pgloader)
;;;
;;; Timing Macros

;;;; utils.lisp
;;;
;;; Random utilities
;;;
(defpackage #:pgloader.utils
(:use #:cl)
(:export #:report-header
#:report-table-name
#:report-results
#:report-footer
#:format-interval
#:timing))
(in-package :pgloader.utils)
;;;
;;; Timing Macro
;;;
(defun elapsed-time-since (start)
"Return how many seconds ticked between START and now"
(/ (- (get-internal-real-time) start)
internal-time-units-per-second))
(defmacro timing (&body forms)
  "Return both the result of BODY and how much real time was spent in it"
(let ((start (gensym))
(end (gensym))
(result (gensym)))
`(let* ((,start (get-internal-real-time))
(,result (progn ,@forms))
(,end (get-internal-real-time)))
(values ,result (/ (- ,end ,start) internal-time-units-per-second)))))
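;;
;; Example usage (expensive-computation is a placeholder):
;;
;;   (multiple-value-bind (result seconds)
;;       (timing (expensive-computation))
;;     ...)
;;
;; RESULT is the value of the last form in the body, SECONDS the elapsed
;; real time.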
;;;
;;; Timing Formatting
;;;
(defun format-interval (seconds &optional (stream t))
  "Output the number of seconds in a human-friendly way"
(multiple-value-bind (years months days hours mins secs millisecs)
(date:decode-interval (date:encode-interval :second seconds))
(format
stream
"~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~d.~ds"
(< 0 years) years
(< 0 months) months
(< 0 days) days
(< 0 hours) hours
(< 0 mins) mins
secs (truncate millisecs))))
;;;
;;; Pretty print a report while doing bulk operations
;;;
(defun report-header ()
(format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time")
(format t "~&------------------------------ --------- ---------"))
(defun report-table-name (table-name)
(format t "~&~30@a " table-name))
(defun report-results (rows seconds)
(format t "~9@a ~9@a" rows (format-interval seconds nil)))
(defun report-footer (legend rows seconds)
(format t "~&------------------------------ --------- ---------")
(format t "~&~30@a ~9@a ~9@a" legend
rows (format-interval seconds nil)))