From 623e2d4ff7b46b9c98224bed3ea62d4ba04cf389 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Thu, 7 Feb 2013 00:04:12 +0100 Subject: [PATCH] Massive Refactoring, towards pgloader. --- README.galaxya.md | 248 +++++++++++++++++++++++++++++++++++++ README.md | 295 ++++++++++---------------------------------- csv.lisp | 21 ++++ galaxya-export.lisp | 180 --------------------------- galaxya-loader.asd | 20 --- galaxya-loader.lisp | 268 ---------------------------------------- mysql.lisp | 261 +++++++++++++++++++++++++++++++++++++++ package.lisp | 36 ++++-- pgloader.asd | 32 +++++ pgloader.lisp | 20 +++ pgsql.lisp | 200 ++++++++++++++++++++++++++++++ queue.lisp | 36 ++++++ run.lisp | 16 +-- timing.lisp | 2 +- utils.lisp | 68 ++++++++++ 15 files changed, 988 insertions(+), 715 deletions(-) create mode 100644 README.galaxya.md create mode 100644 csv.lisp delete mode 100644 galaxya-export.lisp delete mode 100644 galaxya-loader.asd delete mode 100644 galaxya-loader.lisp create mode 100644 mysql.lisp create mode 100644 pgloader.asd create mode 100644 pgloader.lisp create mode 100644 pgsql.lisp create mode 100644 queue.lisp create mode 100644 utils.lisp diff --git a/README.galaxya.md b/README.galaxya.md new file mode 100644 index 0000000..6efdb68 --- /dev/null +++ b/README.galaxya.md @@ -0,0 +1,248 @@ +# Galaxya Loader + +Chargement de données CSV dans une base PostgreSQL. Les données sont +proprement encodée en UTF8 et seuls les champs dates doivent recevoir un +traitement particulier. + +Les dates `0000-00-00` deviennent `NULL`. Les dates `0000-00-00 00:00:00` +aussi. + +## Installation et dependances + +Programme Common Lisp utilisant [SBCL](http://sbcl.org/) et +[Quicklisp](http://www.quicklisp.org/beta/). 
+ + apt-get install sbcl + wget http://beta.quicklisp.org/quicklisp.lisp + sbcl --load quicklisp.lisp + * (quicklisp-quickstart:install) + * (ql:add-to-init-file) + +Ensuite il faut récupérer les sources du projet dans +`~/quicklisp/local-projects/`, afin de pouvoir faire : + + sbcl + * (ql:quickload :galaxa-loader) + * (in-package :galaxya-loader) + * (stream-database-tables "weetix") + +Ou bien directement : + + * (load-all-databases) + +## Example d'usage + +Un chargement complet d'une base de données, en streaming directement depuis +MySQL vers PostgreSQL : + +~~~ + table name rows time +------------------------------ --------- --------- + ab_tests skip, unknown table in MySQL database + affiliations 1018 0m0s.440 + blacklist 352 0m0s.348 + blogtop 41451 0m1s.564 + ca_sms_boost 447 0m0s.520 + codes 3344498 4m8s.535 + codes_buffer 15 0m8s.638 + codes_distribues 366392 0m24s.908 + codes_distribues_demande 795 0m1s.3 + codes_temp 3 0m17s.195 + codes_vipplus 859 0m8s.65 + com_log 39703 0m2s.122 + com_partenaires 19 0m0s.360 + commandes_boutique 78717 0m2s.363 + commentaires 7009 0m0s.770 + communaute_amis 62948 0m1s.452 + communaute_blacklist 67 0m0s.619 + communaute_messagerie_id 4448 0m0s.606 +communaute_messagerie_messages 9855 0m0s.907 + communaute_no_show 9018 0m0s.437 + concours_weetix 773 0m0s.609 + coupons 71 0m0s.833 + css 163463 0m3s.724 + documents 21956 0m2s.148 + documents_lots 23118 0m0s.909 + documents_pays 31201 0m0s.627 + echange_votes 251 0m0s.436 + etransactions_id 1 0m0s.54 + gagnants 21457 0m1s.132 + gagnants_docs 321 0m0s.543 + gains_affiliations 17280 0m1s.494 + gains_concours 24728 0m1s.547 + gains_webmasters 581093 0m12s.612 + happy_hour 1 0m0s.501 + interco 120 0m0s.834 + ip 1603 0m1s.290 + litiges 172 0m1s.487 + livemessage 10719 0m1s.428 + log_actions 1116761 0m31s.882 + log_actions_vip 2912106 1m31s.668 + log_facebook_credits 475 0m0s.908 + log_flood_action 453 0m0s.438 + log_modifications 4450 0m0s.458 + log_mpme 34615 0m1s.890 + 
log_points 432229 0m4s.337 + log_sms 31766 0m1s.285 + log_vipplus 18603 0m0s.714 + log_votes 627462 0m12s.462 + log_votes_details 2873684 1m24s.824 + log_votes_manuels 4853 0m0s.530 + membres 26752 0m2s.990 + membres_referer 9628 0m0s.564 + membres_verification 13976 0m0s.573 + membres_vip 118192 0m8s.420 + membres_vip_interets 101571 0m0s.669 + muzictop 16686 0m1s.18 + newsletters_clients 387 0m2s.914 + numeros 133 0m0s.772 + oppositions 518 0m0s.356 + paiements 8231 0m0s.612 + peopletop 56295 0m1s.781 + plans 3 0m0s.401 + plans_reversements 114 0m0s.544 + prestataires 17 0m0s.315 + promo_boost 74946 0m1s.769 + reversements 21940 0m0s.744 + sav_questions 22 0m0s.573 + sav_themes 5 0m0s.407 + sav_vip_questions 40 0m0s.381 + sav_vip_themes 7 0m0s.319 + services 29 0m0s.284 + short_url 1820123 0m39s.355 + short_url_temp 97495 0m1s.174 + sms_boost 5552 0m1s.871 + sms_boost_log 1008752 0m44s.848 + startop 399326 0m7s.343 + temp_codes_annules 918 0m0s.342 + videotop 1352 0m0s.805 + vip_alertes_sms 124 0m0s.502 + vip_bonus 6816 0m0s.449 + vip_classements 44536 0m2s.592 + vip_classements_categories 17 0m0s.348 + vip_infos_classements 31 0m0s.500 + vip_profil 40384 0m1s.9 + vip_regularisation_codes 15535 0m0s.735 + vip_sollicitation 106368 0m1s.224 + vip_verification_tel 8116 0m0s.637 + webtop 21983 0m1s.195 + zarchives 16296 0m0s.797 + zarchives_blogtop 156067 0m6s.455 + zarchives_muzictop 210174 0m4s.553 + zarchives_peopletop 59026 0m2s.494 + zarchives_startop 360187 0m7s.223 + zarchives_videotop 6917 0m0s.787 + zarchives_webtop 146387 0m3s.196 +------------------------------ --------- --------- + Total streaming time 17905373 12m21s.296 +17905373 +741.296 +~~~ + +Et en passant par des exports sur fichier temporaire au format COPY TEXT de +PostgreSQL: + +~~~ +GALAXYA-LOADER> (load-database-tables-from-file "weetix") + table name rows time +------------------------------ --------- --------- + ab_tests skip, unknown table in MySQL database + affiliations 1018 0m0s.751 + 
blacklist 352 0m0s.416 + blogtop 41451 0m2s.745 + ca_sms_boost 447 0m0s.563 + codes 3344498 5m45s.511 + codes_buffer 15 0m0s.425 + codes_distribues 366392 0m32s.598 + codes_distribues_demande 795 0m0s.562 + codes_temp 3 0m0s.453 + codes_vipplus 859 0m0s.994 + com_log 39703 0m2s.388 + com_partenaires 19 0m0s.420 + commandes_boutique 78717 0m4s.919 + commentaires 7009 0m2s.881 + communaute_amis 62948 0m8s.724 + communaute_blacklist 67 0m1s.236 + communaute_messagerie_id 4448 0m1s.522 +communaute_messagerie_messages 9855 0m1s.614 + communaute_no_show 9018 0m0s.437 + concours_weetix 773 0m1s.305 + coupons 71 0m0s.525 + css 163463 0m5s.604 + documents 21956 0m3s.788 + documents_lots 23118 0m1s.333 + documents_pays 31201 0m0s.930 + echange_votes 251 0m0s.473 + etransactions_id 1 0m0s.99 + gagnants 21457 0m2s.85 + gagnants_docs 321 0m0s.489 + gains_affiliations 17280 0m1s.736 + gains_concours 24728 0m1s.889 + gains_webmasters 581093 0m20s.38 + happy_hour 1 0m0s.147 + interco 120 0m0s.457 + ip 1603 0m0s.787 + litiges 172 0m0s.750 + livemessage 10719 0m1s.632 + log_actions 1116761 1m23s.781 + log_actions_vip 2912106 3m48s.206 + log_facebook_credits 475 0m1s.178 + log_flood_action 453 0m0s.434 + log_modifications 4450 0m0s.952 + log_mpme 34615 0m2s.859 + log_points 432229 0m10s.778 + log_sms 31766 0m2s.666 + log_vipplus 18603 0m1s.145 + log_votes 627462 0m25s.411 + log_votes_details 2873684 2m54s.984 + log_votes_manuels 4853 0m0s.600 + membres 26752 0m6s.852 + membres_referer 9628 0m0s.832 + membres_verification 13976 0m0s.815 + membres_vip 118192 0m18s.340 + membres_vip_interets 101571 0m3s.513 + muzictop 16686 0m1s.367 + newsletters_clients 387 0m0s.941 + numeros 133 0m0s.832 + oppositions 518 0m0s.364 + paiements 8231 0m0s.984 + peopletop 56295 0m3s.730 + plans 3 0m0s.375 + plans_reversements 114 0m0s.590 + prestataires 17 0m0s.334 + promo_boost 74946 0m3s.559 + reversements 21940 0m1s.25 + sav_questions 22 0m0s.541 + sav_themes 5 0m0s.346 + sav_vip_questions 40 0m0s.382 
+ sav_vip_themes 7 0m0s.360 + services 29 0m0s.317 + short_url 1820123 0m55s.47 + short_url_temp 97495 0m4s.735 + sms_boost 5552 0m1s.699 + sms_boost_log 1008752 1m30s.472 + startop 399326 0m16s.981 + temp_codes_annules 918 0m0s.532 + videotop 1352 0m0s.822 + vip_alertes_sms 124 0m0s.577 + vip_bonus 6816 0m0s.525 + vip_classements 44536 0m6s.669 + vip_classements_categories 17 0m1s.2 + vip_infos_classements 31 0m0s.658 + vip_profil 40384 0m2s.545 + vip_regularisation_codes 15535 0m0s.809 + vip_sollicitation 106368 0m2s.858 + vip_verification_tel 8116 0m0s.791 + webtop 21983 0m1s.766 + zarchives 16296 0m1s.26 + zarchives_blogtop 156067 0m7s.121 + zarchives_muzictop 210174 0m8s.436 + zarchives_peopletop 59026 0m3s.53 + zarchives_startop 360187 0m13s.473 + zarchives_videotop 6917 0m1s.187 + zarchives_webtop 146387 0m7s.484 +------------------------------ --------- --------- + Total export+import time 17905373 21m2s.887 +17905373 +1262.887 +~~~ diff --git a/README.md b/README.md index 6efdb68..90e2407 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,25 @@ -# Galaxya Loader +# PGLoader -Chargement de données CSV dans une base PostgreSQL. Les données sont -proprement encodée en UTF8 et seuls les champs dates doivent recevoir un -traitement particulier. +pgloader is a data loading tool for PostgreSQL, using the `COPY` command. -Les dates `0000-00-00` deviennent `NULL`. Les dates `0000-00-00 00:00:00` -aussi. +Its main avantage over just using `COPY` or `\copy` and over using a +*Foreign Data Wrapper* is the transaction behaviour, where *pgloader* will +keep a separate file of rejected data and continue trying to `copy` good +data in your database. -## Installation et dependances +The default PostgreSQL behaviour is transactional, which means that any +erroneous line in the input data (file or remote database) will stop the +bulk load for the whole table. -Programme Common Lisp utilisant [SBCL](http://sbcl.org/) et -[Quicklisp](http://www.quicklisp.org/beta/). 
+pgloader also implements data reformating, the main example of that being a +transformation from MySQL dates `0000-00-00` and `0000-00-00 00:00:00` to +PostgreSQL `NULL` value (because our calendar never had a *year zero*). + +## INSTALL + +pgloader is now a Common Lisp program, tested using the +[SBCL](http://sbcl.org/) and [CCL](http://ccl.clozure.com/) implementation +with [Quicklisp](http://www.quicklisp.org/beta/). apt-get install sbcl wget http://beta.quicklisp.org/quicklisp.lisp @@ -18,231 +27,63 @@ Programme Common Lisp utilisant [SBCL](http://sbcl.org/) et * (quicklisp-quickstart:install) * (ql:add-to-init-file) -Ensuite il faut récupérer les sources du projet dans -`~/quicklisp/local-projects/`, afin de pouvoir faire : +Now fetch pgloader sources into `~/quicklisp/local-projects/` so that you +can do: sbcl - * (ql:quickload :galaxa-loader) - * (in-package :galaxya-loader) + * (ql:quickload :pgloader) + * (in-package :pgloader) * (stream-database-tables "weetix") -Ou bien directement : +## Usage - * (load-all-databases) +## TODO -## Example d'usage +Some notes about what I intend to be working on next. 
-Un chargement complet d'une base de données, en streaming directement depuis -MySQL vers PostgreSQL : +### internals & refactoring -~~~ - table name rows time ------------------------------- --------- --------- - ab_tests skip, unknown table in MySQL database - affiliations 1018 0m0s.440 - blacklist 352 0m0s.348 - blogtop 41451 0m1s.564 - ca_sms_boost 447 0m0s.520 - codes 3344498 4m8s.535 - codes_buffer 15 0m8s.638 - codes_distribues 366392 0m24s.908 - codes_distribues_demande 795 0m1s.3 - codes_temp 3 0m17s.195 - codes_vipplus 859 0m8s.65 - com_log 39703 0m2s.122 - com_partenaires 19 0m0s.360 - commandes_boutique 78717 0m2s.363 - commentaires 7009 0m0s.770 - communaute_amis 62948 0m1s.452 - communaute_blacklist 67 0m0s.619 - communaute_messagerie_id 4448 0m0s.606 -communaute_messagerie_messages 9855 0m0s.907 - communaute_no_show 9018 0m0s.437 - concours_weetix 773 0m0s.609 - coupons 71 0m0s.833 - css 163463 0m3s.724 - documents 21956 0m2s.148 - documents_lots 23118 0m0s.909 - documents_pays 31201 0m0s.627 - echange_votes 251 0m0s.436 - etransactions_id 1 0m0s.54 - gagnants 21457 0m1s.132 - gagnants_docs 321 0m0s.543 - gains_affiliations 17280 0m1s.494 - gains_concours 24728 0m1s.547 - gains_webmasters 581093 0m12s.612 - happy_hour 1 0m0s.501 - interco 120 0m0s.834 - ip 1603 0m1s.290 - litiges 172 0m1s.487 - livemessage 10719 0m1s.428 - log_actions 1116761 0m31s.882 - log_actions_vip 2912106 1m31s.668 - log_facebook_credits 475 0m0s.908 - log_flood_action 453 0m0s.438 - log_modifications 4450 0m0s.458 - log_mpme 34615 0m1s.890 - log_points 432229 0m4s.337 - log_sms 31766 0m1s.285 - log_vipplus 18603 0m0s.714 - log_votes 627462 0m12s.462 - log_votes_details 2873684 1m24s.824 - log_votes_manuels 4853 0m0s.530 - membres 26752 0m2s.990 - membres_referer 9628 0m0s.564 - membres_verification 13976 0m0s.573 - membres_vip 118192 0m8s.420 - membres_vip_interets 101571 0m0s.669 - muzictop 16686 0m1s.18 - newsletters_clients 387 0m2s.914 - numeros 133 0m0s.772 - oppositions 
518 0m0s.356 - paiements 8231 0m0s.612 - peopletop 56295 0m1s.781 - plans 3 0m0s.401 - plans_reversements 114 0m0s.544 - prestataires 17 0m0s.315 - promo_boost 74946 0m1s.769 - reversements 21940 0m0s.744 - sav_questions 22 0m0s.573 - sav_themes 5 0m0s.407 - sav_vip_questions 40 0m0s.381 - sav_vip_themes 7 0m0s.319 - services 29 0m0s.284 - short_url 1820123 0m39s.355 - short_url_temp 97495 0m1s.174 - sms_boost 5552 0m1s.871 - sms_boost_log 1008752 0m44s.848 - startop 399326 0m7s.343 - temp_codes_annules 918 0m0s.342 - videotop 1352 0m0s.805 - vip_alertes_sms 124 0m0s.502 - vip_bonus 6816 0m0s.449 - vip_classements 44536 0m2s.592 - vip_classements_categories 17 0m0s.348 - vip_infos_classements 31 0m0s.500 - vip_profil 40384 0m1s.9 - vip_regularisation_codes 15535 0m0s.735 - vip_sollicitation 106368 0m1s.224 - vip_verification_tel 8116 0m0s.637 - webtop 21983 0m1s.195 - zarchives 16296 0m0s.797 - zarchives_blogtop 156067 0m6s.455 - zarchives_muzictop 210174 0m4s.553 - zarchives_peopletop 59026 0m2s.494 - zarchives_startop 360187 0m7s.223 - zarchives_videotop 6917 0m0s.787 - zarchives_webtop 146387 0m3s.196 ------------------------------- --------- --------- - Total streaming time 17905373 12m21s.296 -17905373 -741.296 -~~~ + - review pgloader.pgsql:reformat-row date-columns arguments + - review connection string handling for both PostgreSQL and MySQL + - provide a better toplevel API + - implement tests -Et en passant par des exports sur fichier temporaire au format COPY TEXT de -PostgreSQL: +### user features -~~~ -GALAXYA-LOADER> (load-database-tables-from-file "weetix") - table name rows time ------------------------------- --------- --------- - ab_tests skip, unknown table in MySQL database - affiliations 1018 0m0s.751 - blacklist 352 0m0s.416 - blogtop 41451 0m2s.745 - ca_sms_boost 447 0m0s.563 - codes 3344498 5m45s.511 - codes_buffer 15 0m0s.425 - codes_distribues 366392 0m32s.598 - codes_distribues_demande 795 0m0s.562 - codes_temp 3 0m0s.453 - codes_vipplus 
859 0m0s.994 - com_log 39703 0m2s.388 - com_partenaires 19 0m0s.420 - commandes_boutique 78717 0m4s.919 - commentaires 7009 0m2s.881 - communaute_amis 62948 0m8s.724 - communaute_blacklist 67 0m1s.236 - communaute_messagerie_id 4448 0m1s.522 -communaute_messagerie_messages 9855 0m1s.614 - communaute_no_show 9018 0m0s.437 - concours_weetix 773 0m1s.305 - coupons 71 0m0s.525 - css 163463 0m5s.604 - documents 21956 0m3s.788 - documents_lots 23118 0m1s.333 - documents_pays 31201 0m0s.930 - echange_votes 251 0m0s.473 - etransactions_id 1 0m0s.99 - gagnants 21457 0m2s.85 - gagnants_docs 321 0m0s.489 - gains_affiliations 17280 0m1s.736 - gains_concours 24728 0m1s.889 - gains_webmasters 581093 0m20s.38 - happy_hour 1 0m0s.147 - interco 120 0m0s.457 - ip 1603 0m0s.787 - litiges 172 0m0s.750 - livemessage 10719 0m1s.632 - log_actions 1116761 1m23s.781 - log_actions_vip 2912106 3m48s.206 - log_facebook_credits 475 0m1s.178 - log_flood_action 453 0m0s.434 - log_modifications 4450 0m0s.952 - log_mpme 34615 0m2s.859 - log_points 432229 0m10s.778 - log_sms 31766 0m2s.666 - log_vipplus 18603 0m1s.145 - log_votes 627462 0m25s.411 - log_votes_details 2873684 2m54s.984 - log_votes_manuels 4853 0m0s.600 - membres 26752 0m6s.852 - membres_referer 9628 0m0s.832 - membres_verification 13976 0m0s.815 - membres_vip 118192 0m18s.340 - membres_vip_interets 101571 0m3s.513 - muzictop 16686 0m1s.367 - newsletters_clients 387 0m0s.941 - numeros 133 0m0s.832 - oppositions 518 0m0s.364 - paiements 8231 0m0s.984 - peopletop 56295 0m3s.730 - plans 3 0m0s.375 - plans_reversements 114 0m0s.590 - prestataires 17 0m0s.334 - promo_boost 74946 0m3s.559 - reversements 21940 0m1s.25 - sav_questions 22 0m0s.541 - sav_themes 5 0m0s.346 - sav_vip_questions 40 0m0s.382 - sav_vip_themes 7 0m0s.360 - services 29 0m0s.317 - short_url 1820123 0m55s.47 - short_url_temp 97495 0m4s.735 - sms_boost 5552 0m1s.699 - sms_boost_log 1008752 1m30s.472 - startop 399326 0m16s.981 - temp_codes_annules 918 0m0s.532 - videotop 
1352 0m0s.822 - vip_alertes_sms 124 0m0s.577 - vip_bonus 6816 0m0s.525 - vip_classements 44536 0m6s.669 - vip_classements_categories 17 0m1s.2 - vip_infos_classements 31 0m0s.658 - vip_profil 40384 0m2s.545 - vip_regularisation_codes 15535 0m0s.809 - vip_sollicitation 106368 0m2s.858 - vip_verification_tel 8116 0m0s.791 - webtop 21983 0m1s.766 - zarchives 16296 0m1s.26 - zarchives_blogtop 156067 0m7s.121 - zarchives_muzictop 210174 0m8s.436 - zarchives_peopletop 59026 0m3s.53 - zarchives_startop 360187 0m13s.473 - zarchives_videotop 6917 0m1s.187 - zarchives_webtop 146387 0m7s.484 ------------------------------- --------- --------- - Total export+import time 17905373 21m2s.887 -17905373 -1262.887 -~~~ + - commands: `LOAD` and `INI` formats + - compat with `SQL*Loader` format + +#### data loading + + - dichotomy, that needs a local buffer + - general CSV and Text source formats + - compressed input (gzip, other algos) + - fetch data from S3 + +#### convenience + + - automatic creation of schema (from MySQL schema, or from CSV header) + - pre-fetch some rows to guesstimate data types? + +#### performances + + - some more parallelizing options + - support for partitionning in pgloader itself + +#### reformating + +Data reformating is now going to have to happen in Common Lisp mostly, maybe +offer some other languages (cl-awk etc). 
+ + - raw reformating, before rows are split + - per column reformating + - user-defined columns (constants, functions of other rows) + +#### UI + + - add a web controler with pretty monitoring + - launch new jobs from the web controler + +#### crazy ideas + + - MySQL replication, reading from the binlog directly diff --git a/csv.lisp b/csv.lisp new file mode 100644 index 0000000..14d9042 --- /dev/null +++ b/csv.lisp @@ -0,0 +1,21 @@ +;;; +;;; Tools to handle MySQL data fetching +;;; + +(defpackage #:pgloader.csv + (:use #:cl) + (:export #:*csv-path-root* + #:get-pathname)) + +(in-package :pgloader.csv) + +(defparameter *csv-path-root* + (merge-pathnames "csv/" (user-homedir-pathname))) + +(defun get-pathname (dbname table-name) + "Return a pathname where to read or write the file data" + (make-pathname + :directory (pathname-directory + (merge-pathnames (format nil "~a/" dbname) *csv-path-root*)) + :name table-name + :type "csv")) diff --git a/galaxya-export.lisp b/galaxya-export.lisp deleted file mode 100644 index 6a372e3..0000000 --- a/galaxya-export.lisp +++ /dev/null @@ -1,180 +0,0 @@ -(in-package :galaxya-loader) - -;;; -;;; Reformating Tools, because MySQL has not the same idea about its data -;;; than PostgreSQL has. Ever heard of year 0000? MySQL did... -;;; -(defun mysql-fix-date (datestr) - (cond - ((null datestr) nil) - ((string= datestr "") nil) - ((string= datestr "0000-00-00") nil) - ((string= datestr "0000-00-00 00:00:00") nil) - (t datestr))) - -(defun pgsql-reformat-null-value (value) - "cl-mysql returns nil for NULL and cl-postgres wants :NULL" - (if (null value) :NULL value)) - -(defun pgsql-reformat-row (row &key date-columns) - "Reformat row as given by MySQL in a format compatible with cl-postgres" - (loop - for i from 1 - for col in row - for no-zero-date-col = (if (member i date-columns) - (mysql-fix-date col) - col) - collect (pgsql-reformat-null-value no-zero-date-col))) - -;;; -;;; Implement PostgreSQL COPY format, the TEXT variant. 
-;;; -(defun pgsql-text-copy-format (stream row &key date-columns) - "Add a csv row in the stream" - (let* (*print-circle* *print-pretty*) - (loop - for i from 1 - for (col . more?) on row - for preprocessed-col = (if (member i date-columns) - (mysql-fix-date col) - col) - do (if (null preprocessed-col) - (format stream "~a~:[~;~c~]" "\\N" more? #\Tab) - (progn - ;; In particular, the following characters must be preceded - ;; by a backslash if they appear as part of a column value: - ;; backslash itself, newline, carriage return, and the - ;; current delimiter character. - (loop - for char across preprocessed-col - do (case char - (#\\ (format stream "\\\\")) ; 2 chars here - (#\Space (princ #\Space stream)) - (#\Newline (format stream "\\n")) ; 2 chars here - (#\Return (format stream "\\r")) ; 2 chars here - (#\Tab (format stream "\\t")) ; 2 chars here - (#\Backspace (format stream "\\b")) ; 2 chars here - (#\Page (format stream "\\f")) ; 2 chars here - (t (format stream "~c" char)))) - (format stream "~:[~;~c~]" more? #\Tab)))) - (format stream "~%"))) - -;;; -;;; Map a function to each row extracted from MySQL -;;; -(defun mysql-map-rows (dbname table-name process-row-fn - &key - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) - "Extract MySQL data and call PROCESS-ROW-FN function with a single - argument (a list of column values) for each row." 
- (cl-mysql:connect :host host :user user :password pass) - - (unwind-protect - (progn - ;; Ensure we're talking utf-8 and connect to DBNAME in MySQL - (cl-mysql:query "SET NAMES 'utf8'") - (cl-mysql:query "SET character_set_results = utf8;") - (cl-mysql:use dbname) - - (let* ((sql (format nil "SELECT * FROM ~a;" table-name)) - (q (cl-mysql:query sql :store nil)) - (rs (cl-mysql:next-result-set q))) - (declare (ignore rs)) - - ;; Now fetch MySQL rows directly in the stream - (loop - for row = (cl-mysql:next-row q :type-map (make-hash-table)) - while row - counting row into count - do (funcall process-row-fn row) - finally (return count)))) - - ;; free resources - (cl-mysql:disconnect))) - -;;; -;;; Use mysql-map-rows and pgsql-text-copy-format to fill in a CSV file on -;;; disk with MySQL data in there. -;;; -(defun mysql-copy-text-format (dbname table-name filename - &key - date-columns - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) - "Extrat data from MySQL in PostgreSQL COPY TEXT format" - (with-open-file (text-file filename - :direction :output - :if-exists :supersede - :external-format :utf8) - (mysql-map-rows dbname table-name - (lambda (row) - (pgsql-text-copy-format - text-file - row - :date-columns date-columns)) - :host host - :user user - :pass pass))) - -;;; -;;; Some MySQL other tools -;;; -(defun list-tables-in-mysql-db (dbname - &key - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) - "As the name says" - (cl-mysql:connect :host host :user user :password pass) - - (unwind-protect - (progn - (cl-mysql:use dbname) - ;; that returns a pretty weird format, process it - (mapcan #'identity (caar (cl-mysql:list-tables)))) - ;; free resources - (cl-mysql:disconnect))) - - -(defun mysql-export-all-tables (dbname - &key - only-tables - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) - "Export MySQL tables into as many TEXT files, in the PostgreSQL COPY format" - (let ((pgsql-date-columns-alist 
(get-table-list dbname)) - (total-count 0) - (total-seconds 0)) - (format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time") - (format t "~&------------------------------ --------- ---------") - (loop - for table-name in (list-tables-in-mysql-db dbname - :host host - :user user - :pass pass) - for filename = (get-csv-pathname dbname table-name) - when (or (null only-tables) - (member table-name only-tables :test #'equal)) - do - (format t "~&~30@a " table-name) - (multiple-value-bind (result seconds) - (timing - (mysql-copy-text-format - dbname - table-name - (get-csv-pathname dbname table-name) - :date-columns (cdr (assoc table-name pgsql-date-columns-alist)))) - (when result - (incf total-count result) - (incf total-seconds seconds) - (format t "~9@a ~9@a" - result (format-interval seconds nil)))) - finally - (format t "~&------------------------------ --------- ---------") - (format t "~&~30@a ~9@a ~9@a" "Total export time" - total-count (format-interval total-seconds nil)) - (return (values total-count (float total-seconds)))))) diff --git a/galaxya-loader.asd b/galaxya-loader.asd deleted file mode 100644 index 0688ad8..0000000 --- a/galaxya-loader.asd +++ /dev/null @@ -1,20 +0,0 @@ -;;;; galaxya-loader.asd - -(asdf:defsystem #:galaxya-loader - :serial t - :description "Export data from MySQL and load it into PostgreSQL" - :author "Dimitri Fontaine " - :license "The PostgreSQL Licence" - :depends-on (#:postmodern - #:cl-postgres - #:simple-date - #:cl-mysql - #:split-sequence - #:cl-csv - #:lparallel) - :components ((:file "package") - (:file "timing" :depends-on ("package")) - (:file "galaxya-export" :depends-on ("package")) - (:file "galaxya-loader" :depends-on ("package" - "galaxya-export")))) - diff --git a/galaxya-loader.lisp b/galaxya-loader.lisp deleted file mode 100644 index 5504531..0000000 --- a/galaxya-loader.lisp +++ /dev/null @@ -1,268 +0,0 @@ -;;;; galaxya-loader.lisp - -(in-package #:galaxya-loader) - -;;; -;;; Parameters you might want to change 
-;;; -(defparameter *loader-kernel* (lp:make-kernel 2) - "lparallel kernel to use for loading data in parallel") - -(defparameter *myconn-host* "gala3.galaxya.fr") -(defparameter *myconn-user* "pg1") -(defparameter *myconn-pass* "AFmhKERxD9PVjgQD") - -(setq *myconn-host* "localhost" - *myconn-user* "debian-sys-maint" - *myconn-pass* "vtmMI04yBZlFprYm") - -(defparameter *pgconn* - '("gdb" "none" "localhost" :port 5432) - "Connection string to the local database") - -(defparameter *csv-path-root* - (merge-pathnames "csv/" (user-homedir-pathname))) - -(defun get-csv-pathname (dbname table-name) - "return where to find the file" - (make-pathname - :directory (pathname-directory - (merge-pathnames (format nil "~a/" dbname) *csv-path-root*)) - :name table-name - :type "csv")) - -;;; -;;; PostgreSQL Utilities -;;; -(defun get-connection-string (dbname) - (cons dbname *pgconn*)) - -(defun get-database-list () - "connect to a local database and get the database list" - (pomo:with-connection - (get-connection-string "postgres") - (loop for (dbname) in (pomo:query - "select datname - from pg_database - where datname !~ 'postgres|template'") - collect dbname))) - -(defun get-table-list (dbname) - "Return an alist of tables names and list of columns to pay attention to." 
- (pomo:with-connection - (get-connection-string dbname) - - (loop for (relname colarray) in (pomo:query " -select relname, array_agg(case when typname in ('date', 'timestamptz') - then attnum end - order by attnum) - from pg_class c - join pg_namespace n on n.oid = c.relnamespace - left join pg_attribute a on c.oid = a.attrelid - join pg_type t on t.oid = a.atttypid - where c.relkind = 'r' - and attnum > 0 - and n.nspname = 'public' - group by relname -") - collect (cons relname (loop - for attnum across colarray - unless (eq attnum :NULL) - collect attnum))))) - -(defun pgsql-truncate-table (dbname table-name) - "Truncate given TABLE-NAME in database DBNAME" - (pomo:with-connection (get-connection-string dbname) - (pomo:execute (format nil "truncate ~a;" table-name)))) - -(defun load-data-in-pgsql-text-format (dbname table-name filename - &key - (truncate t)) - "Load data from clean CSV file to PostgreSQL" - (with-open-file - ;; we just ignore files that don't exist - (input filename - :direction :input - :if-does-not-exist nil) - (when input - (when truncate - (pgsql-truncate-table dbname table-name)) - - ;; read csv in the file and push it directly through the db writer - ;; in COPY streaming mode - (let* ((conspec (remove :port (get-connection-string dbname))) - (stream - (cl-postgres:open-db-writer conspec table-name nil))) - - (unwind-protect - (loop - for line = (read-line input nil) - for row = (mapcar (lambda (x) - (if (string= "\\N" x) :null x)) - (sq:split-sequence #\Tab line)) - while line - counting line into count - do (cl-postgres:db-write-row stream row) - finally (return count)) - (cl-postgres:close-db-writer stream)))))) - -;;; -;;; Export data from MySQL as a COPY TEXT file then import that file into -;;; the destination PostgreSQL table. 
-;;; -(defun load-single-table-using-file (dbname table-name - &key - (truncate t) - date-columns) - "Load a single table: export data from MySQL to CSV file then load that in PG" - (let ((filename (get-csv-pathname dbname table-name))) - (mysql-copy-text-format dbname table-name - filename - :date-columns date-columns) - (load-data-in-pgsql-text-format dbname table-name filename - :truncate truncate))) - -;;; -;;; Let's go parallel, with a queue to communicate data -;;; -(defun load-data-from-queue-to-pgsql (dbname table-name dataq - &key - (truncate t) - date-columns) - "Fetch data from the QUEUE until we see :end-of-data" - (when truncate (pgsql-truncate-table dbname table-name)) - - (let* ((conspec (remove :port (get-connection-string dbname))) - (stream - (cl-postgres:open-db-writer conspec table-name nil))) - (unwind-protect - (loop - for row = (lq:pop-queue dataq) - until (eq row :end-of-data) - counting row into count - do (let ((pgrow - (pgsql-reformat-row row :date-columns date-columns))) - (cl-postgres:db-write-row stream pgrow)) - finally (return (list :pgsql count))) - (cl-postgres:close-db-writer stream)))) - -;;; -;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY -;;; protocol -;;; -(defun stream-mysql-table-in-pgsql (dbname table-name - &key - truncate - date-columns) - "Connect in parallel to MySQL and PostgreSQL and stream the data." 
- (let* ((lp:*kernel* *loader-kernel*) - (channel (lp:make-channel)) - (dataq (lq:make-queue 4096))) - ;; have a task fill MySQL data in the queue - (lp:submit-task channel - (lambda () - (prog1 - (list :mysql - (mysql-map-rows - dbname table-name - (lambda (row) - (lq:push-queue row dataq))))) - (lq:push-queue :end-of-data dataq))) - - ;; and start another task to push that data from the queue to PostgreSQL - (lp:submit-task channel - (lambda () - (load-data-from-queue-to-pgsql dbname table-name dataq - :truncate truncate - :date-columns date-columns))) - - ;; now wait until both the tasks are over - (loop - for tasks below 2 - collect (lp:receive-result channel) into counts - finally (return (cadr (assoc :pgsql counts)))))) - -;;; -;;; Work on all tables for given database -;;; -(defun stream-database-tables (dbname &key (truncate t) only-tables) - "Export MySQL data and Import it into PostgreSQL" - ;; get the list of tables and have at it - (let ((mysql-tables (list-tables-in-mysql-db dbname)) - (total-count 0) - (total-seconds 0)) - (format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time") - (format t "~&------------------------------ --------- ---------") - (loop - for (table-name . 
date-columns) in (get-table-list dbname) - when (or (null only-tables) - (member table-name only-tables :test #'equal)) - do - (format t "~&~30@a " table-name) - - (if (member table-name mysql-tables :test #'equal) - (multiple-value-bind (result seconds) - (timing - (stream-mysql-table-in-pgsql dbname table-name - :truncate truncate - :date-columns date-columns)) - (when result - (incf total-count result) - (incf total-seconds seconds) - (format t "~9@a ~9@a" - result (format-interval seconds nil)))) - ;; not a known mysql table - (format t "skip, unknown table in MySQL database~%")) - finally - (format t "~&------------------------------ --------- ---------") - (format t "~&~30@a ~9@a ~9@a" "Total streaming time" - total-count (format-interval total-seconds nil)) - (return (values total-count (float total-seconds)))))) - -(defun load-database-tables-from-file (dbname &key (truncate t) only-tables) - "Export MySQL data and Import it into PostgreSQL" - ;; get the list of tables and have at it - (let ((mysql-tables (list-tables-in-mysql-db dbname)) - (total-count 0) - (total-seconds 0)) - (format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time") - (format t "~&------------------------------ --------- ---------") - (loop - for (table-name . 
date-columns) in (get-table-list dbname) - when (or (null only-tables) - (member table-name only-tables :test #'equal)) - do - (format t "~&~30@a " table-name) - - (if (member table-name mysql-tables :test #'equal) - (multiple-value-bind (result seconds) - (timing - (load-single-table-using-file dbname table-name - :truncate truncate - :date-columns date-columns)) - (when result - (incf total-count result) - (incf total-seconds seconds) - (format t "~9@a ~9@a" - result (format-interval seconds nil)))) - ;; not a known mysql table - (format t " skip, unknown table in MySQL database~%")) - finally - (format t "~&------------------------------ --------- ---------") - (format t "~&~30@a ~9@a ~9@a" "Total export+import time" - total-count (format-interval total-seconds nil)) - (return (values total-count (float total-seconds)))))) - -(defun load-all-databases () - (pomo:with-connection - (get-connection-string "postgres") - - ;; get the list of databases and have at it - (loop - for dbname in (get-database-list) - do - (format t "~&DATABASE: ~a ..." 
dbname) - (multiple-value-bind (result seconds) - (timing - (load-database-tables dbname)) - (format t " ~d rows in ~f secs~%" result seconds))))) diff --git a/mysql.lisp b/mysql.lisp new file mode 100644 index 0000000..1559660 --- /dev/null +++ b/mysql.lisp @@ -0,0 +1,261 @@ +;;; +;;; Tools to handle MySQL data fetching +;;; + +(defpackage #:pgloader.mysql + (:use #:cl) + (:import-from #:pgloader + #:*loader-kernel* + #:*myconn-host* + #:*myconn-user* + #:*myconn-pass*) + (:export #:map-rows + #:copy-from + #:list-databases + #:list-tables + #:export-all-tables + #:export-import-database + #:stream-mysql-table-in-pgsql + #:stream-database-tables)) + +(in-package :pgloader.mysql) + +;;; +;;; MySQL tools connecting to a database +;;; +(defun list-databases (&key + (host *myconn-host*) + (user *myconn-user*) + (pass *myconn-pass*)) + "Connect to a local database and get the database list" + (cl-mysql:connect :host host :user user :password pass) + (unwind-protect + (mapcan #'identity (caar (cl-mysql:query "show databases"))) + (cl-mysql:disconnect))) + +(defun list-tables (dbname + &key + (host *myconn-host*) + (user *myconn-user*) + (pass *myconn-pass*)) + "Return a flat list of all the tables names known in given DATABASE" + (cl-mysql:connect :host host :user user :password pass) + + (unwind-protect + (progn + (cl-mysql:use dbname) + ;; that returns a pretty weird format, process it + (mapcan #'identity (caar (cl-mysql:list-tables)))) + ;; free resources + (cl-mysql:disconnect))) + +;;; +;;; Map a function to each row extracted from MySQL +;;; +(defun map-rows (dbname table-name process-row-fn + &key + (host *myconn-host*) + (user *myconn-user*) + (pass *myconn-pass*)) + "Extract MySQL data and call PROCESS-ROW-FN function with a single + argument (a list of column values) for each row." 
+ (cl-mysql:connect :host host :user user :password pass) + + (unwind-protect + (progn + ;; Ensure we're talking utf-8 and connect to DBNAME in MySQL + (cl-mysql:query "SET NAMES 'utf8'") + (cl-mysql:query "SET character_set_results = utf8;") + (cl-mysql:use dbname) + + (let* ((sql (format nil "SELECT * FROM ~a;" table-name)) + (q (cl-mysql:query sql :store nil)) + (rs (cl-mysql:next-result-set q))) + (declare (ignore rs)) + + ;; Now fetch MySQL rows directly in the stream + (loop + for row = (cl-mysql:next-row q :type-map (make-hash-table)) + while row + counting row into count + do (funcall process-row-fn row) + finally (return count)))) + + ;; free resources + (cl-mysql:disconnect))) + +;;; +;;; Use mysql-map-rows and pgsql-text-copy-format to fill in a CSV file on +;;; disk with MySQL data in there. +;;; +(defun copy-from (dbname table-name filename + &key + date-columns + (host *myconn-host*) + (user *myconn-user*) + (pass *myconn-pass*)) + "Extract data from MySQL in PostgreSQL COPY TEXT format" + (with-open-file (text-file filename + :direction :output + :if-exists :supersede + :external-format :utf8) + (map-rows dbname table-name + (lambda (row) + (pgloader.pgsql:format-row text-file + row + :date-columns date-columns)) + :host host + :user user + :pass pass))) + +;;; +;;; MySQL bulk export to file, in PostgreSQL COPY TEXT format +;;; +(defun export-all-tables (dbname + &key + only-tables + (host *myconn-host*) + (user *myconn-user*) + (pass *myconn-pass*)) + "Export MySQL tables into as many TEXT files, in the PostgreSQL COPY format" + (let ((pgtables (pgloader.pgsql:list-tables dbname)) + (total-count 0) + (total-seconds 0)) + (pgloader.utils:report-header) + (loop + for table-name in (list-tables dbname + :host host + :user user + :pass pass) + for filename = (pgloader.csv:get-pathname dbname table-name) + when (or (null only-tables) + (member table-name only-tables :test #'equal)) + do + (pgloader.utils:report-table-name table-name) + 
(multiple-value-bind (result seconds) + (pgloader.utils:timing + (copy-from dbname table-name filename + :date-columns (pgloader.pgsql:get-date-columns + table-name pgtables))) + (when result + (incf total-count result) + (incf total-seconds seconds) + (pgloader.utils:report-results result seconds))) + finally + (pgloader.utils:report-footer "Total export time" + total-count total-seconds) + (return (values total-count (float total-seconds)))))) + + +;;; +;;; Copy data for a target database from files in the PostgreSQL COPY TEXT +;;; format +;;; +(defun export-import-database (dbname + &key + (truncate t) + only-tables) + "Export MySQL data and Import it into PostgreSQL" + ;; get the list of tables and have at it + (let ((mysql-tables (list-tables dbname)) + (total-count 0) + (total-seconds 0)) + (pgloader.utils:report-header) + (loop + for (table-name . date-columns) in (pgloader.pgsql:list-tables dbname) + when (or (null only-tables) + (member table-name only-tables :test #'equal)) + do + (pgloader.utils:report-table-name table-name) + + (if (member table-name mysql-tables :test #'equal) + (multiple-value-bind (result seconds) + (pgloader.utils:timing + (let ((filename + (pgloader.csv:get-pathname dbname table-name))) + ;; export from MySQL to file + (copy-from dbname table-name filename + :date-columns date-columns) + ;; import the file to PostgreSQL + (pgloader.pgsql:copy-from-file dbname table-name filename + :truncate truncate))) + (when result + (incf total-count result) + (incf total-seconds seconds) + (pgloader.utils:report-results result seconds))) + ;; not a known mysql table + (format t " skip, unknown table in MySQL database~%")) + finally + (pgloader.utils:report-footer "Total export+import time" + total-count total-seconds) + (return (values total-count (float total-seconds)))))) + + +;;; +;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY +;;; protocol +;;; +(defun stream-mysql-table-in-pgsql (dbname table-name + &key + 
truncate + date-columns) + "Connect in parallel to MySQL and PostgreSQL and stream the data." + (let* ((lp:*kernel* *loader-kernel*) + (channel (lp:make-channel)) + (dataq (lq:make-queue 4096))) + ;; have a task fill MySQL data in the queue + (lp:submit-task + channel (lambda () + (list :mysql + (pgloader.queue:map-push-queue + dataq + #'map-rows dbname table-name)))) + + ;; and start another task to push that data from the queue to PostgreSQL + (lp:submit-task + channel (lambda () + (list :pgsql + (pgloader.pgsql:copy-from-queue dbname table-name dataq + :truncate truncate + :date-columns date-columns)))) + + ;; now wait until both the tasks are over + (loop + for tasks below 2 + collect (lp:receive-result channel) into counts + finally (return (cadr (assoc :pgsql counts)))))) + +;;; +;;; Work on all tables for given database +;;; +(defun stream-database-tables (dbname &key (truncate t) only-tables) + "Export MySQL data and Import it into PostgreSQL" + ;; get the list of tables and have at it + (let ((mysql-tables (list-tables dbname)) + (total-count 0) + (total-seconds 0)) + (pgloader.utils:report-header) + (loop + for (table-name . 
date-columns) in (pgloader.pgsql:list-tables dbname) + when (or (null only-tables) + (member table-name only-tables :test #'equal)) + do + (pgloader.utils:report-table-name table-name) + + (if (member table-name mysql-tables :test #'equal) + (multiple-value-bind (result seconds) + (pgloader.utils:timing + (stream-mysql-table-in-pgsql dbname table-name + :truncate truncate + :date-columns date-columns)) + (when result + (incf total-count result) + (incf total-seconds seconds) + (pgloader.utils:report-results result seconds))) + ;; not a known mysql table + (format t "skip, unknown table in MySQL database~%")) + finally + (pgloader.utils:report-footer "Total streaming time" + total-count total-seconds) + (return (values total-count (float total-seconds)))))) + diff --git a/package.lisp b/package.lisp index 0304727..dbcbfbc 100644 --- a/package.lisp +++ b/package.lisp @@ -1,18 +1,38 @@ ;;;; package.lisp -(defpackage #:galaxya-loader - (:use #:cl)) +(defpackage #:pgloader.pgsql + (:use #:cl) + (:export #:truncate-table + #:copy-from-file + #:copy-from-queue + #:list-databases + #:list-tables + #:get-date-columns + #:format-row)) + +(defpackage #:pgloader + (:use #:cl) + (:import-from #:pgloader.pgsql + #:copy-from-file + #:list-databases + #:list-tables) + (:export #:copy-from-file + #:list-databases + #:list-tables)) + +(in-package #:pgloader) ;;; ;;; Some package names are a little too long to my taste and don't ship with ;;; nicknames, so use `rename-package' here to give them some new nicknames. ;;; -(loop for (package . nicknames) - in '((lparallel lp) - (lparallel.queue lq) - (simple-date date) - (split-sequence sq)) - do (rename-package package package nicknames)) +(loop + for (package . nicknames) + in '((lparallel lp) + (lparallel.queue lq) + (simple-date date) + (split-sequence sq)) + do (rename-package package package nicknames)) ;;; ;;; and recompile. 
Now you can pre-allocate the queue by passing a size to diff --git a/pgloader.asd b/pgloader.asd new file mode 100644 index 0000000..70538b8 --- /dev/null +++ b/pgloader.asd @@ -0,0 +1,32 @@ +;;;; pgloader.asd + +(asdf:defsystem #:pgloader + :serial t + :description "Load data into PostgreSQL" + :author "Dimitri Fontaine " + :license "The PostgreSQL Licence" + :depends-on (#:postmodern + #:cl-postgres + #:simple-date + #:cl-mysql + #:split-sequence + #:cl-csv + #:lparallel) + :components ((:file "package") + (:file "utils" :depends-on ("package")) + (:file "pgloader" :depends-on ("package")) + + ;; those are one-package-per-file + (:file "queue") ; package pgloader.queue + (:file "csv") ; package pgloader.csv + + ;; package pgloader.pgsql + (:file "pgsql" :depends-on ("queue" "utils")) + + ;; mysql.lisp depends on pgsql.lisp to be able to export data + ;; from MySQL in the PostgreSQL format. + ;; + ;; package pgloader.mysql + (:file "mysql" :depends-on ("pgsql" "queue" "utils")))) + + diff --git a/pgloader.lisp b/pgloader.lisp new file mode 100644 index 0000000..f25f411 --- /dev/null +++ b/pgloader.lisp @@ -0,0 +1,20 @@ +;;;; pgloader +;;; +;;; PostgreSQL data loading tool. +;;; + +(in-package #:pgloader) + +;;; +;;; Parameters you might want to change +;;; +(defparameter *loader-kernel* (lp:make-kernel 2) + "lparallel kernel to use for loading data in parallel") + +(defparameter *myconn-host* "myhost") +(defparameter *myconn-user* "myuser") +(defparameter *myconn-pass* "mypass") + +;;; +;;; TODO: define a top level API +;;; diff --git a/pgsql.lisp b/pgsql.lisp new file mode 100644 index 0000000..d902d12 --- /dev/null +++ b/pgsql.lisp @@ -0,0 +1,200 @@ +;;; +;;; Tools to handle PostgreSQL data format +;;; +(in-package :pgloader.pgsql) + +;;; +;;; Quick utilities to get rid of later. 
+;;; +(defparameter *pgconn* + '("gdb" "none" "localhost" :port 5432) + "Connection string to the local database") + +(defun get-connection-string (dbname) + (cons dbname *pgconn*)) + +;;; +;;; PostgreSQL Tools connecting to a database +;;; +(defun truncate-table (dbname table-name) + "Truncate given TABLE-NAME in database DBNAME" + (pomo:with-connection (get-connection-string dbname) + (pomo:execute (format nil "truncate ~a;" table-name)))) + +(defun list-databases (&optional (username "postgres")) + "Connect to a local database and get the database list" + (pomo:with-connection + (list "postgres" username "none" "localhost" :port 5432) + (loop for (dbname) in (pomo:query + "select datname + from pg_database + where datname !~ 'postgres|template'") + collect dbname))) + +(defun list-tables (dbname) + "Return an alist of tables names and list of columns to pay attention to." + (pomo:with-connection + (get-connection-string dbname) + + (loop for (relname colarray) in (pomo:query " +select relname, array_agg(case when typname in ('date', 'timestamptz') + then attnum end + order by attnum) + from pg_class c + join pg_namespace n on n.oid = c.relnamespace + left join pg_attribute a on c.oid = a.attrelid + join pg_type t on t.oid = a.atttypid + where c.relkind = 'r' + and attnum > 0 + and n.nspname = 'public' + group by relname +") + collect (cons relname (loop + for attnum across colarray + unless (eq attnum :NULL) + collect attnum))))) + +(defun get-date-columns (table-name pgsql-table-list) + "Given a PGSQL-TABLE-LIST as per function list-tables, return a list of + row numbers containing dates (those have to be reformated)" + (cdr (assoc table-name pgsql-table-list))) + +;;; +;;; PostgreSQL formating tools +;;; +(defun fix-zero-date (datestr) + (cond + ((null datestr) nil) + ((string= datestr "") nil) + ((string= datestr "0000-00-00") nil) + ((string= datestr "0000-00-00 00:00:00") nil) + (t datestr))) + +(defun reformat-null-value (value) + "cl-mysql returns nil for 
NULL and cl-postgres wants :NULL" + (if (null value) :NULL value)) + +(defun reformat-row (row &key date-columns) + "Reformat row as given by MySQL in a format compatible with cl-postgres" + (loop + for i from 1 + for col in row + for no-zero-date-col = (if (member i date-columns) + (fix-zero-date col) + col) + collect (reformat-null-value no-zero-date-col))) + +;;; +;;; Format row to PostgreSQL COPY format, the TEXT variant. +;;; +(defun format-row (stream row &key date-columns) + "Add a ROW in the STREAM, formating ROW in PostgreSQL COPY TEXT format. + +See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609 for +details about the format, and format specs." + (let* (*print-circle* *print-pretty*) + (loop + for i from 1 + for (col . more?) on row + for preprocessed-col = (if (member i date-columns) + (fix-zero-date col) + col) + do (if (null preprocessed-col) + (format stream "~a~:[~;~c~]" "\\N" more? #\Tab) + (progn + ;; From PostgreSQL docs: + ;; + ;; In particular, the following characters must be preceded + ;; by a backslash if they appear as part of a column value: + ;; backslash itself, newline, carriage return, and the + ;; current delimiter character. + (loop + for char across preprocessed-col + do (case char + (#\\ (format stream "\\\\")) ; 2 chars here + (#\Space (princ #\Space stream)) + (#\Newline (format stream "\\n")) ; 2 chars here + (#\Return (format stream "\\r")) ; 2 chars here + (#\Tab (format stream "\\t")) ; 2 chars here + (#\Backspace (format stream "\\b")) ; 2 chars here + (#\Page (format stream "\\f")) ; 2 chars here + (t (format stream "~c" char)))) + (format stream "~:[~;~c~]" more? #\Tab)))) + (format stream "~%"))) + +;;; +;;; Read a file format in PostgreSQL COPY TEXT format, and call given +;;; function on each line. +;;; +(defun map-rows (dbname table-name filename process-row-fn) + "Load data from a text file in PostgreSQL COPY TEXT format. 
+ +Each row is pre-processed then PROCESS-ROW-FN is called with the row as a +list as its only parameter. + +Finally returns how many rows were read and processed." + (with-open-file + ;; we just ignore files that don't exist + (input filename + :direction :input + :if-does-not-exist nil) + (when input + ;; read in the text file, split it into columns, process NULL columns + ;; the way postmodern expects them, and call PROCESS-ROW-FN on them + (loop + for line = (read-line input nil) + for row = (mapcar (lambda (x) + ;; we want Postmodern compliant NULLs + (if (string= "\\N" x) :null x)) + ;; splitting is easy, it's always on #\Tab + ;; see format-row-for-copy for details + (sq:split-sequence #\Tab line)) + while line + counting line into count + do (funcall process-row-fn row) + finally (return count))))) + +;;; +;;; Read a file in PostgreSQL COPY TEXT format and load it into a PostgreSQL +;;; table using the COPY protocol. +;;; +(defun copy-from-file (dbname table-name filename + &key + (truncate t)) + "Load data from clean COPY TEXT file to PostgreSQL" + (when truncate (truncate-table dbname table-name)) + + (let* ((conspec (remove :port (get-connection-string dbname))) + (stream + (cl-postgres:open-db-writer conspec table-name nil))) + + (unwind-protect + ;; read csv in the file and push it directly through the db writer + ;; in COPY streaming mode + (map-rows dbname table-name filename + (lambda (row) + (cl-postgres:db-write-row stream row))) + (cl-postgres:close-db-writer stream)))) + +;;; +;;; Pop data from a lparallel.queue queue instance, reformat it assuming +;;; data in there are from cl-mysql, and copy it to a PostgreSQL table. 
+;;; +(defun copy-from-queue (dbname table-name dataq + &key + (truncate t) + date-columns) + "Fetch data from the QUEUE until we see :end-of-data" + (when truncate (truncate-table dbname table-name)) + + (let* ((conspec (remove :port (get-connection-string dbname))) + (stream + (cl-postgres:open-db-writer conspec table-name nil))) + (unwind-protect + (pgloader.queue:map-pop-queue + dataq (lambda (row) + (cl-postgres:db-write-row + stream + (reformat-row row :date-columns date-columns)))) + (cl-postgres:close-db-writer stream)))) + diff --git a/queue.lisp b/queue.lisp new file mode 100644 index 0000000..b5d5de0 --- /dev/null +++ b/queue.lisp @@ -0,0 +1,36 @@ +;;; +;;; Tools to handle internal queueing, using lparallel.queue +;;; + +(defpackage #:pgloader.queue + (:use #:cl) + (:export #:map-pop-queue + #:map-push-queue)) + +;; no nickname for that package, queue is far too generic +(in-package :pgloader.queue) + +(defun map-pop-queue (queue process-row-fn) + "Consume the whole of QUEUE, calling PROCESS-ROW-FN on each row. The QUEUE + must signal end of data by the element :end-of-data. + +Returns how many rows were processed from the queue." + (loop + for row = (lq:pop-queue queue) + until (eq row :end-of-data) + counting row into count + do (funcall process-row-fn row) + finally (return count))) + +(defun map-push-queue (queue map-row-fn &rest initial-args) + "Apply MAP-ROW-FN on INITIAL-ARGS and a function of ROW that will push the +row into the queue. When MAP-ROW-FN returns, push :end-of-data in the queue. + +Returns whatever MAP-ROW-FN did return." 
+ (prog1 + (apply map-row-fn (append initial-args + (list + (lambda (row) + (lq:push-queue row queue))))) + (lq:push-queue :end-of-data queue))) + diff --git a/run.lisp b/run.lisp index 97f7d17..145fac2 100644 --- a/run.lisp +++ b/run.lisp @@ -1,14 +1,8 @@ ;;; facility to easily run the program -(ql:quickload :galaxya-loader) +(ql:quickload :pgloader) +(in-package :pgloader) -(in-package :galaxya-loader) - -(let ((*pgcon* '("galaxya" "none" "localhost" :port 5432)) - ((csv-path-root* "/home/cyb/csv"))) - - ;; when we're ready we do that - ;; (load-all-databases)) - - ;; meanwhile - (load-database-tables "weetix")) +(setq *myconn-host* "localhost" + *myconn-user* "debian-sys-maint" + *myconn-pass* "vtmMI04yBZlFprYm") diff --git a/timing.lisp b/timing.lisp index ae9307e..d4a8812 100644 --- a/timing.lisp +++ b/timing.lisp @@ -2,7 +2,7 @@ ;;; Some little timing tools ;;; -(in-package :galaxya-loader) +(in-package :pgloader) ;;; ;;; Timing Macros diff --git a/utils.lisp b/utils.lisp new file mode 100644 index 0000000..0edbe7b --- /dev/null +++ b/utils.lisp @@ -0,0 +1,68 @@ +;;; +;;; Random utilities +;;; + +(defpackage #:pgloader.utils + (:use #:cl) + (:export #:report-header + #:report-table-name + #:report-results + #:report-footer + #:format-interval + #:timing)) + +(in-package :pgloader.utils) + +;;; +;;; Timing Macro +;;; +(defun elapsed-time-since (start) + "Return how many seconds ticked between START and now" + (/ (- (get-internal-real-time) start) + internal-time-units-per-second)) + +(defmacro timing (&body forms) + "return both how much real time was spent in body and its result" + (let ((start (gensym)) + (end (gensym)) + (result (gensym))) + `(let* ((,start (get-internal-real-time)) + (,result (progn ,@forms)) + (,end (get-internal-real-time))) + (values ,result (/ (- ,end ,start) internal-time-units-per-second))))) + +;;; +;;; Timing Formatting +;;; +(defun format-interval (seconds &optional (stream t)) + "Output the number of seconds in a human friendly 
way" + (multiple-value-bind (years months days hours mins secs millisecs) + (date:decode-interval (date:encode-interval :second seconds)) + (format + stream + "~:[~*~;~d years ~]~:[~*~;~d months ~]~:[~*~;~d days ~]~:[~*~;~dh~]~:[~*~;~dm~]~d.~ds" + (< 0 years) years + (< 0 months) months + (< 0 days) days + (< 0 hours) hours + (< 0 mins) mins + secs (truncate millisecs)))) + +;;; +;;; Pretty print a report while doing bulk operations +;;; +(defun report-header () + (format t "~&~30@a ~9@a ~9@a" "table name" "rows" "time") + (format t "~&------------------------------ --------- ---------")) + +(defun report-table-name (table-name) + (format t "~&~30@a " table-name)) + +(defun report-results (rows seconds) + (format t "~9@a ~9@a" rows (format-interval seconds nil))) + +(defun report-footer (legend rows seconds) + (format t "~&------------------------------ --------- ---------") + (format t "~&~30@a ~9@a ~9@a" legend + rows (format-interval seconds nil))) +