Adding pgloader python package and modules

dim 2006-11-21 12:24:57 +00:00
parent 8ed1e0ff2c
commit 7eed610ecb
6 changed files with 1430 additions and 0 deletions

0
pgloader/__init__.py Normal file

466
pgloader/db.py Normal file

@@ -0,0 +1,466 @@
# -*- coding: ISO-8859-15 -*-
# Author: Dimitri Fontaine <dimitri@dalibo.com>
#
# pgloader database connection handling
# COPY dichotomy on error
import os, sys, os.path, time, codecs
from cStringIO import StringIO
from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC
from options import TRUNCATE, VACUUM
from options import INPUT_ENCODING, PG_CLIENT_ENCODING
from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING
from tools import PGLoader_Error
try:
import psycopg2.psycopg1 as psycopg
except ImportError:
if VERBOSE:
print 'No psycopg2 module found, trying psycopg1'
import psycopg
class db:
""" a db connexion and utility class """
def __init__(self,
host, port, base, user, passwd,
client_encoding = PG_CLIENT_ENCODING,
copy_every = 10000, commit_every = 1000, connect = True):
""" Connects to the specified database """
self.dbconn = None
self.dsn = "host=%s port=%d user=%s dbname=%s password=%s" \
% (host, port, user, base, passwd)
self.connect = "-h %s -p %s -U %s" % (host, port, user)
self.base = base
# those parameters can be overwritten after db init
# here are their default values
self.copy_sep = COPY_SEP
self.copy_every = copy_every
self.commit_every = commit_every
self.client_encoding = client_encoding
self.null = NULL
self.empty_string = EMPTY_STRING
if connect:
self.reset()
def __del__(self):
""" db object destructor, we have to close the db connection """
if self.dbconn is None:
return
if self.running_commands > 0:
self.dbconn.commit()
self.commits += 1
self.commited_rows += self.running_commands
if self.dbconn is not None:
self.dbconn.close()
def set_encoding(self):
""" set connection encoding to self.client_encoding """
if DEBUG:
# debug only, because reconnecting happens on every
# configured section
print 'Setting client encoding to %s' % self.client_encoding
sql = 'set session client_encoding to %s'
cursor = self.dbconn.cursor()
cursor.execute(sql, [self.client_encoding])
cursor.close()
def reset(self):
""" reset internal counters and open a new database connection """
self.buffer = None
self.copy = None # flag set to True when copy is called
self.errors = 0
self.commits = 0
self.commited_rows = 0
self.running_commands = 0
self.last_commit_time = time.time()
self.first_commit_time = self.last_commit_time
if DEBUG:
if self.dbconn is not None:
print 'Debug: closing current connection'
self.dbconn.close()
if DEBUG:
print 'Debug: connecting to dsn %s' % self.dsn
self.dbconn = psycopg.connect(self.dsn)
self.set_encoding()
def print_stats(self):
""" output some stats about recent activity """
d = time.time() - self.first_commit_time
u = self.commited_rows
c = self.commits
print "## %d updates in %d commits took %5.3f seconds" % (u, c, d)
if self.errors > 0:
print "## %d database errors occured" % self.errors
if self.copy and not VACUUM:
print "## Please do VACUUM your database to recover space"
else:
if u > 0:
print "## No database error occured"
return
def is_null(self, value):
""" return true if value is null, per configuration """
return value == self.null
def is_empty(self, value):
""" return true if value is empty, per configuration """
return value == self.empty_string
def truncate(self, table):
""" issue an SQL TRUNCATE TABLE on given table """
if DRY_RUN:
if VERBOSE:
print "Notice: won't truncate tables on dry-run mode"
return
sql = "TRUNCATE TABLE %s;" % table
if VERBOSE:
print 'Notice: %s' % sql
try:
cursor = self.dbconn.cursor()
cursor.execute(sql)
self.dbconn.commit()
except Exception, error:
if VERBOSE:
print error
raise PGLoader_Error, "Couldn't truncate table %s" % table
def vacuum(self):
""" issue an vacuumdb -fvz database """
if DRY_RUN:
if VERBOSE:
print
print 'Notice: no vacuum in dry-run mode'
return -1
command = "/usr/bin/vacuumdb %s -fvz %s 2>&1" \
% (self.connect, self.base)
if VERBOSE:
print command
out = os.popen(command)
for line in out.readlines():
if DEBUG:
# don't print \n
print line[:-1]
return out.close()
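# Illustrative example, with assumed values host='localhost',
# port=5432, user='postgres' and base='mydb': the command built
# above would be
#   /usr/bin/vacuumdb -h localhost -p 5432 -U postgres -fvz mydb 2>&1
# that is a full (-f), verbose (-v), analyzing (-z) vacuum.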
def insert_blob(self, table, index, rowids,
blob_cname, data, btype,
input_line, reject):
""" insert the given blob content into postgresql table
return True on success, False on error
"""
ok = True
sql = ""
if btype == 'ifx_clob':
data = data.replace("'", "\\'")
sql = "UPDATE %s SET %s = %%s WHERE " % (table, blob_cname)
elif btype == 'ifx_blob':
data = data.tostring()
sql = "UPDATE %s SET %s = %%s::bytea WHERE " % (table, blob_cname)
values = [data]
##
# Add a WHERE clause for each index
first = True
for name, col in index:
if not first: sql += " AND "
else: first = False
sql += "%s = %%s" % name
values.append(rowids[name])
sql += ";"
if DEBUG:
print 'Debug: %s' % sql
try:
cursor = self.dbconn.cursor()
cursor.execute(sql, values)
# if execute raises an exception, don't count it as a
# running command (awaiting commit)
self.running_commands += 1
if VERBOSE:
str_rowids = ""
for i,v in rowids.items():
if str_rowids != "": str_rowids += ", "
str_rowids += "%s:%s" % (i, v)
print '%s %s %s %6d bytes' \
% (table, str_rowids, blob_cname, len(data))
if self.running_commands == self.commit_every:
now = time.time()
self.dbconn.commit()
self.commits += 1
duration = now - self.last_commit_time
self.last_commit_time = now
print "-- commit %d: %d updates in %5.3fs --" \
% (self.commits, self.running_commands, duration)
self.commited_rows += self.running_commands
self.running_commands = 0
except KeyboardInterrupt, error:
# C-c was pressed, please stop processing
self.dbconn.commit()
raise PGLoader_Error, "Aborting on user demand (Interrupt)"
except Exception, e:
self.dbconn.commit()
# don't use self.commited_rows here, it's only updated
# after a commit
print "Error: update %d rejected: commiting (read log file %s)" \
% (self.commits * self.commit_every + self.running_commands,
reject.reject_log)
reject.log(str(e), input_line)
self.errors += 1
ok = False
return ok
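# Illustrative example (hypothetical table and column names):
# with table='documents', blob_cname='body', btype='ifx_clob'
# and index=[('id', 1)], insert_blob builds
#   UPDATE documents SET body = %s WHERE id = %s;
# and executes it with values = [data, rowids['id']].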
def save_copy_buffer(self, table):
""" save copy buffer to a temporary file for further inspection """
import tempfile
(f, n) = tempfile.mkstemp(prefix='%s.' % table,
suffix='.pgimport', dir='/tmp')
os.write(f, self.buffer.getvalue())
os.close(f)
# systematically write about this
print "--- COPY data buffer saved in %s ---" % n
return n
def copy_from(self, table, columns, input_line, reject, EOF = False):
""" Generate some COPY SQL for PostgreSQL """
ok = True
if not self.copy: self.copy = True
if EOF or (self.running_commands == self.copy_every \
and self.buffer is not None):
# time to copy data to PostgreSQL table
if self.buffer is None:
if VERBOSE:
print "Error: no data to COPY"
return False
if DEBUG:
self.save_copy_buffer(table)
self.buffer.seek(0)
now = time.time()
try:
cursor = self.dbconn.cursor()
r = cursor.copy_from(self.buffer, table, self.copy_sep)
self.dbconn.commit()
self.commits += 1
duration = now - self.last_commit_time
self.last_commit_time = now
print "-- COPY %d: %d rows copied in %5.3fs --" \
% (self.commits, self.running_commands, duration)
# prepare next run
self.buffer.close()
self.buffer = None
self.commited_rows += self.running_commands
self.running_commands = 0
except psycopg.ProgrammingError, error:
# rollback current transaction
self.dbconn.rollback()
if VERBOSE:
print 'Notice: COPY error, trying to find on which line'
if not DEBUG:
# in DEBUG mode, copy buffer has already been saved
# to file
self.save_copy_buffer(table)
# copy recovery process
now = time.time()
c, ok, ko = self.copy_from_buff(table, self.buffer,
self.running_commands, reject)
duration = now - self.last_commit_time
self.commits += c
self.last_commit_time = now
self.commited_rows += ok
self.errors += ko
if VERBOSE:
print 'Notice: COPY error recovery done (%d/%d) in %5.3fs'\
% (ko, ok, duration)
# commit this transaction
self.dbconn.commit()
# recovery process has closed the buffer
self.buffer = None
self.running_commands = 0
except psycopg.DatabaseError, error:
# non recoverable error
mesg = "\n".join(["Please check PostgreSQL logs",
"HINT: double check your client_encoding" +
" and copy_delimiter settings"])
raise PGLoader_Error, mesg
# prepare next run
if self.buffer is None:
self.buffer = StringIO()
self.prepare_copy_data(columns)
self.running_commands += 1
return ok
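# Usage sketch ('mytable' and the rows are hypothetical): the
# caller feeds copy_from one row at a time; rows pile up in
# self.buffer and one COPY runs every copy_every rows, plus a
# final one at end of input:
#   for line, columns in rows:
#       db.copy_from('mytable', columns, line, reject)
#   db.copy_from('mytable', None, None, reject, EOF = True)
# see PGLoader.csv_import in pgloader.py for the real call site.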
def copy_from_buff(self, table, buff, count, reject):
""" If copy returned an error, try to detect wrong input line(s) """
if count == 1:
reject.log('COPY error on this line', buff.getvalue())
buff.close()
if DEBUG:
print '--- Notice: found one more line in error'
# returns commits, ok, ko
return 0, 0, 1
##
# Dichotomy
# we cut the buffer into two buffers, try to copy from them
a = StringIO()
b = StringIO()
n = 0
m = count / 2
# return values, copied lines and errors
commits = ok = ko = 0
buff.seek(0)
for line in buff.readlines():
if n < m:
a.write(line)
else:
b.write(line)
n += 1
# we no longer need the original buff
buff.close()
if DEBUG:
print '--- Trying to find errors, dividing %d lines in %d and %d' \
% (count, m, n-m)
# now we have two buffers to copy to PostgreSQL database
cursor = self.dbconn.cursor()
for (x, xcount) in [(a, m), (b, n-m)]:
try:
x.seek(0)
cursor.copy_from(x, table, self.copy_sep)
self.dbconn.commit()
x.close()
if DEBUG:
print "--- COPY ERROR processing progress: %d rows copied"\
% (xcount)
commits += 1
ok += xcount
except Exception, error:
self.dbconn.commit()
# if x is only one line long, reject this line
if xcount == 1:
ko += 1
reject.log('COPY error: %s' % error, x.getvalue())
if DEBUG:
print '--- Notice: found one more line in error'
print x.getvalue()
else:
_c, _o, _k = self.copy_from_buff(table, x, xcount, reject)
commits += _c
ok += _o
ko += _k
return commits, ok, ko
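# Worked example: with a 10000-row buffer holding a single bad
# row, the dichotomy above issues about 2 * log2(10000), i.e.
# roughly 28 COPY attempts to isolate the bad line, rather than
# falling back to 10000 single-row operations.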
def prepare_copy_data(self, columns):
""" add a data line to copy buffer """
if columns is not None:
first_col = True
for c in columns:
# default text format COPY delimiter
if not first_col: self.buffer.write(self.copy_sep)
else: first_col = False
if self.is_null(c):
# null column value: \N
self.buffer.write('\N')
elif self.is_empty(c):
# empty string has been read
if DEBUG:
print "empty string read: '%s'" % c
self.buffer.write('')
else:
# for a list of chars to replace, have a look at
# http://www.postgresql.org/docs/8.1/static/sql-copy.html
if INPUT_ENCODING is not None:
try:
c = c.encode(INPUT_ENCODING)
except UnicodeDecodeError, e:
# neither reject nor input_line is in scope here,
# so raise instead of crashing with a NameError
raise PGLoader_Error, 'Codec error: %s' % str(e)
# in _split_line we remove delimiter escaping
# in order for backslash escaping not to de-escape it
# we then have to escape delimiters explicitly now
for orig, escaped in [('\\', '\\\\'),
(self.copy_sep,
'\\%s' % self.copy_sep),
('\b', '\\b'),
('\f', '\\f'),
('\n', '\\n'),
('\r', '\\r'),
('\t', '\\t'),
('\v', '\\v')]:
c = c.replace(orig, escaped)
self.buffer.write(c)
# end of row, \n
self.buffer.write('\n')
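# Escaping example, assuming copy_sep is '|': a column value of
# 'a|b' is written to the COPY buffer as 'a\|b', an embedded
# newline becomes '\n', a backslash becomes '\\', a null column
# is written as \N and a configured empty string is written as
# an empty column.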

109
pgloader/lo.py Normal file

@@ -0,0 +1,109 @@
# -*- coding: ISO-8859-15 -*-
# Author: Dimitri Fontaine <dimitri@dalibo.com>
#
# pgloader Large Object support
import codecs
from cStringIO import StringIO
from tools import PGLoader_Error
from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC
from options import INPUT_ENCODING
class ifx_lo:
""" an Informix Large Object data file as given by UNLOAD """
def __del__(self):
""" close self.file on object destruction """
self.file.close()
def extract(self, rowid, field_nb, begin, length):
""" extract given positionned data from Informix Clob out file, save
them in a file and returns the filename where text is stored """
# get stream position and data length
begin = long(begin, 16)
length = long(length, 16)
# get blob data
self.file.seek(begin)
try:
content = self.file.read(length)
except UnicodeDecodeError, e:
# as of now, this is a fatal error
print
print 'Fatal error in clob file %s at position 0x%x' \
% (self.filename, begin)
raise PGLoader_Error, e
return content
class ifx_clob(ifx_lo):
""" Informix Text Large Object file """
def __init__(self, filename):
""" init a clob object """
self.file = None
self.filename = filename
if self.file is None:
if INPUT_ENCODING is not None:
self.file = codecs.open(self.filename, 'r',
encoding = INPUT_ENCODING)
else:
self.file = open(self.filename, 'r')
if VERBOSE:
print "Notice: Opening informix clob file:", self.filename
class ifx_blob(ifx_lo):
""" Informix Binary Large Object file """
def __init__(self, filename, field_sep):
""" init a clob object """
self.file = None
self.filename = filename
self.field_sep = field_sep # used by bytea_escape
# some helpers for bytea escaping
self.octals = range(0, 32)
self.octals += range(127, 256)
if self.file is None:
self.file = open(self.filename, 'rb')
if VERBOSE:
print "Notice: Opening informix blob file:", self.filename
def bytea_escape(self, bitstring):
""" escape chars from bitstring for PostgreSQL bytea input
see http://www.postgresql.org/docs/8.1/static/datatype-binary.html
"""
escaped = StringIO()
bsize = len(bitstring)
pos = 0
while pos < bsize:
c = bitstring[pos]
o = ord(c)
if o in self.octals or c == self.field_sep:
# PostgreSQL wants octal numbers!
escaped.write('\\%03o' % o)
elif o == 39:
escaped.write('\\047')
elif o == 92:
escaped.write('\\134')
else:
escaped.write(c)
pos += 1
r = escaped.getvalue()
escaped.close()
return r
def extract(self, rowid, field_nb, begin, length):
""" extract content, then bytea escape it """
content = ifx_lo.extract(self, rowid, field_nb, begin, length)
return self.bytea_escape(content)
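# bytea_escape example, assuming field_sep is '|': a NUL byte
# becomes \000, a single quote \047, a backslash \134 and the
# '|' separator itself \174 (its octal value), while plain
# printable bytes such as 'A' pass through unchanged.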

30
pgloader/options.py Normal file

@@ -0,0 +1,30 @@
# -*- coding: ISO-8859-15 -*-
# Author: Dimitri Fontaine <dimitri@dalibo.com>
#
# Some common options, for each module to get them
INPUT_ENCODING = None
PG_CLIENT_ENCODING = 'latin9'
COPY_SEP = None
FIELD_SEP = '|'
CLOB_SEP = ','
NULL = ''
EMPTY_STRING = '\ '
NEWLINE_ESCAPES = None
DEBUG = False
VERBOSE = False
DRY_RUN = False
PEDANTIC = False
TRUNCATE = False
VACUUM = False
COUNT = None
FROM_COUNT = None
FROM_ID = None
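# Note: these module-level values act as defaults; other modules
# import them (e.g. db.py does "from options import ...") and
# per-section configuration may then override the db instance
# attributes, e.g. db.null and db.empty_string in pgloader.py.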

707
pgloader/pgloader.py Normal file

@@ -0,0 +1,707 @@
# -*- coding: ISO-8859-15 -*-
# Author: Dimitri Fontaine <dimitri@dalibo.com>
#
# pgloader main class
#
# handles configuration, parse data, then pass them to database module for
# COPY preparation
import os, sys, os.path, time, codecs
from cStringIO import StringIO
from tools import PGLoader_Error, Reject, parse_config_string
from db import db
from lo import ifx_clob, ifx_blob
from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC
from options import TRUNCATE, VACUUM
from options import COUNT, FROM_COUNT, FROM_ID
from options import INPUT_ENCODING, PG_CLIENT_ENCODING
from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING
from options import NEWLINE_ESCAPES
class PGLoader:
"""
PGLoader reads some data file and, depending on its configuration,
import data with COPY or update blob data with UPDATE.
"""
def __init__(self, name, config, db):
""" Init with a configuration section """
# Some settings
self.name = name
self.db = db
self.index = None
self.columns = None
self.blob_cols = None
self.config_errors = 0
self.errors = 0
self.updates = 0
self.init_time = time.time()
# we may have to open several clob files while parsing the
# unload data file, hence we keep track of them all
self.blobs = {}
if VERBOSE:
print
print "[%s] parse configuration" % self.name
# some configuration elements don't have a default value
for opt in ('table', 'filename'):
if config.has_option(name, opt):
self.__dict__[opt] = config.get(name, opt)
else:
print 'Error: please configure %s.%s' % (name, opt)
self.config_errors += 1
##
# reject log and data files default to /tmp/<section>.rej[.log]
if config.has_option(name, 'reject_log'):
self.reject_log = config.get(name, 'reject_log')
else:
self.reject_log = os.path.join('/tmp', '%s.rej.log' % name)
if VERBOSE:
print 'Notice: reject log in %s' % self.reject_log
if config.has_option(name, 'reject_data'):
self.reject_data = config.get(name, 'reject_data')
else:
self.reject_data = os.path.join('/tmp', '%s.rej' % name)
if VERBOSE:
print 'Notice: rejected data in %s' % self.reject_data
# reject logging
self.reject = Reject(self.reject_log, self.reject_data)
# optional number of columns per line
self.field_count = None
if config.has_option(name, 'field_count'):
self.field_count = config.getint(name, 'field_count')
# optional field separator
self.field_sep = FIELD_SEP
if config.has_option(name, 'field_sep'):
self.field_sep = config.get(name, 'field_sep')
if not DRY_RUN:
if self.db.copy_sep is None:
self.db.copy_sep = self.field_sep
# optional trailing separator option
self.trailing_sep = False
if config.has_option(name, 'trailing_sep'):
self.trailing_sep = config.get(name, 'trailing_sep') == 'True'
# optional null and empty_string per-table parameters
if config.has_option(name, 'null'):
self.db.null = parse_config_string(config.get(name, 'null'))
else:
self.db.null = NULL
if config.has_option(name, 'empty_string'):
self.db.empty_string = parse_config_string(
config.get(name, 'empty_string'))
else:
self.db.empty_string = EMPTY_STRING
# optional per-table client_encoding option
if config.has_option(name, 'client_encoding'):
self.db.client_encoding = parse_config_string(
config.get(name, 'client_encoding'))
if DEBUG:
print "null: '%s'" % self.db.null
print "empty_string: '%s'" % self.db.empty_string
print "client_encoding: '%s'" % self.db.client_encoding
##
# we parse some column definitions
if config.has_option(name, 'index'):
self._parse_fields('index', config.get(name, 'index'))
if config.has_option(name, 'columns'):
self._parse_fields('columns', config.get(name, 'columns'))
if config.has_option(name, 'blob_columns'):
self._parse_fields('blob_cols',
config.get(name, 'blob_columns'),
btype = True)
if DEBUG:
print 'index', self.index
print 'columns', self.columns
print 'blob_columns', self.blob_cols
# optional newline_escapes option
self.newline_escapes = []
if config.has_option(name, 'newline_escapes'):
if NEWLINE_ESCAPES is not None:
# this parameter is globally set, will ignore local
# definition
print "Warning: ignoring %s newline_escapes option" % name
print " option is set to '%s' globally" \
% NEWLINE_ESCAPES
else:
self._parse_fields('newline_escapes',
config.get(name, 'newline_escapes'),
argtype = 'char')
if NEWLINE_ESCAPES is not None:
# set NEWLINE_ESCAPES for each table column
self.newline_escapes = [(a, NEWLINE_ESCAPES)
for (a, x) in self.columns]
##
# How can we mix those column definitions?
# - we want to load table data with COPY
# - we want to load blob data into text or bytea field, with COPY
# - we want to do both
if self.columns is None and self.blob_cols is None:
# nothing to do? that's a configuration error
self.reject.log(
"Error: in section '%s': " % self.name +\
"please configure some work to do (columns, blob_cols)")
self.config_errors += 1
if self.blob_cols is not None and self.index is None:
# if you want to load blobs, UPDATE needs indexes
# is this error still necessary?
self.reject.log(
"Error: in section '%s': " % self.name +\
"for blob importing (blob_cols), please configure index")
self.config_errors += 1
##
# We have for example columns = col1:2, col2:1
# this means the order of input columns is not the same as the
# expected order of COPY, so we want a mapping index, here [2, 1]
if self.columns is not None:
self.col_mapping = [i for (c, i) in self.columns]
##
# if options.fromid is not None it has to be either a single
# value, when index is a single key, or a dict in a string,
# when index is a multiple key
global FROM_ID
if FROM_ID is not None:
if len(self.index) > 1:
# we have to evaluate given string and see if it is a
# dictionary
try:
d = {}
ids = [x.strip() for x in FROM_ID.split(',')]
for id in ids:
k, v = id.split(':')
d[k] = v
FROM_ID = d
except Exception, e:
self.reject.log(
'Error: unable to parse given key %s' % FROM_ID)
raise PGLoader_Error
print 'Notice: composite key found, -I evaluated to %s' % FROM_ID
if self.config_errors > 0:
mesg = ['Configuration errors for section %s' % self.name,
'Please see reject log file %s' % self.reject_log]
raise PGLoader_Error, '\n'.join(mesg)
# Now reset database connection
if not DRY_RUN:
self.db.reset()
def _parse_fields(self, attr, str, btype = False, argtype = 'int'):
""" parse the user string str for fields definition to store
into self.attr """
def __getarg(arg, argtype):
""" return arg depending on its type """
if argtype == 'int':
# arg is the target column index
try:
arg = int(arg)
except ValueError:
raise PGLoader_Error
elif argtype == 'char':
# arg is an escape char
if len(arg) > 1:
raise PGLoader_Error
return arg
f = self.__dict__[attr] = []
try:
for field_def in str.split(','):
properties = [x.strip() for x in field_def.split(':')]
if not btype:
# normal column definition, for COPY usage
colname, arg = properties
f.append((colname, __getarg(arg, argtype)))
else:
# blob column definition, with blob type, for
# UPDATE usage
colname, arg, btype = properties
f.append((colname, __getarg(arg, argtype), btype))
except Exception, error:
# FIXME: make some errors and write some error messages
raise
def _split_line(self, line):
""" split given line and returns a columns list """
last_sep = 0
columns = []
pos = line.find(self.field_sep, last_sep)
while pos != -1:
# don't consider backslash escaped separators melted into data
# warning, we may find some data|\\|data
# that is some escaped \ then a legal | separator
i=1
while pos-i >= 0 and line[pos-i] == '\\':
i += 1
# now i-1 is the number of \ preceding the separator
# it's a legal separator only if i-1 is even
if (i-1) % 2 == 0:
# there's no need to keep escaped delimiters inside a column
# and we want to avoid double-escaping them
columns.append(line[last_sep:pos]
.replace("\\%s" % self.field_sep,
self.field_sep))
last_sep = pos + 1
pos = line.find(self.field_sep, pos + 1)
# append last column
columns.append(line[last_sep:])
return columns
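# Example, with field_sep '|': the input line 'a\|b|c' holds one
# escaped separator, so _split_line returns ['a|b', 'c'], the
# backslash protecting the first '|' being removed on the way.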
def _rowids(self, columns):
""" get rowids for given input line """
rowids = {}
try:
for id_name, id_col in self.index:
rowids[id_name] = columns[id_col - 1]
except IndexError, e:
messages = [
"Warning: couldn't get id %d on column #%d" % (id_name,
id_col),
str(e)
]
# the input line is not in scope here
self.reject.log(messages)
return rowids
def summary(self):
""" return a (duration, updates, errors) tuple """
self.duration = time.time() - self.init_time
if self.reject is not None:
self.errors = self.reject.errors
if self.db is not None:
self.updates = self.db.commited_rows
return (self.duration, self.updates, self.errors)
def print_stats(self):
""" print out some statistics """
if self.reject is not None:
self.errors = self.reject.errors
self.reject.print_stats()
if self.db is not None:
self.updates = self.db.commited_rows
self.db.print_stats()
return
def run(self):
""" depending on configuration, do given job """
# Announce the beginning of the work
print "[%s] data import" % self.name
if TRUNCATE and not DRY_RUN:
self.db.truncate(self.table)
if self.columns is not None:
print "Notice: COPY csv data"
self.csv_import()
elif self.blob_cols is not None:
# elif: the COPY branch above already processes blob data
print "Notice: UPDATE blob data"
self.lo_import()
# then show up some stats
self.print_stats()
def csv_import(self):
""" import CSV data, using COPY """
for line, columns in self.read_data():
if self.blob_cols is not None:
columns, rowids = self.read_blob(line, columns)
if DEBUG:
print self.col_mapping
print len(columns), len(self.col_mapping)
if False and VERBOSE:
print line
for i in [37, 44, 52, 38]:
print len(columns[i-1]), columns[i-1]
print
##
# Now we have to reorder the columns to match schema
c_ordered = [columns[i-1] for i in self.col_mapping]
if DRY_RUN or DEBUG:
print line
print c_ordered
print len(c_ordered)
print
if not DRY_RUN:
self.db.copy_from(self.table, c_ordered, line, self.reject)
if not DRY_RUN:
# we may need a last COPY for the rest of data
self.db.copy_from(self.table, None, None, self.reject, EOF = True)
return
def lo_import(self):
""" import large object data, using UPDATEs """
for line, columns in self.read_data():
##
# read blob data and replace its PG escaped form into columns
columns, rowids = self.read_blob(line, columns)
# and insert it into database
if DRY_RUN or DEBUG:
print line
print columns
print
if not DRY_RUN:
# update each configured blob column; read_blob has
# already replaced columns[c-1] with escaped data
for (cname, c, btype) in self.blob_cols:
    data = columns[c-1]
    self.db.insert_blob(self.table, self.index,
                        rowids, cname, data, btype,
                        line, self.reject)
def _chomp(self, input_line):
""" chomp end of line when necessary, and trailing_sep too """
if len(input_line) == 0:
if DEBUG:
print 'pgloader._chomp: skipping empty line'
return input_line
# chomp a copy of the input_line, we will need the original one
line = input_line[:]
if line[-2:] == "\r\n":
line = line[:-2]
elif line[-1] == "\r":
line = line[:-1]
elif line[-1] == "\n":
line = line[:-1]
# trailing separator to wipe out?
if self.trailing_sep \
and line[-len(self.field_sep):] == self.field_sep:
line = line[:-len(self.field_sep)]
return line
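# Example, with trailing_sep enabled and field_sep '|':
# _chomp('a|b|\r\n') first strips the '\r\n' end of line, then
# the trailing separator, and returns 'a|b'.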
def _escape_newlines(self, columns):
""" trim out newline escapes to be found inside data columns """
if DEBUG:
print 'Debug: escaping columns newlines'
print 'Debug:', self.newline_escapes
for (ne_col, ne_esc) in self.newline_escapes:
# don't forget configured col references use counting from 1
ne_colnum = dict(self.columns)[ne_col] - 1
if DEBUG:
print 'Debug: column %s[%d] escaped with %s' \
% (ne_col, ne_colnum+1, ne_esc)
col_data = columns[ne_colnum]
if self.db.is_null(col_data) or self.db.is_empty(col_data):
if DEBUG:
print 'Debug: skipping null or empty column'
continue
escaped = []
tmp = col_data
for line in tmp.split('\n'):
if len(line) == 0:
if DEBUG:
print 'Debug: skipping empty line'
continue
if DEBUG:
print 'Debug: chomping:', line
tmpline = self._chomp(line)
if tmpline[-1] == ne_esc:
tmpline = tmpline[:-1]
# chomp out only escaping char, not newline itself
escaped.append(line[:len(tmpline)] + \
line[len(tmpline)+1:])
else:
# line does not end with escaping char, keep it
escaped.append(line)
columns[ne_colnum] = '\n'.join(escaped)
return columns
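# Example: if a column is configured with escape character '\',
# a value of 'one\<newline>two' is rewritten as 'one<newline>two':
# the escaping character is dropped while the newline itself is
# preserved inside the column.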
def read_data(self):
""" read data from configured file, and generate (yields) for
each data line: line, columns and rowid """
# begin_linenb tracks the line where real processing begins,
# as controlled by the -F and -I options
input_buffer = StringIO()
nb_lines = 0
begin_linenb = None
nb_plines = 0
##
# if neither -I nor -F was used, we can state that begin = 1
if FROM_ID is None and not FROM_COUNT:
if VERBOSE:
print 'Notice: beginning on first line'
begin_linenb = 1
if INPUT_ENCODING is not None:
try:
fd = codecs.open(self.filename, encoding = INPUT_ENCODING)
except LookupError, e:
# codec not found
raise PGLoader_Error, "Input codec: %s" % e
else:
try:
fd = open(self.filename)
except IOError, error:
raise PGLoader_Error, error
for line in fd:
# we count real physical lines
nb_plines += 1
if INPUT_ENCODING is not None:
# this may not be necessary, after all
try:
line = line.encode(INPUT_ENCODING)
except UnicodeDecodeError, e:
self.reject.log(['Codec error', str(e)], line)
continue
if self.field_count is not None:
input_buffer.write(line)
# act as if this were the last input_buffer for this line
tmp = self._chomp(input_buffer.getvalue())
columns = self._split_line(tmp)
nb_cols = len(columns)
# check we got them all; if not (field_count being
# given), we have a multi-line input
if nb_cols < self.field_count:
continue
else:
# we have read all the logical line
line = tmp
input_buffer.close()
input_buffer = StringIO()
if nb_cols != self.field_count:
if DEBUG:
print line
print columns
print
self.reject.log(
'Error parsing columns on line ' +\
'%d [row %d]: found %d columns' \
% (nb_plines, nb_lines, nb_cols), line)
else:
# normal operation mode: one physical line is one
# logical line. we didn't split input line yet
line = self._chomp(line)
nb_cols = None
columns = None
if len(line) == 0:
# skip empty lines
continue
# we count logical lines
nb_lines += 1
##
# if -F is used, count lines to skip, and skip them
if FROM_COUNT > 0:
if nb_lines < FROM_COUNT:
continue
if nb_lines == FROM_COUNT:
begin_linenb = nb_lines
if VERBOSE:
print 'Notice: reached beginning on line %d' % nb_lines
##
# check for beginning if option -I was used
if FROM_ID is not None:
if columns is None:
columns = self._split_line(line)
rowids = self._rowids(columns)
if FROM_ID == rowids:
begin_linenb = nb_lines
if VERBOSE:
print 'Notice: reached beginning on line %d' % nb_lines
elif begin_linenb is None:
# begin is set to 1 when neither -I nor -F is used
continue
if COUNT is not None and begin_linenb is not None \
and (nb_lines - begin_linenb + 1) > COUNT:
if VERBOSE:
print 'Notice: reached line %d, stopping' % nb_lines
break
if columns is None:
columns = self._split_line(line)
if DEBUG:
print 'Debug: read data'
# now, we may have to apply newline_escapes on configured columns
if NEWLINE_ESCAPES or self.newline_escapes != []:
columns = self._escape_newlines(columns)
nb_cols = len(columns)
if nb_cols != len(self.columns):
if DEBUG:
print line
print columns
print
msg = 'Error parsing columns on line ' +\
'%d [row %d]: found %d columns' \
% (nb_plines, nb_lines, nb_cols)
self.reject.log(msg, line)
continue
yield line, columns
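# Multi-line record example, with field_count = 3 and field_sep
# '|': the physical lines 'a|b' and 'c|d' are buffered until the
# accumulated logical line 'a|b\nc|d' splits into the expected 3
# columns ['a', 'b\nc', 'd'], the newline surviving inside the
# middle column.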
def read_blob(self, line, columns):
""" read blob data and replace columns values with PG escape
data to give as input to COPY or UPDATE """
# cache rowids: for a single input line the ids are the same
# however many clob entries are configured
rowids = None
for (cname, c, btype) in self.blob_cols:
try:
blob_col = c-1
blob = columns[blob_col].strip()
except Exception, e:
m = [
"Warning: couldn't get '%s' column (#%d)" % (cname, c),
str(e)
]
try:
self.reject.log(m, line)
except PGLoader_Error:
# pedantic setting, but we want to continue
continue
if blob != "":
try:
(begin, length, blobname) = blob.split(CLOB_SEP)
except Exception, e:
m = ["Warning: column '%s'" % cname + \
" is not a valid blob reference: %s" % blob,
str(e)
]
self.reject.log(m, line)
continue
# Informix sometimes outputs 0,0,0 as a blob reference
if begin == length == blobname == "0":
columns[blob_col] = ''
continue
abs_blobname = os.path.join(os.path.dirname(self.filename),
blobname)
# we want to make sure the blob file exists and is readable
if not os.access(abs_blobname, os.R_OK):
self.reject.log(
"Warning: Can't read blob file %s" % abs_blobname,
line)
if abs_blobname not in self.blobs:
if btype not in ['ifx_blob', 'ifx_clob']:
msg = "Error: Blob type '%s' not supported" % mode
raise PGLoader_Error, msg
elif btype == 'ifx_blob':
self.blobs[abs_blobname] = ifx_blob(abs_blobname,
self.field_sep)
elif btype == 'ifx_clob':
self.blobs[abs_blobname] = ifx_clob(abs_blobname)
blob = self.blobs[abs_blobname]
# we now need to get row id(s) from ordered columns
if rowids is None:
rowids = self._rowids(columns)
# get data from blob object
data = blob.extract(rowids, c, begin, length)
columns[blob_col] = data
if DEBUG:
print 'Debug: read blob data'
print rowids
print data
print
# now we have read all defined blob data, return new columns
return columns, rowids
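# Blob reference example, with CLOB_SEP ',': a column value such
# as '1a0,400,customer.blobs' (hypothetical file name) means:
# read 0x400 bytes at offset 0x1a0 from customer.blobs, located
# next to the configured data file, and replace the column value
# with the escaped content.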

118
pgloader/tools.py Normal file

@@ -0,0 +1,118 @@
# -*- coding: ISO-8859-15 -*-
# Author: Dimitri Fontaine <dimitri@dalibo.com>
#
# pgloader libraries
import os, sys, os.path, time, codecs
from cStringIO import StringIO
from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC
class PGLoader_Error(Exception):
""" Internal pgloader processing error """
pass
class Reject:
""" We log rejects into two files, reject_log and reject_data
reject_log contains some error messages and reasons
reject_data contains input lines which this tool couldn't manage
"""
def __init__(self, reject_log, reject_data):
""" Constructor, with file names """
self.reject_log = reject_log
self.reject_data = reject_data
# we will open files on first error
self.errors = 0
def print_stats(self):
""" give a summary """
if DRY_RUN:
return
if self.errors == 0:
print "## No data were rejected"
else:
print "## %d errors found into data" % self.errors
print " please read %s for errors log" % self.reject_log
print " and %s for data still to process" % self.reject_data
def log(self, messages, data = None):
""" log the messages into reject_log, and the data into reject_data
We open the files on each request, because we suppose errors
are rare while the import process will take a long time.
"""
if self.errors == 0:
try:
fd_log = open(self.reject_log, 'wb+')
fd_data = open(self.reject_data, 'wb+')
except IOError, error:
raise PGLoader_Error, error
else:
fd_log = open(self.reject_log, 'ab+')
fd_data = open(self.reject_data, 'ab+')
# message has to be either a string or a list of strings
if type(messages) == type("string"):
error = messages + "\n"
fd_log.write(error)
if PEDANTIC and not VERBOSE:
# (write the message just once)
sys.stderr.write(error)
else:
error = None
for m in messages:
if error is None: error = m
m += "\n"
fd_log.write(m)
if PEDANTIC:
sys.stderr.write(m)
# add a separation line between log entries
fd_log.write("\n")
# data has to be a string, a single input line or None
# the input line is not chomped
if data is not None:
fd_data.write(data)
# now we close the two fds
for f in [fd_log, fd_data]:
f.flush()
f.close()
self.errors += 1
if PEDANTIC:
raise PGLoader_Error, error
def parse_config_string(str):
""" parse a config string
used for null and empty_string elements
null = ""
empty_string = "\ "
this would result in null =="" and empty_string == '"\ "',
which is not what we want.
"""
if len(str) >= 2:
if (str[0] == str[-1] == '"') \
or (str[0] == str[-1] == "'"):
# we have a param = "foo" configuration, we want to return only
# the foo
return str[1:-1]
return str
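# Examples (illustrative, given the len(str) >= 2 check above):
#   parse_config_string('""')    == ''
#   parse_config_string('"\ "')  == '\ '
#   parse_config_string('plain') == 'plain'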