Adding support for columns=* configuration

This commit is contained in:
dim 2008-02-25 14:21:53 +00:00
parent 45176adec7
commit 174ed31302
7 changed files with 105 additions and 8 deletions

6
debian/changelog vendored
View File

@ -1,3 +1,9 @@
pgloader (2.3.0~dev-2) unstable; urgency=low
* columns = * is now supported
-- Dimitri Fontaine <dim@tapoueh.org> Mon, 25 Feb 2008 14:58:46 +0100
pgloader (2.3.0~dev-1) unstable; urgency=low
* Mutli-threaded pgloader (see options max_parallel_sections,

5
debian/control vendored
View File

@ -13,5 +13,6 @@ Description: loads flat data files into PostgreSQL
pgloader imports data from flat files and insert it into a database
table. You have to provide a configuration file containing sections,
which each defines how to load a table. CSV and text format are
supported, the latter being able to parse multi-line non-quoted with
trailing delimiter quite CSV input, such as given by Informix UNLOAD.
supported, the former is able to import MySQL dumps when -T is used,
the latter is able to parse multi-line non-quoted with trailing
delimiter quite CSV input, such as given by Informix UNLOAD.

View File

@ -0,0 +1,8 @@
1:2008-02-18:first entry
2:2008-02-19:second one
3:2008-02-20:another
4:2008-02-21:still running
5:2008-02-22:well, some more
6:2008-02-23:antepenultima
7:2008-02-24:next to last
8:2008-02-25:hey, it's today!

View File

@ -0,0 +1,5 @@
CREATE TABLE allcols (
a integer primary key,
b date,
c text
);

View File

@ -12,7 +12,7 @@ lc_messages = C
;client_encoding = 'utf-8'
client_encoding = 'latin1'
copy_every = 5000
copy_every = 5
commit_every = 5
#copy_delimiter = %
@ -38,6 +38,13 @@ columns = a:1, b:3, c:2
reject_log = /tmp/simple.rej.log
reject_data = /tmp/simple.rej
[allcols]
table = allcols
format = csv
filename = allcols/allcols.data
field_sep = :
columns = *
[errors]
table = errors
format = text
@ -77,7 +84,7 @@ table = partial
format = text
filename = partial/partial.data
field_sep = %
columns = a:1, b:2, c:3, d:4, e:5
columns = *
only_cols = 1-3, 5
[udc]

View File

@ -45,6 +45,10 @@ class db:
self.empty_string = EMPTY_STRING
self.lc_messages = None
# this allows to specify configuration has columns = *
# when true, we don't include column list in COPY statements
self.all_cols = None
if connect:
self.reset()
@ -107,6 +111,50 @@ class db:
raise PGLoader_Error, e
cursor.close()
def get_all_columns(self, tablename):
""" select the columns name list from catalog """
if tablename.find('.') == -1:
schemaname = 'public'
else:
try:
schemaname, tablename = tablename.split('.')
except ValueError, e:
self.log.warning("db.get_all_columns: " + \
"%s has more than one '.' separator" \
% tablename)
raise PGLoader_Error, e
sql = """
SELECT attname, attnum
FROM pg_attribute
WHERE attrelid = (SELECT oid
FROM pg_class
WHERE relname = %s
AND relnamespace = (SELECT oid
FROM pg_namespace
WHERE nspname = %s)
)
AND attnum > 0 AND NOT attisdropped
ORDER BY attnum
"""
self.log.debug("get_all_columns: %s %s %s" % (tablename, schemaname, sql))
columns = []
cursor = self.dbconn.cursor()
try:
cursor.execute(sql, [tablename, schemaname])
for row in cursor.fetchall():
columns.append(row)
except psycopg.ProgrammingError, e:
raise PGLoader_Error, e
cursor.close()
return columns
def reset(self):
""" reset internal counters and open a new database connection """
self.buffer = None
@ -141,7 +189,7 @@ class db:
d = time.time() - self.first_commit_time
u = self.commited_rows
c = self.commits
self.log.info(" %d updates in %d commits took %5.3f seconds", u, c, d)
self.log.info(" %d rows copied in %d commits took %5.3f seconds", u, c, d)
if self.errors > 0:
self.log.error("%d database errors occured", self.errors)
@ -335,7 +383,12 @@ class db:
# build the table colomns specs from parameters
# ie. we always issue COPY table (col1, col2, ..., coln) commands
tablename = table
table = "%s (%s) " % (table, ", ".join(columnlist))
if self.all_cols:
table = table
else:
table = "%s (%s) " % (table, ", ".join(columnlist))
self.log.debug("COPY will use table definition: '%s'" % table)
if EOF or self.running_commands == self.copy_every \
and self.buffer is not None:

View File

@ -77,6 +77,7 @@ class PGLoader(threading.Thread):
self.tsection = None
self.index = None
self.all_cols = None
self.columns = None
self.blob_cols = None
@ -139,7 +140,6 @@ class PGLoader(threading.Thread):
# Now reset database connection
if not DRY_RUN:
self.db.log = self.log
self.db.reset()
if not self.template and not DRY_RUN:
@ -205,6 +205,9 @@ class PGLoader(threading.Thread):
self.log.debug("_dbconnect copy_sep %s " % self.db.copy_sep)
# we want self.db messages to get printed as from our section
self.db.log = self.log
except Exception, error:
log.error("Could not initialize PostgreSQL connection")
raise PGLoader_Error, error
@ -295,7 +298,18 @@ class PGLoader(threading.Thread):
self._parse_fields('index', config.get(name, 'index'))
if config.has_option(name, 'columns'):
self._parse_fields('columns', config.get(name, 'columns'))
conf_cols = config.get(name, 'columns')
if conf_cols == '*':
self.all_cols = True
self.db.all_cols = True
# force db to connect now
self.db.reset()
self.columns = self.db.get_all_columns(self.table)
self.log.info("columns = *, got %s", str(self.columns))
else:
self._parse_fields('columns', config.get(name, 'columns'))
if config.has_option(name, 'blob_columns'):
self._parse_fields('blob_cols',
@ -478,6 +492,9 @@ class PGLoader(threading.Thread):
self.log.debug("only_cols %s", str(self.only_cols))
self.log.debug("columnlist %s", str(self.columnlist))
if not self.template and self.only_cols is not None:
self.db.all_cols = False
##
# This option is textreader specific, but being lazy and
# short-timed, I don't make self._parse_fields() callable from