;;; ;;; Tools to handle PostgreSQL data format ;;; (in-package :pgloader.pgsql) ;;; ;;; Quick utilities to get rid of later. ;;; (defparameter *copy-batch-size* 10000 "How many rows to per COPY transaction") (defparameter *copy-batch-split* 10 "Number of batches in which to split a batch with bad data") (defparameter *pgconn* '("gdb" "none" "localhost" :port 5432) "Connection string to the local database") (defun get-connection-string (dbname) (cons dbname *pgconn*)) ;;; ;;; PostgreSQL Tools connecting to a database ;;; (defun truncate-table (dbname table-name) "Truncate given TABLE-NAME in database DBNAME" (pomo:with-connection (get-connection-string dbname) (pomo:execute (format nil "truncate ~a;" table-name)))) (defun list-databases (&optional (username "postgres")) "Connect to a local database and get the database list" (pomo:with-connection (list "postgres" username "none" "localhost" :port 5432) (loop for (dbname) in (pomo:query "select datname from pg_database where datname !~ 'postgres|template'") collect dbname))) (defun list-tables (dbname) "Return an alist of tables names and list of columns to pay attention to." (pomo:with-connection (get-connection-string dbname) (loop for (relname colarray) in (pomo:query " select relname, array_agg(case when typname in ('date', 'timestamptz') then attnum end order by attnum) from pg_class c join pg_namespace n on n.oid = c.relnamespace left join pg_attribute a on c.oid = a.attrelid join pg_type t on t.oid = a.atttypid where c.relkind = 'r' and attnum > 0 and n.nspname = 'public' group by relname ") collect (cons relname (loop for attnum across colarray unless (eq attnum :NULL) collect attnum))))) (defun get-date-columns (table-name pgsql-table-list) "Given a PGSQL-TABLE-LIST as per function list-tables, return a list of row numbers containing dates (those have to be reformated)" (cdr (assoc table-name pgsql-table-list))) ;;; ;;; PostgreSQL formating tools ;;; (defun fix-zero-date (datestr) (cond ((null datestr) nil) ((string= datestr "") nil) ((string= datestr "0000-00-00") nil) ((string= datestr "0000-00-00 00:00:00") nil) (t datestr))) (defun reformat-null-value (value) "cl-mysql returns nil for NULL and cl-postgres wants :NULL" (if (null value) :NULL value)) (defun reformat-row (row &key date-columns) "Reformat row as given by MySQL in a format compatible with cl-postgres" (loop for i from 1 for col in row for no-zero-date-col = (if (member i date-columns) (fix-zero-date col) col) collect (reformat-null-value no-zero-date-col))) ;;; ;;; Format row to PostgreSQL COPY format, the TEXT variant. ;;; (defun format-row (stream row &key date-columns) "Add a ROW in the STREAM, formating ROW in PostgreSQL COPY TEXT format. See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609 for details about the format, and format specs." (let* (*print-circle* *print-pretty*) (loop for i from 1 for (col . more?) on row for preprocessed-col = (if (member i date-columns) (fix-zero-date col) col) do (if (null preprocessed-col) (format stream "~a~:[~;~c~]" "\\N" more? #\Tab) (progn ;; From PostgreSQL docs: ;; ;; In particular, the following characters must be preceded ;; by a backslash if they appear as part of a column value: ;; backslash itself, newline, carriage return, and the ;; current delimiter character. (loop for char across preprocessed-col do (case char (#\\ (format stream "\\\\")) ; 2 chars here (#\Space (princ #\Space stream)) (#\Newline (format stream "\\n")) ; 2 chars here (#\Return (format stream "\\r")) ; 2 chars here (#\Tab (format stream "\\t")) ; 2 chars here (#\Backspace (format stream "\\b")) ; 2 chars here (#\Page (format stream "\\f")) ; 2 chars here (t (format stream "~c" char)))) (format stream "~:[~;~c~]" more? #\Tab)))) (format stream "~%"))) ;;; ;;; Read a file format in PostgreSQL COPY TEXT format, and call given ;;; function on each line. ;;; (defun map-rows (dbname table-name filename process-row-fn) "Load data from a text file in PostgreSQL COPY TEXT format. Each row is pre-processed then PROCESS-ROW-FN is called with the row as a list as its only parameter. Finally returns how many rows where read and processed." (with-open-file ;; we just ignore files that don't exist (input filename :direction :input :if-does-not-exist nil) (when input ;; read in the text file, split it into columns, process NULL columns ;; the way postmodern expects them, and call PROCESS-ROW-FN on them (loop for line = (read-line input nil) for row = (mapcar (lambda (x) ;; we want Postmodern compliant NULLs (if (string= "\\N" x) :null x)) ;; splitting is easy, it's always on #\Tab ;; see format-row-for-copy for details (sq:split-sequence #\Tab line)) while line counting line into count do (funcall process-row-fn row) finally (return count))))) ;;; ;;; Read a file in PostgreSQL COPY TEXT format and load it into a PostgreSQL ;;; table using the COPY protocol. ;;; (defun copy-from-file (dbname table-name filename &key (truncate t)) "Load data from clean COPY TEXT file to PostgreSQL" (when truncate (truncate-table dbname table-name)) (let* ((conspec (remove :port (get-connection-string dbname))) (stream (cl-postgres:open-db-writer conspec table-name nil))) (unwind-protect ;; read csv in the file and push it directly through the db writer ;; in COPY streaming mode (map-rows dbname table-name filename (lambda (row) (cl-postgres:db-write-row stream row))) (cl-postgres:close-db-writer stream)))) ;;; ;;; Pop data from a lparallel.queue queue instance, reformat it assuming ;;; data in there are from cl-mysql, and copy it to a PostgreSQL table. ;;; (defun copy-from-queue (dbname table-name dataq &key (truncate t) date-columns) "Fetch data from the QUEUE until we see :end-of-data" (when truncate (truncate-table dbname table-name)) (let* ((conspec (remove :port (get-connection-string dbname)))) (loop for retval = (let* ((stream (cl-postgres:open-db-writer conspec table-name nil)) (batch nil) (batch-size 0) (process-row-fn ;; build our batch aware row processing function ;; it closes over batch and stream (lambda (row) (let ((reformated-row (reformat-row row :date-columns date-columns))) (push reformated-row batch) (incf batch-size 1) (cl-postgres:db-write-row stream reformated-row) ;; return control in between batches (when (= batch-size *copy-batch-size*) (throw 'next-batch (cons :continue batch-size))))))) (unwind-protect (catch 'next-batch (pgloader.queue:map-pop-queue dataq process-row-fn)) ;; in case of data-exception, split the batch and try again (handler-case (cl-postgres:close-db-writer stream) ((or CL-POSTGRES-ERROR:UNIQUE-VIOLATION CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) (retry-batch dbname table-name (nreverse batch) batch-size))))) ;; the final return value is the number of row processed summing (if (consp retval) (cdr retval) retval) into total-rows while (and (consp retval) (eq (car retval) :continue)) finally (return total-rows)))) ;;; ;;; When a batch has been refused by PostgreSQL with a data-exception, that ;;; means it contains non-conforming data. It could be only one row in the ;;; middle of the *copy-batch-size* rows. ;;; ;;; The general principle to filter out the bad row(s) is to split the batch ;;; in smaller ones, and try to COPY all of the smaller ones again, ;;; recursively. When the batch is containing only one row, we know that one ;;; is non conforming to PostgreSQL expectations (usually, data type input ;;; does not match, e.g. text is not proper utf-8). ;;; ;;; As we often need to split out a single bad row out of a full batch, we ;;; don't do the classical dichotomy but rather split the batch directly in ;;; lots of smaller ones. ;;; ;;; split 1000 rows in 10 batches of 100 rows ;;; split 352 rows in 3 batches of 100 rows + 1 batch of 52 rows ;;; ;;; ;;; Retry a single batch, without doing data copying: we already have the ;;; rows inside a batch, just process a subset of it of size batch-size. ;;; (defun process-bad-row (dbname table-name row) "Process bad row" (let* ((str (format nil "~a" row)) (str (if (< 72 (length str)) (subseq str 0 72) str))) (format t "BAD ROW: ~a...~%" str))) (defun smaller-batch-size (batch-size processed-rows) "How many rows should we process in next iteration?" (let ((remaining-rows (- batch-size processed-rows))) (if (< remaining-rows *copy-batch-split*) 1 (min remaining-rows (floor (/ batch-size *copy-batch-split*)))))) (defun retry-batch (dbname table-name batch batch-size) "Batch is a list of rows containing at least one error. Return number of bad rows." (let* ((conspec (remove :port (get-connection-string dbname))) (current-batch-pos batch) (processed-rows 0) (total-bad-rows 0)) (loop while (<= processed-rows batch-size) do (let* ((current-batch current-batch-pos) (current-batch-size (smaller-batch-size batch-size processed-rows)) (stream (cl-postgres:open-db-writer conspec table-name nil))) (unwind-protect (progn (dotimes (i current-batch-size) ;; rows in that batch have already been processed (cl-postgres:db-write-row stream (car current-batch-pos)) (setf current-batch-pos (cdr current-batch-pos)) (incf processed-rows)) ;; function's return value: number of bad rows extracted total-bad-rows) (handler-case (cl-postgres:close-db-writer stream) ;; the batch didn't make it, recurse ((or CL-POSTGRES-ERROR:UNIQUE-VIOLATION CL-POSTGRES-ERROR:DATA-EXCEPTION) (condition) (format t "~&botched batch of ~d rows: ~a.~%" current-batch-size condition) ;; process bad data (if (= 1 current-batch-size) (progn (process-bad-row dbname table-name (car current-batch)) (incf total-bad-rows)) ;; more than one line of bad data: recurse (retry-batch dbname table-name current-batch current-batch-size)))))))))