From 7eed610ecb66859e73baa83fcb780f9bb98b8023 Mon Sep 17 00:00:00 2001 From: dim Date: Tue, 21 Nov 2006 12:24:57 +0000 Subject: [PATCH] Adding pgloader python package and modules --- pgloader/__init__.py | 0 pgloader/db.py | 466 ++++++++++++++++++++++++++++ pgloader/lo.py | 109 +++++++ pgloader/options.py | 30 ++ pgloader/pgloader.py | 707 +++++++++++++++++++++++++++++++++++++++++++ pgloader/tools.py | 118 ++++++++ 6 files changed, 1430 insertions(+) create mode 100644 pgloader/__init__.py create mode 100644 pgloader/db.py create mode 100644 pgloader/lo.py create mode 100644 pgloader/options.py create mode 100644 pgloader/pgloader.py create mode 100644 pgloader/tools.py diff --git a/pgloader/__init__.py b/pgloader/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pgloader/db.py b/pgloader/db.py new file mode 100644 index 0000000..c62dc46 --- /dev/null +++ b/pgloader/db.py @@ -0,0 +1,466 @@ +# -*- coding: ISO-8859-15 -*- +# Author: Dimitri Fontaine +# +# pgloader database connection handling +# COPY dichotomy on error + +import os, sys, os.path, time, codecs +from cStringIO import StringIO + +from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC +from options import TRUNCATE, VACUUM +from options import INPUT_ENCODING, PG_CLIENT_ENCODING +from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING + +from tools import PGLoader_Error + +try: + import psycopg2.psycopg1 as psycopg +except ImportError: + if VERBOSE: + print 'No psycopg2 module found, trying psycopg1' + import psycopg + +class db: + """ a db connexion and utility class """ + def __init__(self, + host, port, base, user, passwd, + client_encoding = PG_CLIENT_ENCODING, + copy_every = 10000, commit_every = 1000, connect = True): + """ Connects to the specified database """ + self.dbconn = None + self.dsn = "host=%s port=%d user=%s dbname=%s password=%s" \ + % (host, port, user, base, passwd) + self.connect = "-h %s -p %s -U %s" % (host, port, user) + self.base = base + + # those 
parameters can be overwritten after db init + # here's their default values + self.copy_sep = COPY_SEP + self.copy_every = copy_every + self.commit_every = commit_every + self.client_encoding = client_encoding + self.null = NULL + self.empty_string = EMPTY_STRING + + if connect: + self.reset() + + def __del__(self): + """ db object destructor, we have to close the db connection """ + if self.dbconn is None: + return + + if self.running_commands > 0: + self.dbconn.commit() + self.commits += 1 + self.commited_rows += self.running_commands + + if self.dbconn is not None: + self.dbconn.close() + + def set_encoding(self): + """ set connection encoding to self.client_encoding """ + + if DEBUG: + # debug only cause reconnecting happens on every + # configured section + print 'Setting client encoding to %s' % self.client_encoding + + sql = 'set session client_encoding to %s' + cursor = self.dbconn.cursor() + cursor.execute(sql, [self.client_encoding]) + cursor.close() + + def reset(self): + """ reset internal counters and open a new database connection """ + self.buffer = None + self.copy = None # flag set to True when copy is called + self.errors = 0 + self.commits = 0 + self.commited_rows = 0 + self.running_commands = 0 + self.last_commit_time = time.time() + self.first_commit_time = self.last_commit_time + + if DEBUG: + if self.dbconn is not None: + print 'Debug: closing current connection' + self.dbconn.close() + + if DEBUG: + print 'Debug: connecting to dns %s' % self.dsn + + self.dbconn = psycopg.connect(self.dsn) + self.set_encoding() + + def print_stats(self): + """ output some stats about recent activity """ + d = time.time() - self.first_commit_time + u = self.commited_rows + c = self.commits + print "## %d updates in %d commits took %5.3f seconds" % (u, c, d) + + if self.errors > 0: + print "## %d database errors occured" % self.errors + if self.copy and not VACUUM: + print "## Please do VACUUM your database to recover space" + else: + if u > 0: + print "## No 
database error occured" + return + + def is_null(self, value): + """ return true if value is null, per configuration """ + return value == self.null + + def is_empty(self, value): + """ return true if value is empty, per configuration """ + return value == self.empty_string + + def truncate(self, table): + """ issue an SQL TRUNCATE TABLE on given table """ + if DRY_RUN: + if VERBOSE: + print "Notice: won't truncate tables on dry-run mode" + return + + sql = "TRUNCATE TABLE %s;" % table + + if VERBOSE: + print 'Notice: %s' % sql + + try: + cursor = self.dbconn.cursor() + cursor.execute(sql) + self.dbconn.commit() + except Exception, error: + if VERBOSE: + print error + raise PGLoader_Error, "Couldn't truncate table %s" % table + + def vacuum(self): + """ issue an vacuumdb -fvz database """ + if DRY_RUN: + if VERBOSE: + print + print 'Notice: no vacuum in dry-run mode' + return -1 + + command = "/usr/bin/vacuumdb %s -fvz %s 2>&1" \ + % (self.connect, self.base) + + if VERBOSE: + print command + + out = os.popen(command) + for line in out.readlines(): + if DEBUG: + # don't print \n + print line[:-1] + + return out.close() + + def insert_blob(self, table, index, rowids, + blob_cname, data, btype, + input_line, reject): + """ insert the given blob content into postgresql table + + return True on success, False on error + """ + ok = True + sql = "" + + if btype == 'ifx_clob': + data = data.replace("'", "\\'") + sql = "UPDATE %s SET %s = %%s WHERE " % (table, blob_cname) + + elif btype == 'ifx_blob': + data = data.tostring() + sql = "UPDATE %s SET %s = %%s::bytea WHERE " % (table, blob_cname) + + values = [data] + + ## + # Add a WHERE clause for each index + first = True + for name, col in index: + if not first: sql += " AND " + else: first = False + + sql += "%s = %%s" % name + values.append(rowids[name]) + sql += ";" + + if DEBUG: + print 'Debug: %s' % sql + + try: + cursor = self.dbconn.cursor() + cursor.execute(sql, values) + + # if execute raise an exception, don't 
count it as a + # running command (waiting a commit) + self.running_commands += 1 + + if VERBOSE: + str_rowids = "" + for i,v in rowids.items(): + if str_rowids != "": str_rowids += ", " + str_rowids += "%s:%s" % (i, v) + print '%s %s %s %6do' \ + % (table, str_rowids, blob_cname, len(data)) + + if self.running_commands == self.commit_every: + now = time.time() + self.dbconn.commit() + + self.commits += 1 + duration = now - self.last_commit_time + self.last_commit_time = now + + print "-- commit %d: %d updates in %5.3fs --" \ + % (self.commits, self.running_commands, duration) + + self.commited_rows += self.running_commands + self.running_commands = 1 + + except KeyboardInterrupt, error: + # C-c was pressed, please stop processing + self.dbconn.commit() + raise PGLoader_Error, "Aborting on user demand (Interrupt)" + + except Exception, e: + self.dbconn.commit() + # don't use self.commited_rows here, it's only updated + # after a commit + print "Error: update %d rejected: commiting (read log file %s)" \ + % (self.commits * self.commit_every + self.running_commands, + reject.reject_log) + + reject.log(str(e), input_line) + self.errors += 1 + ok = False + + return ok + + def save_copy_buffer(self, table): + """ save copy buffer to a temporary file for further inspection """ + import tempfile + (f, n) = tempfile.mkstemp(prefix='%s.' 
% table, + suffix='.pgimport', dir='/tmp') + os.write(f, self.buffer.getvalue()) + os.close(f) + + # systematicaly write about this + print "--- COPY data buffer saved in %s ---" % n + return n + + def copy_from(self, table, columns, input_line, reject, EOF = False): + """ Generate some COPY SQL for PostgreSQL """ + ok = True + if not self.copy: self.copy = True + + if EOF or self.running_commands == self.copy_every \ + and self.buffer is not None: + # time to copy data to PostgreSQL table + + if self.buffer is None: + if VERBOSE: + print "Error: no data to COPY" + return False + + if DEBUG: + self.save_copy_buffer(table) + + self.buffer.seek(0) + now = time.time() + + try: + cursor = self.dbconn.cursor() + r = cursor.copy_from(self.buffer, table, self.copy_sep) + self.dbconn.commit() + + self.commits += 1 + duration = now - self.last_commit_time + self.last_commit_time = now + + print "-- COPY %d: %d rows copied in %5.3fs --" \ + % (self.commits, self.running_commands, duration) + + # prepare next run + self.buffer.close() + self.buffer = None + self.commited_rows += self.running_commands + self.running_commands = 0 + + except psycopg.ProgrammingError, error: + # rollback current transaction + self.dbconn.rollback() + + if VERBOSE: + print 'Notice: COPY error, trying to find on which line' + if not DEBUG: + # in DEBUG mode, copy buffer has already been saved + # to file + self.save_copy_buffer(table) + + # copy recovery process + now = time.time() + c, ok, ko = self.copy_from_buff(table, self.buffer, + self.running_commands, reject) + + duration = now - self.last_commit_time + self.commits += c + self.last_commit_time = now + self.commited_rows += ok + self.errors += ko + + if VERBOSE: + print 'Notice: COPY error recovery done (%d/%d) in %5.3fs'\ + % (ko, ok, duration) + + # commit this transaction + self.dbconn.commit() + + # recovery process has closed the buffer + self.buffer = None + self.running_commands = 0 + + except psycopg.DatabaseError, error: + # non 
recoverable error + mesg = "\n".join(["Please check PostgreSQL logs", + "HINT: double check your client_encoding" + + " and copy_delimiter settings"]) + raise PGLoader_Error, mesg + + # prepare next run + if self.buffer is None: + self.buffer = StringIO() + + self.prepare_copy_data(columns) + self.running_commands += 1 + return ok + + def copy_from_buff(self, table, buff, count, reject): + """ If copy returned an error, try to detect wrong input line(s) """ + + if count == 1: + reject.log('COPY error on this line', buff.getvalue()) + buff.close() + if DEBUG: + print '--- Notice: found one more line in error' + + # returns commits, ok, ko + return 0, 0, 1 + + ## + # Dichotomy + # we cut the buffer into two buffers, try to copy from them + a = StringIO() + b = StringIO() + n = 0 + m = count / 2 + + # return values, copied lines and errors + commits = ok = ko = 0 + + buff.seek(0) + for line in buff.readlines(): + if n < m: + a.write(line) + else: + b.write(line) + n += 1 + + # we don't need no more orgininal buff + buff.close() + + if DEBUG: + print '--- Trying to find errors, dividing %d lines in %d and %d' \ + % (count, m, n-m) + + # now we have two buffers to copy to PostgreSQL database + cursor = self.dbconn.cursor() + for (x, xcount) in [(a, m), (b, n-m)]: + try: + x.seek(0) + cursor.copy_from(x, table, self.copy_sep) + self.dbconn.commit() + x.close() + + if DEBUG: + print "--- COPY ERROR processing progress: %d rows copied"\ + % (xcount) + + x.close() + commits += 1 + ok += xcount + + except Exception, error: + self.dbconn.commit() + + # if a is only one line long, reject this line + if xcount == 1: + ko += 1 + reject.log('COPY error: %s' % error, x.getvalue()) + if DEBUG: + print '--- Notice: found one more line in error' + print x.getvalue() + + else: + _c, _o, _k = self.copy_from_buff(table, x, xcount, reject) + commits += _c + ok += _o + ko += _k + + return commits, ok, ko + + + def prepare_copy_data(self, columns): + """ add a data line to copy buffer """ 
+ if columns is not None: + first_col = True + + for c in columns: + # default text format COPY delimiter + if not first_col: self.buffer.write(self.copy_sep) + else: first_col = False + + if self.is_null(c): + # null column value: \N + self.buffer.write('\N') + + elif self.is_empty(c): + # empty string has been read + if DEBUG: + print "empty string read: '%s'" % c + self.buffer.write('') + + else: + # for a list of chars to replace, please have a look to + # http://www.postgresql.org/docs/8.1/static/sql-copy.html + if INPUT_ENCODING is not None: + try: + c = c.encode(INPUT_ENCODING) + except UnicodeDecodeError, e: + reject.log(['Codec error', str(e)], input_line) + + # in _split_line we remove delimiter escaping + # in order for backslash escaping not to de-escape it + # we then have to escape delimiters explicitely now + for orig, escaped in [('\\', '\\\\'), + (self.copy_sep, + '\\%s' % self.copy_sep), + ('\b', '\\b'), + ('\f', '\\f'), + ('\n', '\\n'), + ('\r', '\\r'), + ('\t', '\\t'), + ('\v', '\\v')]: + c = c.replace(orig, escaped) + + self.buffer.write(c) + + # end of row, \n + self.buffer.write('\n') diff --git a/pgloader/lo.py b/pgloader/lo.py new file mode 100644 index 0000000..5cfe801 --- /dev/null +++ b/pgloader/lo.py @@ -0,0 +1,109 @@ +# -*- coding: ISO-8859-15 -*- +# Author: Dimitri Fontaine +# +# pgloader Large Object support + +from cStringIO import StringIO +from tools import PGLoader_Error +from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC +from options import INPUT_ENCODING + +class ifx_lo: + """ an Informix Large Object data file as given by UNLOAD """ + + def __del__(self): + """ close self.file on object destruction """ + self.file.close() + + def extract(self, rowid, field_nb, begin, length): + """ extract given positionned data from Informix Clob out file, save + them in a file and returns the filename where text is stored """ + + # get stream position and data length + begin = long(begin, 16) + length = long(length, 16) + + # get blob 
data + self.file.seek(begin) + try: + content = self.file.read(length) + except UnicodeDecodeError, e: + # as of now, this is a fatal error + print + print 'Fatal error in clob file %s at position Ox%x' \ + % (self.filename, begin) + raise PGImport_Error, e + + return content + +class ifx_clob(ifx_lo): + """ Informix Text Large Object file """ + + def __init__(self, filename): + """ init a clob object """ + self.file = None + self.filename = filename + + if self.file is None: + if INPUT_ENCODING is not None: + self.file = codecs.open(self.filename, 'r', + encoding = INPUT_ENCODING) + else: + self.file = open(self.filename, 'r') + + if VERBOSE: + print "Notice: Opening informix clob file:", self.filename + +class ifx_blob(ifx_lo): + """ Informix Binary Large Object file """ + + def __init__(self, filename, field_sep): + """ init a clob object """ + self.file = None + self.filename = filename + self.field_sep = field_sep # used by bytea_escape + + # some helpers for bytea escaping + self.octals = range(0, 32) + self.octals += range(127, 256) + + if self.file is None: + self.file = open(self.filename, 'rb') + if VERBOSE: + print "Notice: Opening informix blob file:", self.filename + + def bytea_escape(self, bitstring): + """ escape chars from bitstring for PostgreSQL bytea input + see http://www.postgresql.org/docs/8.1/static/datatype-binary.html + """ + escaped = StringIO() + bsize = len(bitstring) + pos = 0 + + while pos < bsize: + c = bitstring[pos] + o = ord(c) + + if o in self.octals or c == self.field_sep: + # PostgreSQL wants octal numbers! 
+ escaped.write('\\%03o' % o) + elif o == 39: + escaped.write('\\047') + elif o == 92: + escaped.write('\\134') + else: + escaped.write(c) + + pos += 1 + + r = escaped.getvalue() + escaped.close() + + return r + + def extract(self, rowid, field_nb, begin, length): + """ extract content, then bytea escape it """ + + content = ifx_lo.extract(self, rowid, field_nb, begin, length) + return self.bytea_escape(content) + diff --git a/pgloader/options.py b/pgloader/options.py new file mode 100644 index 0000000..21ea743 --- /dev/null +++ b/pgloader/options.py @@ -0,0 +1,30 @@ +# -*- coding: ISO-8859-15 -*- +# Author: Dimitri Fontaine +# +# Some common options, for each module to get them + +INPUT_ENCODING = None +PG_CLIENT_ENCODING = 'latin9' + +COPY_SEP = None +FIELD_SEP = '|' +CLOB_SEP = ',' +NULL = '' +EMPTY_STRING = '\ ' + +NEWLINE_ESCAPES = None + +DEBUG = False +VERBOSE = False +DRY_RUN = False +PEDANTIC = False + +TRUNCATE = False +VACUUM = False + +COUNT = None +FROM_COUNT = None +FROM_ID = None + + + diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py new file mode 100644 index 0000000..d729574 --- /dev/null +++ b/pgloader/pgloader.py @@ -0,0 +1,707 @@ +# -*- coding: ISO-8859-15 -*- +# Author: Dimitri Fontaine +# +# pgloader main class +# +# handles configuration, parse data, then pass them to database module for +# COPY preparation + +import os, sys, os.path, time, codecs +from cStringIO import StringIO + +from tools import PGLoader_Error, Reject, parse_config_string +from db import db +from lo import ifx_clob, ifx_blob + +from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC +from options import TRUNCATE, VACUUM +from options import COUNT, FROM_COUNT, FROM_ID +from options import INPUT_ENCODING, PG_CLIENT_ENCODING +from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING +from options import NEWLINE_ESCAPES + +class PGLoader: + """ + PGLoader reads some data file and depending on ts configuration, + import data with COPY or update blob data with 
UPDATE. + """ + + def __init__(self, name, config, db): + """ Init with a configuration section """ + # Some settings + self.name = name + self.db = db + + self.index = None + self.columns = None + self.blob_cols = None + + self.config_errors = 0 + self.errors = 0 + self.updates = 0 + self.init_time = time.time() + + # we may have to open several clob files while parsing the + # unload data file, hence we keep track of them all + self.blobs = {} + + if VERBOSE: + print + print "[%s] parse configuration" % self.name + + # some configuration elements don't have default value + for opt in ('table', 'filename'): + if config.has_option(name, opt): + self.__dict__[opt] = config.get(name, opt) + else: + print 'Error: please configure %s.%s' % (name, opt) + self.config_errors += 1 + + ## + # reject log and data files defaults to /tmp/
.rej[.log] + if config.has_option(name, 'reject_log'): + self.reject_log = config.get(name, 'reject_log') + else: + self.reject_log = os.path.join('/tmp', '%s.rej.log' % name) + if VERBOSE: + print 'Notice: reject log in %s' % self.reject_log + + if config.has_option(name, 'reject_data'): + self.reject_data = config.get(name, 'reject_data') + else: + self.reject_data = os.path.join('/tmp', '%s.rej' % name) + if VERBOSE: + print 'Notice: rejected data in %s' % self.reject_data + + # reject logging + self.reject = Reject(self.reject_log, self.reject_data) + + + # optionnal number of columns per line + self.field_count = None + if config.has_option(name, 'field_count'): + self.field_count = config.getint(name, 'field_count') + + # optionnal field separator + self.field_sep = FIELD_SEP + if config.has_option(name, 'field_sep'): + self.field_sep = config.get(name, 'field_sep') + + if not DRY_RUN: + if self.db.copy_sep is None: + self.db.copy_sep = self.field_sep + + # optionnal trailing separator option + self.trailing_sep = False + if config.has_option(name, 'trailing_sep'): + self.trailing_sep = config.get(name, 'trailing_sep') == 'True' + + # optionnal null and empty_string per table parameters + if config.has_option(name, 'null'): + self.db.null = parse_config_string(config.get(name, 'null')) + else: + self.db.null = NULL + + if config.has_option(name, 'empty_string'): + self.db.empty_string = parse_config_string( + config.get(name, 'empty_string')) + else: + self.db.empty_string = EMPTY_STRING + + # optionnal local option client_encoding + if config.has_option(name, 'client_encoding'): + self.db.client_encoding = parse_config_string( + config.get(name, 'client_encoding')) + + if DEBUG: + print "null: '%s'" % self.db.null + print "empty_string: '%s'" % self.db.empty_string + print "client_encoding: '%s'" % self.db.client_encoding + + ## + # we parse some columns definitions + if config.has_option(name, 'index'): + self._parse_fields('index', config.get(name, 
'index')) + + if config.has_option(name, 'columns'): + self._parse_fields('columns', config.get(name, 'columns')) + + if config.has_option(name, 'blob_columns'): + self._parse_fields('blob_cols', + config.get(name, 'blob_columns'), + btype = True) + + if DEBUG: + print 'index', self.index + print 'columns', self.columns + print 'blob_columns', self.blob_cols + + # optionnal newline escaped option + self.newline_escapes = [] + if config.has_option(name, 'newline_escapes'): + if NEWLINE_ESCAPES is not None: + # this parameter is globally set, will ignore local + # definition + print "Warning: ignoring %s newline_escapes option" % name + print " option is set to '%s' globally" \ + % NEWLINE_ESCAPES + else: + self._parse_fields('newline_escapes', + config.get(name, 'newline_escapes'), + argtype = 'char') + + if NEWLINE_ESCAPES is not None: + # set NEWLINE_ESCAPES for each table column + self.newline_escapes = [(a, NEWLINE_ESCAPES) + for (a, x) in self.columns] + + ## + # How can we mix those columns definitions ? + # - we want to load table data with COPY + # - we want to load blob data into text or bytea field, with COPY + # - we want to do both + + if self.columns is None and self.blob_cols is None: + # nothing to do ? perfect, done + self.reject.log( + "Error: in section '%s': " % self.name +\ + "please configure some work to do (columns, blob_cols)") + self.config_errors += 1 + + if self.blob_cols is not None and self.index is None: + # if you want to load blobs, UPDATE need indexes + # is this error still necessary? 
            # UPDATE-based blob loading locates rows via the configured
            # index columns, so blob_cols without index is refused
            self.reject.log(
                "Error: in section '%s': " % self.name +\
                "for blob importing (blob_cols), please configure index")
            self.config_errors += 1

        ##
        # We have for example columns = col1:2, col2:1
        # this means the order of input columns is not the same as the
        # awaited order of COPY, so we want a mapping index, here [2, 1]
        if self.columns is not None:
            self.col_mapping = [i for (c, i) in self.columns]

        ##
        # if options.fromid is not None it has to be either a value,
        # when index is single key or a dict in a string, when index
        # is a multiple key
        global FROM_ID
        if FROM_ID is not None:
            # NOTE(review): self.index may still be None here when no
            # index option was configured -- len(None) would raise a
            # TypeError; confirm index is mandatory whenever -I is used
            if len(self.index) > 1:
                # we have to evaluate given string and see if it is a
                # dictionary mapping index column names to values
                try:
                    d = {}
                    ids = [x.strip() for x in FROM_ID.split(',')]
                    for id in ids:
                        k, v = id.split(':')
                        d[k] = v

                    FROM_ID = d
                except Exception, e:
                    self.reject.log(
                        'Error: unable to parse given key %s' % FROM_ID)
                    raise PGLoader_Error

                print 'Notice: composite key found, -I evaluated to %s' % FROM_ID

        # any configuration error collected above is fatal for this section
        if self.config_errors > 0:
            mesg = ['Configuration errors for section %s' % self.name,
                    'Please see reject log file %s' % self.reject_log]
            raise PGLoader_Error, '\n'.join(mesg)

        # Now reset database connection
        if not DRY_RUN:
            self.db.reset()

    def _parse_fields(self, attr, str, btype = False, argtype = 'int'):
        """ parse the user string str for fields definition to store
        into self.attr

        attr    -- name of the instance attribute to fill (e.g. 'columns')
        str     -- raw configuration value, 'name:arg[, name:arg...]'
        btype   -- when True, expect a third blob-type property per field
        argtype -- 'int' for column positions, 'char' for escape chars
        """

        def __getarg(arg, argtype):
            """ return arg depending on its type """
            if argtype == 'int':
                # arg is the target column index
                try:
                    arg = int(arg)
                except ValueError:
                    raise PGLoader_Error

            elif argtype == 'char':
                # arg is an escape char
                if len(arg) > 1:
                    raise PGLoader_Error

            return arg

        # reset the attribute and fill the very same list in place
        f = self.__dict__[attr] = []

        try:
            for field_def in str.split(','):
                properties = [x.strip() for x in field_def.split(':')]

                if not btype:
                    # normal column definition, for COPY usage
                    colname, arg = properties
f.append((colname, __getarg(arg, argtype))) + + else: + # blob column definition, with blob type, for + # UPDATE usage + colname, arg, btype = properties + f.append((colname, __getarg(arg, argtype), btype)) + + except Exception, error: + # FIXME: make some errors and write some error messages + raise + + def _split_line(self, line): + """ split given line and returns a columns list """ + last_sep = 0 + columns = [] + pos = line.find(self.field_sep, last_sep) + + while pos != -1: + # don't consider backslash escaped separators melted into data + # warning, we may find some data|\\|data + # that is some escaped \ then a legal | separator + i=1 + while pos-i >= 0 and line[pos-i] == '\\': + i += 1 + + # now i-1 is the number of \ preceding the separator + # it's a legal separator only if i-1 is even + if (i-1) % 2 == 0: + # there's no need to keep escaped delimiters inside a column + # and we want to avoid double-escaping them + columns.append(line[last_sep:pos] + .replace("\\%s" % self.field_sep, + self.field_sep)) + last_sep = pos + 1 + + pos = line.find(self.field_sep, pos + 1) + + # append last column + columns.append(line[last_sep:]) + return columns + + def _rowids(self, columns): + """ get rowids for given input line """ + rowids = {} + try: + for id_name, id_col in self.index: + rowids[id_name] = columns[id_col - 1] + + except IndexError, e: + messages = [ + "Warning: couldn't get id %d on column #%d" % (id_name, + id_col), + str(e) + ] + + self.reject.log(messages, line) + + return rowids + + def summary(self): + """ return a (duration, updates, errors) tuple """ + self.duration = time.time() - self.init_time + + if self.reject is not None: + self.errors = self.reject.errors + + if self.db is not None: + self.updates = self.db.commited_rows + + return (self.duration, self.updates, self.errors) + + def print_stats(self): + """ print out some statistics """ + + if self.reject is not None: + self.errors = self.reject.errors + self.reject.print_stats() + + if 
self.db is not None: + self.updates = self.db.commited_rows + self.db.print_stats() + return + + def run(self): + """ depending on configuration, do given job """ + + # Announce the beginning of the work + print "[%s] data import" % self.name + + if TRUNCATE and not DRY_RUN: + self.db.truncate(self.table) + + if self.columns is not None: + print "Notice: COPY csv data" + self.csv_import() + + elif self.blob_cols is not None: + # elif: COPY process also blob data + print "Notice: UPDATE blob data" + + # then show up some stats + self.print_stats() + + def csv_import(self): + """ import CSV data, using COPY """ + + for line, columns in self.read_data(): + if self.blob_cols is not None: + columns, rowids = self.read_blob(line, columns) + + if DEBUG: + print self.col_mapping + print len(columns), len(self.col_mapping) + + if False and VERBOSE: + print line + for i in [37, 44, 52, 38]: + print len(columns[i-1]), columns[i-1] + print + + ## + # Now we have to reorder the columns to match schema + c_ordered = [columns[i-1] for i in self.col_mapping] + + if DRY_RUN or DEBUG: + print line + print c_ordered + print len(c_ordered) + print + + if not DRY_RUN: + self.db.copy_from(self.table, c_ordered, line, self.reject) + + if not DRY_RUN: + # we may need a last COPY for the rest of data + self.db.copy_from(self.table, None, None, self.reject, EOF = True) + + return + + def lo_import(self): + """ import large object data, using UPDATEs """ + + for line, columns in self.read_data(): + ## + # read blob data and replace its PG escaped form into columns + columns, rowids = self.read_blob(line, columns) + + # and insert it into database + if DRY_RUN or DEBUG: + print line + print columns + print + + if not DRY_RUN: + self.db.insert_blob(self.table, self.index, + rowids, cname, data, btype, + line, self.reject) + + def _chomp(self, input_line): + """ chomp end of line when necessary, and trailing_sep too """ + + if len(input_line) == 0: + if DEBUG: + print 'pgloader._chomp: skipping 
empty line' + return input_line + + # chomp a copy of the input_line, we will need the original one + line = input_line[:] + + if line[-2:] == "\r\n": + line = line[:-2] + + elif line[-1] == "\r": + line = line[:-1] + + elif line[-1] == "\n": + line = line[:-1] + + # trailing separator to whipe out ? + if self.trailing_sep \ + and line[-len(self.field_sep)] == self.field_sep: + + line = line[:-len(self.field_sep)] + + return line + + def _escape_newlines(self, columns): + """ trim out newline escapes to be found inside data columns """ + if DEBUG: + print 'Debug: escaping columns newlines' + print 'Debug:', self.newline_escapes + + for (ne_col, ne_esc) in self.newline_escapes: + # don't forget configured col references use counting from 1 + ne_colnum = dict(self.columns)[ne_col] - 1 + if DEBUG: + print 'Debug: column %s[%d] escaped with %s' \ + % (ne_col, ne_colnum+1, ne_esc) + + col_data = columns[ne_colnum] + + if self.db.is_null(col_data) or self.db.is_empty(col_data): + if DEBUG: + print 'Debug: skipping null or empty column' + continue + + escaped = [] + tmp = col_data + + for line in tmp.split('\n'): + if len(line) == 0: + if DEBUG: + print 'Debug: skipping empty line' + continue + + if DEBUG: + print 'Debug: chomping:', line + + tmpline = self._chomp(line) + if tmpline[-1] == ne_esc: + tmpline = tmpline[:-1] + + # chomp out only escaping char, not newline itself + escaped.append(line[:len(tmpline)] + \ + line[len(tmpline)+1:]) + + else: + # line does not end with escaping char, keep it + escaped.append(line) + + columns[ne_colnum] = '\n'.join(escaped) + return columns + + def read_data(self): + """ read data from configured file, and generate (yields) for + each data line: line, columns and rowid """ + + # temporary feature for controlling when to begin real inserts + # if first time launch, set to True. 
+ input_buffer = StringIO() + nb_lines = 0 + begin_linenb = None + nb_plines = 0 + + ## + # if neither -I nor -F was used, we can state that begin = 0 + if FROM_ID is None and FROM_COUNT == 0: + if VERBOSE: + print 'Notice: beginning on first line' + begin_linenb = 1 + + if INPUT_ENCODING is not None: + try: + fd = codecs.open(self.filename, encoding = INPUT_ENCODING) + except LookupError, e: + # codec not found + raise PGLoader_Error, "Input codec: %s" % e + else: + try: + fd = open(self.filename) + except IOError, error: + raise PGLoader_Error, error + + for line in fd: + # we count real physical lines + nb_plines += 1 + + if INPUT_ENCODING is not None: + # this may not be necessary, after all + try: + line = line.encode(INPUT_ENCODING) + except UnicodeDecodeError, e: + reject.log(['Codec error', str(e)], input_line) + continue + + if self.field_count is not None: + input_buffer.write(line) + # act as if this were the last input_buffer for this line + tmp = self._chomp(input_buffer.getvalue()) + columns = self._split_line(tmp) + nb_cols = len(columns) + + # check we got them all if not and field_count was + # given, we have a multi-line input + if nb_cols < self.field_count: + continue + else: + # we have read all the logical line + line = tmp + input_buffer.close() + input_buffer = StringIO() + + if nb_cols != self.field_count: + if DEBUG: + print line + print columns + print + self.reject.log( + 'Error parsing columns on line ' +\ + '%d [row %d]: found %d columns' \ + % (nb_plines, nb_lines, nb_cols), line) + else: + # normal operation mode : one physical line is one + # logical line. 
we didn't split input line yet + line = self._chomp(line) + nb_cols = None + columns = None + + if len(line) == 0: + # skip empty lines + continue + + # we count logical lines + nb_lines += 1 + + ## + # if -F is used, count lines to skip, and skip them + if FROM_COUNT > 0: + if nb_lines < FROM_COUNT: + continue + + if nb_lines == FROM_COUNT: + begin_linenb = nb_lines + if VERBOSE: + print 'Notice: reached beginning on line %d' % nb_lines + + ## + # check for beginning if option -I was used + if FROM_ID is not None: + if columns is None: + columns = self._split_line(line) + + rowids = self._rowids(columns) + + if FROM_ID == rowids: + begin_linenb = nb_lines + if VERBOSE: + print 'Notice: reached beginning on line %d' % nb_lines + + elif begin_linenb is None: + # begin is set to 1 when we don't use neither -I nor -F + continue + + if COUNT is not None and begin_linenb is not None \ + and (nb_lines - begin_linenb + 1) > COUNT: + + if VERBOSE: + print 'Notice: reached line %d, stopping' % nb_lines + break + + if columns is None: + columns = self._split_line(line) + + if DEBUG: + print 'Debug: read data' + + # now, we may have to apply newline_escapes on configured columns + if NEWLINE_ESCAPES or self.newline_escapes != []: + columns = self._escape_newlines(columns) + + nb_cols = len(columns) + if nb_cols != len(self.columns): + if DEBUG: + print line + print columns + print + + msg = 'Error parsing columns on line ' +\ + '%d [row %d]: found %d columns' \ + % (nb_plines, nb_lines, nb_cols) + + self.reject.log(msg, line) + continue + + yield line, columns + + + def read_blob(self, line, columns): + """ read blob data and replace columns values with PG escape + data to give as input to COPY or UPDATE """ + + # cache rowids, a single line ids will be the same whatever + # the number of clob entries are configured + rowids = None + + for (cname, c, btype) in self.blob_cols: + try: + blob_col = c-1 + blob = columns[blob_col].strip() + except Exception, e: + m = [ + "Warning: 
couldn't get '%s' column (#%d)" % (cname, c), + str(e) + ] + try: + self.reject.log(m, line) + except PGLoader_Error: + # pedantic setting, but we want to continue + continue + + if blob != "": + try: + (begin, length, blobname) = blob.split(CLOB_SEP) + + except Exception, e: + m = ["Warning: column '%s'" % cname + \ + " is not a valid blob reference: %s" % blob, + str(e) + ] + self.reject.log(m, line) + continue + + # Informix sometimes output 0,0,0 as blob reference + if begin == length == blobname == "0": + columns[blob_col] = '' + continue + + abs_blobname = os.path.join(os.path.dirname(self.filename), + blobname) + + # We want to manage existing blob file + if not os.access(abs_blobname, os.R_OK): + self.reject.log( + "Warning: Can't read blob file %s" % abs_blobname, + line) + + if abs_blobname not in self.blobs: + + if btype not in ['ifx_blob', 'ifx_clob']: + msg = "Error: Blob type '%s' not supported" % mode + raise PGLoader_Error, msg + + elif btype == 'ifx_blob': + self.blobs[abs_blobname] = ifx_blob(abs_blobname, + self.field_sep) + + elif btype == 'ifx_clob': + self.blobs[abs_blobname] = ifx_clob(abs_blobname) + + blob = self.blobs[abs_blobname] + + # we now need to get row id(s) from ordered columns + if rowids is None: + rowids = self._rowids(columns) + + # get data from blob object + data = blob.extract(rowids, c, begin, length) + columns[blob_col] = data + + if DEBUG: + print 'Debug: read blob data' + print rowids + print data + print + + # no we have read all defined blob data, returns new columns + return columns, rowids + + diff --git a/pgloader/tools.py b/pgloader/tools.py new file mode 100644 index 0000000..7e801d2 --- /dev/null +++ b/pgloader/tools.py @@ -0,0 +1,118 @@ +# -*- coding: ISO-8859-15 -*- +# Author: Dimitri Fontaine +# +# pgloader librairies + +import os, sys, os.path, time, codecs +from cStringIO import StringIO + +from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC + +class PGLoader_Error(Exception): + """ Internal pgloader 
processing error """ + pass + +class Reject: + """ We log rejects into two files, reject_log and reject_data + + reject_log contains some error messages and reasons + reject_data contains input lines which this tool couldn't manage + """ + + def __init__(self, reject_log, reject_data): + """ Constructor, with file names """ + self.reject_log = reject_log + self.reject_data = reject_data + + # we will open files on first error + self.errors = 0 + + def print_stats(self): + """ give a summary """ + if DRY_RUN: + return + + if self.errors == 0: + print "## No data were rejected" + else: + print "## %d errors found into data" % self.errors + print " please read %s for errors log" % self.reject_log + print " and %s for data still to process" % self.reject_data + + def log(self, messages, data = None): + """ log the messages into reject_log, and the data into reject_data + + We open the file on each request, cause we supose errors to be + rare while the import process will take a long time. + """ + + if self.errors == 0: + try: + fd_log = open(self.reject_log, 'wb+') + fd_data = open(self.reject_data, 'wb+') + except IOError, error: + raise PGLoader_Error, error + else: + fd_log = open(self.reject_log, 'ab+') + fd_data = open(self.reject_data, 'ab+') + + # message has to be either a string or a list of strings + if type(messages) == type("string"): + error = messages + "\n" + fd_log.write(error) + + if PEDANTIC and not VERBOSE: + # (write the message just once) + sys.stderr.write(error) + + else: + error = None + for m in messages: + if error is None: error = m + m += "\n" + fd_log.write(m) + + if PEDANTIC: + sys.stderr.write(m) + + # add a separation line between log entries + fd_log.write("\n") + + # data has to be a string, a single input line or None + # the input line is not chomped + if data is not None: + fd_data.write(data) + + # now we close the two fds + for f in [fd_log, fd_data]: + f.flush() + f.close() + + self.errors += 1 + + if PEDANTIC: + raise 
PGLoader_Error, error + + +def parse_config_string(str): + """ parse a config string + + used for null and empty_string elements + null = "" + empty_string = "\ " + + this would result in null =="" and empty_string == '"\ "', + which is not what we want. + """ + + if len(str) > 2: + if (str[0] == str[-1] == '"') \ + or (str[0] == str[-1] == "'"): + # we have a param = "foo" configuration, we want to return only + # the foo + return str[1:-1] + + return str + + +