;;; ;;; Tools to handle PostgreSQL data format ;;; (in-package :pgloader.pgsql) ;;; ;;; Quick utilities to get rid of later. ;;; (defparameter *copy-batch-size* 25000 "How many rows to per COPY transaction") (defparameter *copy-batch-split* 5 "Number of batches in which to split a batch with bad data") (defparameter *pgconn* '("gdb" "none" "localhost" :port 5432) "Connection string to the local database") (defun get-connection-string (dbname) (cons dbname *pgconn*)) ;;; ;;; PostgreSQL Tools connecting to a database ;;; (defun truncate-table (dbname table-name) "Truncate given TABLE-NAME in database DBNAME" (pomo:with-connection (get-connection-string dbname) (pomo:execute (format nil "truncate ~a;" table-name)))) (defun list-databases (&optional (username "postgres")) "Connect to a local database and get the database list" (pomo:with-connection (list "postgres" username "none" "localhost" :port 5432) (loop for (dbname) in (pomo:query "select datname from pg_database where datname !~ 'postgres|template'") collect dbname))) (defun list-tables (dbname) "Return an alist of tables names and list of columns to pay attention to." (pomo:with-connection (get-connection-string dbname) (loop for (relname colarray) in (pomo:query " select relname, array_agg(case when typname in ('date', 'timestamptz') then attnum end order by attnum) from pg_class c join pg_namespace n on n.oid = c.relnamespace left join pg_attribute a on c.oid = a.attrelid join pg_type t on t.oid = a.atttypid where c.relkind = 'r' and attnum > 0 and n.nspname = 'public' group by relname ") collect (cons relname (loop for attnum across colarray unless (eq attnum :NULL) collect attnum))))) (defun get-date-columns (table-name pgsql-table-list) "Given a PGSQL-TABLE-LIST as per function list-tables, return a list of row numbers containing dates (those have to be reformated)" (cdr (assoc table-name pgsql-table-list))) ;;; ;;; PostgreSQL formating tools ;;; (defun fix-zero-date (datestr) (cond ((null datestr) nil) ((string= datestr "") nil) ((string= datestr "0000-00-00") nil) ((string= datestr "0000-00-00 00:00:00") nil) (t datestr))) (defun reformat-null-value (value) "cl-mysql returns nil for NULL and cl-postgres wants :NULL" (if (null value) :NULL value)) (defun reformat-row (row &key date-columns) "Reformat row as given by MySQL in a format compatible with cl-postgres" (loop for i from 1 for col in row for no-zero-date-col = (if (member i date-columns) (fix-zero-date col) col) collect (reformat-null-value no-zero-date-col))) ;;; ;;; Format row to PostgreSQL COPY format, the TEXT variant. ;;; (defun format-row (stream row &key date-columns) "Add a ROW in the STREAM, formating ROW in PostgreSQL COPY TEXT format. See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609 for details about the format, and format specs." (let* (*print-circle* *print-pretty*) (loop for i from 1 for (col . more?) on row for preprocessed-col = (if (member i date-columns) (fix-zero-date col) col) do (if (null preprocessed-col) (format stream "~a~:[~;~c~]" "\\N" more? #\Tab) (progn ;; From PostgreSQL docs: ;; ;; In particular, the following characters must be preceded ;; by a backslash if they appear as part of a column value: ;; backslash itself, newline, carriage return, and the ;; current delimiter character. (loop for char across preprocessed-col do (case char (#\\ (format stream "\\\\")) ; 2 chars here (#\Space (princ #\Space stream)) (#\Newline (format stream "\\n")) ; 2 chars here (#\Return (format stream "\\r")) ; 2 chars here (#\Tab (format stream "\\t")) ; 2 chars here (#\Backspace (format stream "\\b")) ; 2 chars here (#\Page (format stream "\\f")) ; 2 chars here (t (format stream "~c" char)))) (format stream "~:[~;~c~]" more? #\Tab)))) (format stream "~%"))) ;;; ;;; Read a file format in PostgreSQL COPY TEXT format, and call given ;;; function on each line. ;;; (defun map-rows (dbname table-name filename process-row-fn) "Load data from a text file in PostgreSQL COPY TEXT format. Each row is pre-processed then PROCESS-ROW-FN is called with the row as a list as its only parameter. Finally returns how many rows where read and processed." (with-open-file ;; we just ignore files that don't exist (input filename :direction :input :if-does-not-exist nil) (when input ;; read in the text file, split it into columns, process NULL columns ;; the way postmodern expects them, and call PROCESS-ROW-FN on them (loop for line = (read-line input nil) for row = (mapcar (lambda (x) ;; we want Postmodern compliant NULLs (if (string= "\\N" x) :null x)) ;; splitting is easy, it's always on #\Tab ;; see format-row-for-copy for details (sq:split-sequence #\Tab line)) while line counting line into count do (funcall process-row-fn row) finally (return count))))) ;;; ;;; Read a file in PostgreSQL COPY TEXT format and load it into a PostgreSQL ;;; table using the COPY protocol. ;;; (defun copy-from-file (dbname table-name filename &key (truncate t)) "Load data from clean COPY TEXT file to PostgreSQL" (when truncate (truncate-table dbname table-name)) (let* ((conspec (remove :port (get-connection-string dbname))) (stream (cl-postgres:open-db-writer conspec table-name nil))) (unwind-protect ;; read csv in the file and push it directly through the db writer ;; in COPY streaming mode (map-rows dbname table-name filename (lambda (row) (cl-postgres:db-write-row stream row))) (cl-postgres:close-db-writer stream)))) ;;; ;;; Pop data from a lparallel.queue queue instance, reformat it assuming ;;; data in there are from cl-mysql, and copy it to a PostgreSQL table. ;;; (defun copy-from-queue (dbname table-name dataq &key (truncate t) date-columns) "Fetch data from the QUEUE until we see :end-of-data. Update *state*" (when truncate (truncate-table dbname table-name)) (let* ((conspec (remove :port (get-connection-string dbname)))) (loop for retval = ;; The idea is to stream the queue content directly into the ;; PostgreSQL COPY protocol stream, but COMMIT every ;; *copy-batch-size* rows. ;; ;; That allows to have to recover from a buffer of data only rather ;; than restart from scratch each time we have to find which row ;; contains erroneous data. BATCH is that buffer. (let* ((stream (cl-postgres:open-db-writer conspec table-name nil)) (batch nil) (batch-size 0) (process-row-fn ;; build our batch aware row processing function ;; it closes over stream, batch and batch-size (lambda (row) (let ((reformated-row (reformat-row row :date-columns date-columns))) (push reformated-row batch) (incf batch-size 1) (cl-postgres:db-write-row stream reformated-row) ;; return control in between batches (when (= batch-size *copy-batch-size*) (throw 'next-batch (cons :continue batch-size))))))) (unwind-protect (catch 'next-batch (pgloader.queue:map-pop-queue dataq process-row-fn)) ;; in case of data-exception, split the batch and try again (handler-case (cl-postgres:close-db-writer stream) ((or CL-POSTGRES-ERROR:UNIQUE-VIOLATION CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) (retry-batch dbname table-name (nreverse batch) batch-size))))) ;; fetch how many rows we just pushed through, update stats for rows = (if (consp retval) (cdr retval) retval) for cont = (and (consp retval) (eq (car retval) :continue)) do (pgstate-incf *state* table-name :rows rows) while cont))) ;;; ;;; When a batch has been refused by PostgreSQL with a data-exception, that ;;; means it contains non-conforming data. It could be only one row in the ;;; middle of the *copy-batch-size* rows. ;;; ;;; The general principle to filter out the bad row(s) is to split the batch ;;; in smaller ones, and try to COPY all of the smaller ones again, ;;; recursively. When the batch is containing only one row, we know that one ;;; is non conforming to PostgreSQL expectations (usually, data type input ;;; does not match, e.g. text is not proper utf-8). ;;; ;;; As we often need to split out a single bad row out of a full batch, we ;;; don't do the classical dichotomy but rather split the batch directly in ;;; lots of smaller ones. ;;; ;;; split 1000 rows in 10 batches of 100 rows ;;; split 352 rows in 3 batches of 100 rows + 1 batch of 52 rows ;;; (defun process-bad-row (dbname table-name condition row) "Add the row to the reject file, in PostgreSQL COPY TEXT format" ;; first, update the stats. (pgstate-incf *state* table-name :errs 1) (pgstate-decf *state* table-name :rows 1) ;; now, the bad row processing (let* ((table (pgstate-get-table *state* table-name)) (data (pgtable-reject-data table)) (logs (pgtable-reject-logs table))) ;; first log the rejected data (with-open-file (reject-data-file data :direction :output :if-exists :append :if-does-not-exist :create :external-format :utf8) ;; the row has already been processed when we get here (pgloader.pgsql:format-row reject-data-file row)) ;; now log the condition signaled to reject the data (with-open-file (reject-logs-file logs :direction :output :if-exists :append :if-does-not-exist :create :external-format :utf8) ;; the row has already been processed when we get here (format reject-logs-file "~a~%" condition)))) ;;; ;;; Compute the next batch size, must be smaller than the previous one or ;;; just one row to ensure the retry-batch recursion is not infinite. ;;; (defun smaller-batch-size (batch-size processed-rows) "How many rows should we process in next iteration?" (let ((remaining-rows (- batch-size processed-rows))) (if (< remaining-rows *copy-batch-split*) 1 (min remaining-rows (floor (/ batch-size *copy-batch-split*)))))) ;;; ;;; The recursive retry batch function. ;;; (defun retry-batch (dbname table-name batch batch-size) "Batch is a list of rows containing at least one bad row. Find it." (let* ((conspec (remove :port (get-connection-string dbname))) (current-batch-pos batch) (processed-rows 0) (total-bad-rows 0)) (loop while (< processed-rows batch-size) do (let* ((current-batch current-batch-pos) (current-batch-size (smaller-batch-size batch-size processed-rows)) (stream (cl-postgres:open-db-writer conspec table-name nil))) (unwind-protect (dotimes (i current-batch-size) ;; rows in that batch have already been processed (cl-postgres:db-write-row stream (car current-batch-pos)) (setf current-batch-pos (cdr current-batch-pos)) (incf processed-rows)) (handler-case (cl-postgres:close-db-writer stream) ;; the batch didn't make it, recurse ((or CL-POSTGRES-ERROR:UNIQUE-VIOLATION CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) ;; process bad data (if (= 1 current-batch-size) (process-bad-row dbname table-name condition (car current-batch)) ;; more than one line of bad data: recurse (retry-batch dbname table-name current-batch current-batch-size)))))))))