CSV support: can load only some columns of a data file, and can load into a table with more columns than the data file

dim 2007-06-04 12:14:18 +00:00
parent 29934d7112
commit 8ed8219e37
13 changed files with 868 additions and 471 deletions

debian/changelog vendored

@ -1,3 +1,18 @@
pgloader (2.2.0) unstable; urgency=low

  * Support for partial loading of data (subrange(s) of columns)
  * COPY table (col1, col2, ..., coln) systematically used
  * Support for CSV format (with quoting)

 -- Dimitri Fontaine <dim@tapoueh.org>  Mon, 04 Jun 2007 11:13:21 +0200

pgloader (2.1.0) unstable; urgency=low

  * Added support for partial COPY table definition
  * Documentation and example update (see serial)

 -- Dimitri Fontaine <dim@dalibo.com>  Fri, 19 Jan 2007 12:25:39 +0100

pgloader (2.0.2) unstable; urgency=low

  * configurable null and empty_string representations


@ -20,8 +20,10 @@ The provided examples are:
. errors
Same test, but with impossible dates. Should report some errors. It does
not report errors, check you're not using psycopg 1.1.21.
Same test, but with impossible dates. Should report some errors. If it
does not report errors, check you're not using psycopg 1.1.21.
Should report 3 errors out of 7 lines (4 updates).
. clob
@ -33,23 +35,38 @@ not report errors, check you're not using psycopg 1.1.21.
A dataset with escaped newlines and multi-line input (without quoting)
Beware of data reordering, too.
. csv
A dataset with csv delimiter ',' and quoting '"'.
. partial
A dataset from which we only load some of the provided columns.
. serial
In this dataset the id field is omitted; it's a serial which will be
automatically set by PostgreSQL while COPYing.
You can launch all those pgloader tests in one run, provided you created the
necessary tables:
$ for test in simple clob cluttured; do psql pgloader < $test/$test.sql; done
$ for sql in */*sql; do psql pgloader < $sql; done
$ ../pgloader.py -Tc pgloader.conf
[...]
Table name | duration | size | updates | errors
====================================================================
clob | 0.121s | 32 kB | 7 | 0
cluttered | 0.041s | 32 kB | 3 | 0
simple | 0.040s | 16 kB | 6 | 0
clob | 0.041s | 32 kB | 7 | 0
cluttered | 0.037s | 32 kB | 6 | 0
csv | 0.019s | 16 kB | 6 | 0
errors | 0.032s | 32 kB | 4 | 3
partial | 0.024s | 32 kB | 7 | 0
serial | 0.028s | 32 kB | 7 | 0
simple | 0.029s | 32 kB | 7 | 0
====================================================================
Total | 0.369s | 80 kB | 16 | 0
And you then have a nice summary.
Total | 0.210s | 208 kB | 44 | 3

examples/csv/csv.data Normal file

@ -0,0 +1,6 @@
"2.6.190.56","2.6.190.63","33996344","33996351","GB","United Kingdom"
"3.0.0.0","4.17.135.31","50331648","68257567","US","United States"
"4.17.135.32","4.17.135.63","68257568","68257599","CA","Canada"
"4.17.135.64","4.17.142.255","68257600","68259583","US","United States"
"4.17.143.0","4.17.143.15","68259584","68259599","CA","Canada"
"4.17.143.16","4.18.32.71","68259600","68296775","US","United States"

examples/csv/csv.sql Normal file

@ -0,0 +1,6 @@
CREATE TABLE csv (
a bigint,
b bigint,
c char(2),
d text
);


@ -0,0 +1,7 @@
1%foo%bar%baz%hop
2%foo%bar%baz%hop
3%foo%bar%baz%hop
4%foo%bar%baz%hop
5%foo%bar%baz%hop
6%foo%bar%baz%hop
7%foo%bar%baz%hop


@ -0,0 +1,7 @@
CREATE TABLE partial (
a integer primary key,
b text,
c text,
d text,
e text
);


@ -17,6 +17,7 @@ newline_escapes = \
[simple]
table = simple
format = text
filename = simple/simple.data
field_sep = |
trailing_sep = True
@ -28,6 +29,7 @@ reject_data = /tmp/simple.rej
[errors]
table = errors
format = text
filename = errors/errors.data
field_sep = |
trailing_sep = True
@ -35,6 +37,7 @@ columns = a:1, b:3, c:2
[clob]
table = clob
format = text
filename = clob/clob.data
field_sep = |
columns = a:1, b:2
@ -43,6 +46,7 @@ blob_columns = b:2:ifx_clob
[cluttered]
table = cluttered
format = text
filename = cluttered/cluttered.data
field_sep = ^
trailing_sep = True
@ -52,7 +56,25 @@ columns = a:1, b:3, c:2
[serial]
table = serial
format = text
filename = serial/serial.data
field_sep = ;
partial_copy = True
columns = b:2, c:1
[partial]
table = partial
format = text
filename = partial/partial.data
field_sep = %
columns = a:1, b:2, c:3, d:4, e:5
only_cols = 1-3, 5
[csv]
table = csv
format = csv
filename = csv/csv.data
field_sep = ,
quotechar = "
columns = x:1, y:2, a:3, b:4, c:5, d:6
only_cols = 3-6
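To make the [csv] section concrete, here is a minimal sketch in plain Python (reusing only values shown above, not pgloader's own code) of which fields end up in which table columns: with columns = x:1 ... d:6 and only_cols = 3-6, only fields 3 to 6 of each data line are kept, matching the four columns of the csv table defined in examples/csv/csv.sql.

# first line of examples/csv/csv.data, as the csv module splits it
row = ["2.6.190.56", "2.6.190.63", "33996344", "33996351", "GB", "United Kingdom"]

only_cols = [3, 4, 5, 6]                  # expansion of "only_cols = 3-6"
table_colspec = ['a', 'b', 'c', 'd']      # names of the columns kept

loaded = [row[i - 1] for i in only_cols]
print(list(zip(table_colspec, loaded)))
# [('a', '33996344'), ('b', '33996351'), ('c', 'GB'), ('d', 'United Kingdom')]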


@ -2,7 +2,7 @@
<refentry>
<refentryinfo>
<address>
<email>dim@dalibo.com</email>
<email>dim@tapoueh.org</email>
</address>
<author>
<firstname>Dimitri</firstname>
@ -263,7 +263,7 @@ Import CSV data and Large Object to PostgreSQL
</refsect1>
<refsect1>
<title>CONFIGURATION</title>
<title>GLOBAL CONFIGURATION SECTION</title>
<para>
The configuration file has a .ini file syntax, its first section
has to be the <command>pgsql</command> one, defining how to
@ -404,9 +404,9 @@ Import CSV data and Large Object to PostgreSQL
local setting).
</para>
<para>
You can setup here a global escape caracter, to be considered on
each and every column of each and every table defined
thereafter.
You can set up here a global escape character, to be
considered on each and every column of each and every
text-format table defined thereafter.
</para>
</listitem>
</varlistentry>
@ -439,12 +439,20 @@ Import CSV data and Large Object to PostgreSQL
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>COMMON FORMAT CONFIGURATION PARAMETERS</title>
<para>
You can then define any number of data sections, and give them an
arbitrary name. Some options are required, others are
optional, in which case this is noted below.
</para>
<para>
First, we'll go through common parameters, applicable whichever
format of data you're refering to. Then text-format only
parameters will be presented, followed by csv-only parameters.
</para>
<variablelist>
<varlistentry>
<term><option>table</option></term>
@ -455,6 +463,19 @@ Import CSV data and Large Object to PostgreSQL
</listitem>
</varlistentry>
<varlistentry>
<term><option>format</option></term>
<listitem>
<para>
The format the data are to be found in, either
<command>text</command> or <command>csv</command>.
</para>
<para>
See next sections for format specific options.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>filename</option></term>
<listitem>
@ -506,45 +527,6 @@ Import CSV data and Large Object to PostgreSQL
</listitem>
</varlistentry>
<varlistentry>
<term><option>field_count</option></term>
<listitem>
<para>
The <command>UNLOAD</command> command does not escape
newlines when they appear into table data. Hence, you may
obtain multi-line data files, where a single database row
(say tuple if you prefer to) can span multiple physical
lines into the unloaded file.
</para>
<para>
If this is your case, you may want to configure here the
number of columns per tuple. Then
<command>pgloader</command> will count columns and
buffer line input in order to re-assemble several physical
lines into one data row when needed.
</para>
<para>
This parameter is optionnal.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>trailing_sep</option></term>
<listitem>
<para>
If this option is set to <command>True</command>, the
input data file is known to append a
<command>field_sep</command> as the last character of each
of its lines. With this option set, this last character is
then not considered as a field separator.
</para>
<para>
This parameter is optionnal and defaults to False.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>client_encoding</option></term>
<listitem>
@ -591,16 +573,137 @@ Import CSV data and Large Object to PostgreSQL
</varlistentry>
<varlistentry>
<term><option>partial_copy</option></term>
<term><option>index</option></term>
<listitem>
<para>
If your columns definition does not contain all of the
PostgreSQL table definition, set this parameter to
<command>True</command>.
Table index definition, to be used in blob UPDATE'ing. You
define an index column by giving its name and its column
number (as found in your data file, counting from 1)
separated by a colon. If your table has a composite key,
then you can define multiple columns here, separated by a
comma.
</para>
<para>
This parameter is optionnal and defaults to
<command>False</command>.
index = colname:3, other_colname:5
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>columns</option></term>
<listitem>
<para>
You can define here the table columns, using the same
definition format as for the <command>index</command>
parameter above.
</para>
<para>
Note you'll have to define here all the columns to be
found in the data file, whether you want to use them all or
not. When not using them all, use the
<command>only_cols</command> parameter to restrict the load.
</para>
<para>
As of <command>pgloader 2.2</command> the column list used
might not be the same as the table columns definition.
</para>
<para>
In case you have a lot of columns per table, you will want
to use multiple lines for this parameter value. The Python
<command>ConfigParser</command> module knows how to read
multi-line parameters; you don't have to escape anything.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>only_cols</option></term>
<listitem>
<para>
If you want to load only some of the columns you have
in the data file, this option lets you define which
columns you're interested in. <command>only_cols</command>
is a comma-separated list of ranges or values, as in the
following example.
</para>
<para>
only_cols = 1-3, 5
</para>
<para>
This parameter is optional and defaults to the list of
all columns given in the <command>columns</command>
parameter, in that order.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>blob_columns</option></term>
<listitem>
<para>
The definition of the columns where to find some blob or
clob reference. This definition is composed of a table
column name, a column number (counting from one) referencing
the Informix <command>UNLOAD</command> data file, and
a large object type, separated by a colon. You can have
several columns in this field, separated by a
comma.
</para>
<para>
Supported large object types are Informix blob and clob;
the expected configuration strings are respectively
<command>ifx_blob</command> for binary (bytea) content
type and <command>ifx_clob</command> for text type values.
</para>
<para>
Here's an example:
</para>
<para>
blob_type = clob_column:3:ifx_blob, other_clob_column:5:ifx_clob
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>TEXT FORMAT CONFIGURATION PARAMETERS</title>
<variablelist>
<varlistentry>
<term><option>field_count</option></term>
<listitem>
<para>
The <command>UNLOAD</command> command does not escape
newlines when they appear in table data. Hence, you may
obtain multi-line data files, where a single database row
(tuple, if you prefer) can span multiple physical
lines in the unloaded file.
</para>
<para>
If this is the case, you may want to configure here the
number of columns per tuple. Then
<command>pgloader</command> will count columns and
buffer line input in order to re-assemble several physical
lines into one data row when needed.
</para>
<para>
This parameter is optional.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>trailing_sep</option></term>
<listitem>
<para>
If this option is set to <command>True</command>, the
input data file is known to append a
<command>field_sep</command> as the last character of each
of its lines. With this option set, this last character is
then not considered as a field separator.
</para>
<para>
This parameter is optional and defaults to False.
</para>
</listitem>
</varlistentry>
@ -644,66 +747,59 @@ Import CSV data and Large Object to PostgreSQL
</listitem>
</varlistentry>
</variablelist>
</refsect1>
<refsect1>
<title>CSV FORMAT CONFIGURATION PARAMETERS</title>
<variablelist>
<varlistentry>
<term><option>index</option></term>
<term><option>doublequote</option></term>
<listitem>
<para>
Table index definition, to be used in blob UPDATE'ing. You
define an index column by giving its name and its column
number (as found into your data file, and counting from 1)
separated by a colon. If your table has a composite key,
then you can define multiple columns here, separated by a
comma.
</para>
<para>
index = colname:3, other_colname:5
Controls how instances of quotechar appearing inside a
field should themselves be quoted. When True, the
character is doubled. When False, the escapechar is used
as a prefix to the quotechar. It defaults to True.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>columns</option></term>
<term><option>escapechar</option></term>
<listitem>
<para>
You can define here table columns, with the same
definition format as in previous <command>index</command>
parameter.
</para>
<para>
In case you have a lot a columns per table, you will want
to use ultiple lines for this parameter value. Python
<command>ConfigParser</command> module knows how to read
multi-line parameters, you don't have to escape anything.
A one-character string used by the writer to escape the
delimiter if quoting is set to QUOTE_NONE and the
quotechar if doublequote is False. On reading, the
escapechar removes any special meaning from the following
character. It defaults to None, which disables escaping.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>blob_columns</option></term>
<term><option>quotechar</option></term>
<listitem>
<para>
The definition of the colums where to find some blob or
clob reference. This definition is composed by a table
column name, a column number (couting from one) reference
into the Informix <command>UNLOAD</command> data file, and
a large object type, separated by a colon. You can have
several columns in this field, separated by a
comma.
</para>
<para>
Supported large objects type are Informix blob and clob,
the awaited configuration string are respectively
<command>ifx_blob</command> for binary (bytea) content
type and <command>ifx_clob</command> for text type values.
</para>
<para>
Here's an example:
</para>
<para>
blob_type = clob_column:3:ifx_blob, other_clob_column:5:ifx_clob
A one-character string used to quote fields containing
special characters, such as the delimiter or quotechar, or
which contain new-line characters. It defaults to '"'.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>skipinitialspace</option></term>
<listitem>
<para>
When True, whitespace immediately following the delimiter
is ignored. The default is False.
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
@ -737,7 +833,7 @@ Import CSV data and Large Object to PostgreSQL
<refsect1>
<title>BUGS</title>
<para>
Please report bugs to Dimitri Fontaine &lt;dim@dalibo.com&gt;.
Please report bugs to Dimitri Fontaine &lt;dim@tapoueh.org&gt;.
</para>
<para>
When last line is alone on a <command>COPY</command> command and its
@ -750,7 +846,7 @@ Import CSV data and Large Object to PostgreSQL
<title>AUTHORS</title>
<para>
<command>pgloader</command> is written by <author>Dimitri
Fontaine</author> <email>dim@dalibo.com</email>.
Fontaine</author> <email>dim@tapoueh.org</email>.
</para>
</refsect1>
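The index, columns and blob_columns parameters documented above all share the same colname:position syntax (comma-separated entries, positions counting from 1). The actual parsing lives in pgloader's _parse_fields helper, which this diff does not show; the following is only an illustrative sketch of the format:

def parse_colspec(spec):
    # "a:1, b:3, c:2" -> [('a', 1), ('b', 3), ('c', 2)]
    pairs = []
    for item in [x.strip() for x in spec.split(',')]:
        name, pos = item.split(':')
        pairs.append((name, int(pos)))
    return pairs

print(parse_colspec('a:1, b:3, c:2'))
# [('a', 1), ('b', 3), ('c', 2)]
print(parse_colspec('colname:3, other_colname:5'))
# [('colname', 3), ('other_colname', 5)]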

pgloader/csvreader.py Normal file

@ -0,0 +1,83 @@
# -*- coding: ISO-8859-15 -*-
# Author: Dimitri Fontaine <dimitri@dalibo.com>
#
# pgloader csv format reader
#
# handles configuration, parse data, then pass them to database module for
# COPY preparation
import os, sys, os.path, time, codecs, csv
from cStringIO import StringIO
from tools import PGLoader_Error, Reject, parse_config_string
from db import db
from lo import ifx_clob, ifx_blob
from reader import DataReader
from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC
from options import TRUNCATE, VACUUM
from options import COUNT, FROM_COUNT, FROM_ID
from options import INPUT_ENCODING, PG_CLIENT_ENCODING
from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING
from options import NEWLINE_ESCAPES
class CSVReader(DataReader):
"""
Read some CSV formatted data
"""
def readconfig(self, name, config):
""" get this reader module configuration from config file """
DataReader.readconfig(self, name, config)
# optional doublequote: defaults to escaping, not doubling
self.doublequote = False
if config.has_option(name, 'doublequote'):
self.doublequote = config.get(name, 'doublequote') == 'True'
self.escapechar = None
if config.has_option(name, 'escapechar'):
self.escapechar = config.get(name, 'escapechar')[0]
self.quotechar = '"'
if config.has_option(name, 'quotechar'):
self.quotechar = config.get(name, 'quotechar')[0]
self.skipinitialspace = False
if config.has_option(name, 'skipinitialspace'):
self.skipinitialspace = config.get(name, 'skipinitialspace') == 'True'
def readlines(self):
""" read data from configured file, and generate (yields) for
each data line: line, columns and rowid """
# make a dialect, then implement a reader with it
class pgloader_dialect(csv.Dialect):
delimiter = self.field_sep
doublequote = self.doublequote
escapechar = self.escapechar
quotechar = self.quotechar
skipinitialspace = self.skipinitialspace
lineterminator = '\r\n'
quoting = csv.QUOTE_MINIMAL
csv.register_dialect('pgloader', pgloader_dialect)
if INPUT_ENCODING is not None:
try:
fd = codecs.open(self.filename, encoding = INPUT_ENCODING)
except LookupError, e:
# codec not found
raise PGLoader_Error, "Input codec: %s" % e
else:
try:
fd = open(self.filename, "rb")
except IOError, error:
raise PGLoader_Error, error
# now read the lines
for columns in csv.reader(fd, dialect = 'pgloader'):
line = self.field_sep.join(columns)
yield line, columns
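For readers less familiar with the csv module options this reader exposes (quotechar, doublequote, escapechar, skipinitialspace), here is a small standalone sketch, independent of pgloader and of its defaults, that parses one line of examples/csv/csv.data with an explicit dialect:

import csv

class demo_dialect(csv.Dialect):
    delimiter = ','
    quotechar = '"'
    doublequote = True          # '""' inside a quoted field yields a literal '"'
    escapechar = None
    skipinitialspace = False
    lineterminator = '\r\n'
    quoting = csv.QUOTE_MINIMAL

csv.register_dialect('demo', demo_dialect)

line = '"2.6.190.56","2.6.190.63","33996344","33996351","GB","United Kingdom"'
for columns in csv.reader([line], dialect='demo'):
    print(columns)
# ['2.6.190.56', '2.6.190.63', '33996344', '33996351', 'GB', 'United Kingdom']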


@ -257,16 +257,18 @@ class db:
print "--- COPY data buffer saved in %s ---" % n
return n
def copy_from(self, table, partial_coldef, columns, input_line,
def copy_from(self, table, table_colspec, columns, input_line,
reject, EOF = False):
""" Generate some COPY SQL for PostgreSQL """
ok = True
if not self.copy: self.copy = True
if partial_coldef is not None:
# we prefer not having to mess table param on the caller side
# as it's an implementation detail concerning db class
table = "%s (%s) " % (table, partial_coldef)
##
# build the table columns specs from parameters
# ie. we always issue COPY table (col1, col2, ..., coln) commands
table = "%s (%s) " % (table, ", ".join(table_colspec))
if DEBUG:
print 'COPY %s' % table
if EOF or self.running_commands == self.copy_every \
and self.buffer is not None:
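To illustrate the change above: table_colspec is simply the list of column names to load, and the COPY target is now always built from it. A tiny sketch, with values borrowed from the partial example (the buffering and the actual COPY execution stay inside the db class and are not shown in this hunk):

table = 'partial'
table_colspec = ['a', 'b', 'c', 'e']      # columns a..e restricted by only_cols = 1-3, 5
target = "%s (%s) " % (table, ", ".join(table_colspec))
print("COPY " + target)
# COPY partial (a, b, c, e)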


@ -49,14 +49,6 @@ class PGLoader:
print
print "[%s] parse configuration" % self.name
# some configuration elements don't have default value
for opt in ('table', 'filename'):
if config.has_option(name, opt):
self.__dict__[opt] = config.get(name, opt)
else:
print 'Error: please configure %s.%s' % (name, opt)
self.config_errors += 1
##
# reject log and data files defaults to /tmp/<section>.rej[.log]
if config.has_option(name, 'reject_log'):
@ -76,48 +68,24 @@ class PGLoader:
# reject logging
self.reject = Reject(self.reject_log, self.reject_data)
# optionnal number of columns per line
self.field_count = None
if config.has_option(name, 'field_count'):
self.field_count = config.getint(name, 'field_count')
# optionnal field separator
self.field_sep = FIELD_SEP
if config.has_option(name, 'field_sep'):
self.field_sep = config.get(name, 'field_sep')
if not DRY_RUN:
if self.db.copy_sep is None:
self.db.copy_sep = self.field_sep
# optionnal trailing separator option
self.trailing_sep = False
if config.has_option(name, 'trailing_sep'):
self.trailing_sep = config.get(name, 'trailing_sep') == 'True'
# optionnal null and empty_string per table parameters
if config.has_option(name, 'null'):
self.db.null = parse_config_string(config.get(name, 'null'))
else:
self.db.null = NULL
if config.has_option(name, 'empty_string'):
self.db.empty_string = parse_config_string(
config.get(name, 'empty_string'))
else:
self.db.empty_string = EMPTY_STRING
# optionnal local option client_encoding
if config.has_option(name, 'client_encoding'):
self.db.client_encoding = parse_config_string(
config.get(name, 'client_encoding'))
if DEBUG:
print "null: '%s'" % self.db.null
print "empty_string: '%s'" % self.db.empty_string
print "client_encoding: '%s'" % self.db.client_encoding
##
# data filename
for opt in ('table', 'filename'):
if config.has_option(name, opt):
self.__dict__[opt] = config.get(name, opt)
else:
print 'Error: please configure %s.%s' % (name, opt)
self.config_errors += 1
##
# we parse some columns definitions
if config.has_option(name, 'index'):
@ -137,34 +105,74 @@ class PGLoader:
print 'blob_columns', self.blob_cols
##
# We have for example columns = col1:2, col2:1
# this means the order of input columns is not the same as the
# awaited order of COPY, so we want a mapping index, here [2, 1]
if self.columns is not None:
self.col_mapping = [i for (c, i) in self.columns]
##
# optionnal partial loading option (sequences case)
self.partial_copy = False
self.partial_coldef = None
# self.table_colspec is the column list to give to
# COPY table(...) command, either the cols given in
# the only_cols config, or the columns directly
self.only_cols = None
self.table_colspec = [n for (n, pos) in self.columns]
if config.has_option(name, 'partial_copy'):
self.partial_copy = config.get(name, 'partial_copy') == 'True'
if config.has_option(name, 'only_cols'):
self.only_cols = config.get(name, 'only_cols')
if self.partial_copy:
self.partial_coldef = [name for (name, pos) in self.columns]
##
# first make an index list out of configuration
# which contains comma separated ranges or values
# as for example: only_cols = 1-3, 5
try:
only_cols = [x.strip() for x in self.only_cols.split(",")]
expanded = []
# optionnal newline escaped option
self.newline_escapes = []
if config.has_option(name, 'newline_escapes'):
if NEWLINE_ESCAPES is not None:
# this parameter is globally set, will ignore local
# definition
print "Warning: ignoring %s newline_escapes option" % name
print " option is set to '%s' globally" \
% NEWLINE_ESCAPES
# expand ranges
for oc in only_cols:
if '-' in oc:
(a, b) = [int(x) for x in oc.split("-")]
for i in range(a, b+1):
expanded.append(i)
else:
self._parse_fields('newline_escapes',
config.get(name, 'newline_escapes'),
argtype = 'char')
expanded.append(int(oc))
if NEWLINE_ESCAPES is not None:
# set NEWLINE_ESCAPES for each table column
self.newline_escapes = [(a, NEWLINE_ESCAPES)
for (a, x) in self.columns]
self.only_cols = expanded
self.table_colspec = [self.columns[x-1][0] for x in expanded]
except Exception, e:
print 'Error: section %s, only_cols: configured range is invalid' % name
raise PGLoader_Error, e
if DEBUG:
print "only_cols", self.only_cols
print "table_colspec", self.table_colspec
##
# data format, from which depend data reader
self.format = None
if config.has_option(name, 'format'):
self.format = config.get(name, 'format')
if self.format.lower() == 'csv':
from csvreader import CSVReader
self.reader = CSVReader(self.db, self.filename, self.table, self.columns)
elif self.format.lower() == 'text':
from textreader import TextReader
self.reader = TextReader(self.db, self.filename, self.table, self.columns)
if self.format is None:
print 'Error: %s: format parameter needed' % name
raise PGLoader_Error
##
# parse the reader specific section options
self.reader.readconfig(name, config)
##
# How can we mix those columns definitions ?
@ -187,12 +195,6 @@ class PGLoader:
"for blob importing (blob_cols), please configure index")
self.config_errors += 1
##
# We have for example columns = col1:2, col2:1
# this means the order of input columns is not the same as the
# awaited order of COPY, so we want a mapping index, here [2, 1]
if self.columns is not None:
self.col_mapping = [i for (c, i) in self.columns]
##
# if options.fromid is not None it has to be either a value,
@ -268,54 +270,6 @@ class PGLoader:
# FIXME: make some errors and write some error messages
raise
def _split_line(self, line):
""" split given line and returns a columns list """
last_sep = 0
columns = []
pos = line.find(self.field_sep, last_sep)
while pos != -1:
# don't consider backslash escaped separators melted into data
# warning, we may find some data|\\|data
# that is some escaped \ then a legal | separator
i=1
while pos-i >= 0 and line[pos-i] == '\\':
i += 1
# now i-1 is the number of \ preceding the separator
# it's a legal separator only if i-1 is even
if (i-1) % 2 == 0:
# there's no need to keep escaped delimiters inside a column
# and we want to avoid double-escaping them
columns.append(line[last_sep:pos]
.replace("\\%s" % self.field_sep,
self.field_sep))
last_sep = pos + 1
pos = line.find(self.field_sep, pos + 1)
# append last column
columns.append(line[last_sep:])
return columns
def _rowids(self, columns):
""" get rowids for given input line """
rowids = {}
try:
for id_name, id_col in self.index:
rowids[id_name] = columns[id_col - 1]
except IndexError, e:
messages = [
"Warning: couldn't get id %d on column #%d" % (id_name,
id_col),
str(e)
]
self.reject.log(messages, line)
return rowids
def summary(self):
""" return a (duration, updates, errors) tuple """
self.duration = time.time() - self.init_time
@ -351,7 +305,7 @@ class PGLoader:
if self.columns is not None:
print "Notice: COPY csv data"
self.csv_import()
self.data_import()
elif self.blob_cols is not None:
# elif: COPY process also blob data
@ -360,18 +314,9 @@ class PGLoader:
# then show up some stats
self.print_stats()
def csv_import(self):
""" import CSV data, using COPY """
##
# Inform database about optionnal partial columns definition
# usage for COPY (sequences case, e.g.)
if self.partial_coldef is not None:
partial_copy_coldef = ", ".join(self.partial_coldef)
else:
partial_copy_coldef = None
for line, columns in self.read_data():
def data_import(self):
""" import CSV or TEXT data, using COPY """
for line, columns in self.reader.readlines():
if self.blob_cols is not None:
columns, rowids = self.read_blob(line, columns)
@ -379,34 +324,36 @@ class PGLoader:
print self.col_mapping
print len(columns), len(self.col_mapping)
if False and VERBOSE:
print line
for i in [37, 44, 52, 38]:
print len(columns[i-1]), columns[i-1]
print
##
# Now we have to reorder the columns to match schema
# Now we have to reorder the columns to match schema, and only
# consider data matched by self.only_cols
if self.only_cols is not None:
c_ordered = [columns[self.col_mapping[i-1]-1] for i in self.only_cols]
else:
c_ordered = [columns[i-1] for i in self.col_mapping]
if DRY_RUN or DEBUG:
print line
print c_ordered
print len(c_ordered)
print self.db.partial_coldef
print self.table_colspec
print
if not DRY_RUN:
self.db.copy_from(self.table, partial_copy_coldef,
self.db.copy_from(self.table, self.table_colspec,
c_ordered, line, self.reject)
if not DRY_RUN:
# we may need a last COPY for the rest of data
self.db.copy_from(self.table, partial_copy_coldef,
self.db.copy_from(self.table, self.table_colspec,
None, None, self.reject, EOF = True)
return
##
# BLOB data reading/parsing
# probably should be moved out from this file
def lo_import(self):
""" import large object data, using UPDATEs """
@ -426,223 +373,6 @@ class PGLoader:
rowids, cname, data, btype,
line, self.reject)
def _chomp(self, input_line):
""" chomp end of line when necessary, and trailing_sep too """
if len(input_line) == 0:
if DEBUG:
print 'pgloader._chomp: skipping empty line'
return input_line
# chomp a copy of the input_line, we will need the original one
line = input_line[:]
if line[-2:] == "\r\n":
line = line[:-2]
elif line[-1] == "\r":
line = line[:-1]
elif line[-1] == "\n":
line = line[:-1]
# trailing separator to whipe out ?
if self.trailing_sep \
and line[-len(self.field_sep)] == self.field_sep:
line = line[:-len(self.field_sep)]
return line
def _escape_newlines(self, columns):
""" trim out newline escapes to be found inside data columns """
if DEBUG:
print 'Debug: escaping columns newlines'
print 'Debug:', self.newline_escapes
for (ne_col, ne_esc) in self.newline_escapes:
# don't forget configured col references use counting from 1
ne_colnum = dict(self.columns)[ne_col] - 1
if DEBUG:
print 'Debug: column %s[%d] escaped with %s' \
% (ne_col, ne_colnum+1, ne_esc)
col_data = columns[ne_colnum]
if self.db.is_null(col_data) or self.db.is_empty(col_data):
if DEBUG:
print 'Debug: skipping null or empty column'
continue
escaped = []
tmp = col_data
for line in tmp.split('\n'):
if len(line) == 0:
if DEBUG:
print 'Debug: skipping empty line'
continue
if DEBUG:
print 'Debug: chomping:', line
tmpline = self._chomp(line)
if tmpline[-1] == ne_esc:
tmpline = tmpline[:-1]
# chomp out only escaping char, not newline itself
escaped.append(line[:len(tmpline)] + \
line[len(tmpline)+1:])
else:
# line does not end with escaping char, keep it
escaped.append(line)
columns[ne_colnum] = '\n'.join(escaped)
return columns
def read_data(self):
""" read data from configured file, and generate (yields) for
each data line: line, columns and rowid """
# temporary feature for controlling when to begin real inserts
# if first time launch, set to True.
input_buffer = StringIO()
nb_lines = 0
begin_linenb = None
nb_plines = 0
##
# if neither -I nor -F was used, we can state that begin = 0
if FROM_ID is None and FROM_COUNT == 0:
if VERBOSE:
print 'Notice: beginning on first line'
begin_linenb = 1
if INPUT_ENCODING is not None:
try:
fd = codecs.open(self.filename, encoding = INPUT_ENCODING)
except LookupError, e:
# codec not found
raise PGLoader_Error, "Input codec: %s" % e
else:
try:
fd = open(self.filename)
except IOError, error:
raise PGLoader_Error, error
for line in fd:
# we count real physical lines
nb_plines += 1
if INPUT_ENCODING is not None:
# this may not be necessary, after all
try:
line = line.encode(INPUT_ENCODING)
except UnicodeDecodeError, e:
reject.log(['Codec error', str(e)], input_line)
continue
if self.field_count is not None:
input_buffer.write(line)
# act as if this were the last input_buffer for this line
tmp = self._chomp(input_buffer.getvalue())
columns = self._split_line(tmp)
nb_cols = len(columns)
# check we got them all if not and field_count was
# given, we have a multi-line input
if nb_cols < self.field_count:
continue
else:
# we have read all the logical line
line = tmp
input_buffer.close()
input_buffer = StringIO()
if nb_cols != self.field_count:
if DEBUG:
print line
print columns
print
self.reject.log(
'Error parsing columns on line ' +\
'%d [row %d]: found %d columns' \
% (nb_plines, nb_lines, nb_cols), line)
else:
# normal operation mode : one physical line is one
# logical line. we didn't split input line yet
line = self._chomp(line)
nb_cols = None
columns = None
if len(line) == 0:
# skip empty lines
continue
# we count logical lines
nb_lines += 1
##
# if -F is used, count lines to skip, and skip them
if FROM_COUNT > 0:
if nb_lines < FROM_COUNT:
continue
if nb_lines == FROM_COUNT:
begin_linenb = nb_lines
if VERBOSE:
print 'Notice: reached beginning on line %d' % nb_lines
##
# check for beginning if option -I was used
if FROM_ID is not None:
if columns is None:
columns = self._split_line(line)
rowids = self._rowids(columns)
if FROM_ID == rowids:
begin_linenb = nb_lines
if VERBOSE:
print 'Notice: reached beginning on line %d' % nb_lines
elif begin_linenb is None:
# begin is set to 1 when we don't use neither -I nor -F
continue
if COUNT is not None and begin_linenb is not None \
and (nb_lines - begin_linenb + 1) > COUNT:
if VERBOSE:
print 'Notice: reached line %d, stopping' % nb_lines
break
if columns is None:
columns = self._split_line(line)
if DEBUG:
print 'Debug: read data'
# now, we may have to apply newline_escapes on configured columns
if NEWLINE_ESCAPES or self.newline_escapes != []:
columns = self._escape_newlines(columns)
nb_cols = len(columns)
if nb_cols != len(self.columns):
if DEBUG:
print line
print columns
print
msg = 'Error parsing columns on line ' +\
'%d [row %d]: found %d columns' \
% (nb_plines, nb_lines, nb_cols)
self.reject.log(msg, line)
continue
yield line, columns
def read_blob(self, line, columns):
@ -727,3 +457,21 @@ class PGLoader:
return columns, rowids
def _rowids(self, columns):
""" get rowids for given input line """
rowids = {}
try:
for id_name, id_col in self.index:
rowids[id_name] = columns[id_col - 1]
except IndexError, e:
messages = [
"Warning: couldn't get id %d on column #%d" % (id_name,
id_col),
str(e)
]
self.reject.log(messages, line)
return rowids
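The column reordering done in data_import above boils down to a few lines. Here is a sketch of the mapping logic with the made-up two-column example from the code comments (columns = col1:2, col2:1, i.e. the data file order is the reverse of the COPY order):

columns = [('col1', 2), ('col2', 1)]           # (name, position in the data file)
col_mapping = [i for (c, i) in columns]        # [2, 1]

data = ['value-for-col2', 'value-for-col1']    # one split data line, in file order
c_ordered = [data[i - 1] for i in col_mapping]
print(c_ordered)                               # ['value-for-col1', 'value-for-col2']

# with only_cols, only the listed entries of the columns list are kept
only_cols = [1]                                # keep col1 only
print([data[col_mapping[i - 1] - 1] for i in only_cols])   # ['value-for-col1']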

pgloader/reader.py Normal file

@ -0,0 +1,70 @@
# -*- coding: ISO-8859-15 -*-
# Author: Dimitri Fontaine <dim@tapoueh.org>
#
# pgloader data reader interface and defaults
from tools import PGLoader_Error, Reject, parse_config_string
from db import db
from lo import ifx_clob, ifx_blob
from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC
from options import TRUNCATE, VACUUM
from options import COUNT, FROM_COUNT, FROM_ID
from options import INPUT_ENCODING, PG_CLIENT_ENCODING
from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING
from options import NEWLINE_ESCAPES
class DataReader:
"""
Read some text formatted data, which look like CSV but are not:
- no quoting support
- multi-line support is explicit (via the field_count parameter)
"""
def __init__(self, db, filename, table, columns):
""" init internal variables """
self.db = db
self.filename = filename
self.table = table
self.columns = columns
def readconfig(self, name, config):
""" read configuration section for common options
name is configuration section name, conf the ConfigParser object
specific option reading code is to be found on subclasses
which implements read data parsing code.
see textreader.py and csvreader.py
"""
# optional null and empty_string per table parameters
if config.has_option(name, 'null'):
self.db.null = parse_config_string(config.get(name, 'null'))
else:
self.db.null = NULL
if config.has_option(name, 'empty_string'):
self.db.empty_string = parse_config_string(
config.get(name, 'empty_string'))
else:
self.db.empty_string = EMPTY_STRING
# optional field separator
self.field_sep = FIELD_SEP
if config.has_option(name, 'field_sep'):
self.field_sep = config.get(name, 'field_sep')
if not DRY_RUN:
if self.db.copy_sep is None:
self.db.copy_sep = self.field_sep
if DEBUG:
print "null: '%s'" % self.db.null
print "empty_string: '%s'" % self.db.empty_string
def readlines(self):
""" read data from configured file, and generate (yields) for
each data line: line, columns and rowid """
pass
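DataReader is the extension point the format dispatch in pgloader.py relies on: a reader is built with (db, filename, table, columns), may pick up extra options in readconfig(), and must yield (line, columns) pairs from readlines(). As a purely hypothetical illustration (not part of this commit), a reader for one-value-per-line files could look like this:

from reader import DataReader

class LineReader(DataReader):
    """ hypothetical reader: one field per physical line """

    def readconfig(self, name, config):
        # inherit the common options (null, empty_string, field_sep)
        DataReader.readconfig(self, name, config)

    def readlines(self):
        fd = open(self.filename)
        for line in fd:
            line = line.rstrip('\r\n')
            if len(line) == 0:
                continue
            # same contract as TextReader and CSVReader: (line, columns)
            yield line, [line]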

pgloader/textreader.py Normal file

@ -0,0 +1,318 @@
# -*- coding: ISO-8859-15 -*-
# Author: Dimitri Fontaine <dimitri@dalibo.com>
#
# pgloader text format reader
#
# handles configuration, parse data, then pass them to database module for
# COPY preparation
import os, sys, os.path, time, codecs
from cStringIO import StringIO
from tools import PGLoader_Error, Reject, parse_config_string
from db import db
from lo import ifx_clob, ifx_blob
from reader import DataReader
from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC
from options import TRUNCATE, VACUUM
from options import COUNT, FROM_COUNT, FROM_ID
from options import INPUT_ENCODING, PG_CLIENT_ENCODING
from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING
from options import NEWLINE_ESCAPES
class TextReader(DataReader):
"""
Read some text formatted data, which look like CSV but are not:
- no quoting support
- trailing separator support
- multi-line support is explicit (via field_count parameter)
- newline escaping in multi-line content support
- ...
"""
def readconfig(self, name, config):
""" get this reader module configuration from config file """
DataReader.readconfig(self, name, config)
# optional number of columns per line
self.field_count = None
if config.has_option(name, 'field_count'):
self.field_count = config.getint(name, 'field_count')
# optional trailing separator option
self.trailing_sep = False
if config.has_option(name, 'trailing_sep'):
self.trailing_sep = config.get(name, 'trailing_sep') == 'True'
# optional newline escaped option
self.newline_escapes = []
if config.has_option(name, 'newline_escapes'):
if NEWLINE_ESCAPES is not None:
# this parameter is globally set, will ignore local
# definition
print "Warning: ignoring %s newline_escapes option" % name
print " option is set to '%s' globally" \
% NEWLINE_ESCAPES
else:
self._parse_fields('newline_escapes',
config.get(name, 'newline_escapes'),
argtype = 'char')
if NEWLINE_ESCAPES is not None:
# set NEWLINE_ESCAPES for each table column
self.newline_escapes = [(a, NEWLINE_ESCAPES)
for (a, x) in self.columns]
def readlines(self):
""" read data from configured file, and generate (yields) for
each data line: line, columns and rowid """
# temporary feature for controlling when to begin real inserts
# if first time launch, set to True.
input_buffer = StringIO()
nb_lines = 0
begin_linenb = None
nb_plines = 0
##
# if neither -I nor -F was used, we can state that begin = 0
if FROM_ID is None and FROM_COUNT == 0:
if VERBOSE:
print 'Notice: beginning on first line'
begin_linenb = 1
if INPUT_ENCODING is not None:
try:
fd = codecs.open(self.filename, encoding = INPUT_ENCODING)
except LookupError, e:
# codec not found
raise PGLoader_Error, "Input codec: %s" % e
else:
try:
fd = open(self.filename)
except IOError, error:
raise PGLoader_Error, error
for line in fd:
# we count real physical lines
nb_plines += 1
if INPUT_ENCODING is not None:
# this may not be necessary, after all
try:
line = line.encode(INPUT_ENCODING)
except UnicodeDecodeError, e:
reject.log(['Codec error', str(e)], input_line)
continue
if self.field_count is not None:
input_buffer.write(line)
# act as if this were the last input_buffer for this line
tmp = self._chomp(input_buffer.getvalue())
columns = self._split_line(tmp)
nb_cols = len(columns)
# check we got them all if not and field_count was
# given, we have a multi-line input
if nb_cols < self.field_count:
continue
else:
# we have read all the logical line
line = tmp
input_buffer.close()
input_buffer = StringIO()
if nb_cols != self.field_count:
if DEBUG:
print line
print columns
print
self.reject.log(
'Error parsing columns on line ' +\
'%d [row %d]: found %d columns' \
% (nb_plines, nb_lines, nb_cols), line)
else:
# normal operation mode : one physical line is one
# logical line. we didn't split input line yet
line = self._chomp(line)
nb_cols = None
columns = None
if len(line) == 0:
# skip empty lines
continue
# we count logical lines
nb_lines += 1
##
# if -F is used, count lines to skip, and skip them
if FROM_COUNT > 0:
if nb_lines < FROM_COUNT:
continue
if nb_lines == FROM_COUNT:
begin_linenb = nb_lines
if VERBOSE:
print 'Notice: reached beginning on line %d' % nb_lines
##
# check for beginning if option -I was used
if FROM_ID is not None:
if columns is None:
columns = self._split_line(line)
rowids = self._rowids(columns)
if FROM_ID == rowids:
begin_linenb = nb_lines
if VERBOSE:
print 'Notice: reached beginning on line %d' % nb_lines
elif begin_linenb is None:
# begin is set to 1 when we don't use neither -I nor -F
continue
if COUNT is not None and begin_linenb is not None \
and (nb_lines - begin_linenb + 1) > COUNT:
if VERBOSE:
print 'Notice: reached line %d, stopping' % nb_lines
break
if columns is None:
columns = self._split_line(line)
if DEBUG:
print 'Debug: read data'
# now, we may have to apply newline_escapes on configured columns
if NEWLINE_ESCAPES or self.newline_escapes != []:
columns = self._escape_newlines(columns)
nb_cols = len(columns)
if nb_cols != len(self.columns):
if DEBUG:
print line
print columns
print
msg = 'Error parsing columns on line ' +\
'%d [row %d]: found %d columns' \
% (nb_plines, nb_lines, nb_cols)
self.reject.log(msg, line)
continue
yield line, columns
def _split_line(self, line):
""" split given line and returns a columns list """
last_sep = 0
columns = []
pos = line.find(self.field_sep, last_sep)
while pos != -1:
# don't consider backslash escaped separators melted into data
# warning, we may find some data|\\|data
# that is some escaped \ then a legal | separator
i=1
while pos-i >= 0 and line[pos-i] == '\\':
i += 1
# now i-1 is the number of \ preceding the separator
# it's a legal separator only if i-1 is even
if (i-1) % 2 == 0:
# there's no need to keep escaped delimiters inside a column
# and we want to avoid double-escaping them
columns.append(line[last_sep:pos]
.replace("\\%s" % self.field_sep,
self.field_sep))
last_sep = pos + 1
pos = line.find(self.field_sep, pos + 1)
# append last column
columns.append(line[last_sep:])
return columns
def _chomp(self, input_line):
""" chomp end of line when necessary, and trailing_sep too """
if len(input_line) == 0:
if DEBUG:
print 'pgloader._chomp: skipping empty line'
return input_line
# chomp a copy of the input_line, we will need the original one
line = input_line[:]
if line[-2:] == "\r\n":
line = line[:-2]
elif line[-1] == "\r":
line = line[:-1]
elif line[-1] == "\n":
line = line[:-1]
# trailing separator to wipe out?
if self.trailing_sep \
and line[-len(self.field_sep)] == self.field_sep:
line = line[:-len(self.field_sep)]
return line
def _escape_newlines(self, columns):
""" trim out newline escapes to be found inside data columns """
if DEBUG:
print 'Debug: escaping columns newlines'
print 'Debug:', self.newline_escapes
for (ne_col, ne_esc) in self.newline_escapes:
# don't forget configured col references use counting from 1
ne_colnum = dict(self.columns)[ne_col] - 1
if DEBUG:
print 'Debug: column %s[%d] escaped with %s' \
% (ne_col, ne_colnum+1, ne_esc)
col_data = columns[ne_colnum]
if self.db.is_null(col_data) or self.db.is_empty(col_data):
if DEBUG:
print 'Debug: skipping null or empty column'
continue
escaped = []
tmp = col_data
for line in tmp.split('\n'):
if len(line) == 0:
if DEBUG:
print 'Debug: skipping empty line'
continue
if DEBUG:
print 'Debug: chomping:', line
tmpline = self._chomp(line)
if tmpline[-1] == ne_esc:
tmpline = tmpline[:-1]
# chomp out only escaping char, not newline itself
escaped.append(line[:len(tmpline)] + \
line[len(tmpline)+1:])
else:
# line does not end with escaping char, keep it
escaped.append(line)
columns[ne_colnum] = '\n'.join(escaped)
return columns
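The backslash handling in _split_line deserves a standalone illustration: a field separator preceded by an odd number of backslashes is escaped data, while one preceded by an even number (including zero) is a real field boundary. Below is a minimal re-implementation of that rule, assuming '|' as the separator, for experimenting outside pgloader:

def split_escaped(line, sep='|'):
    # a separator preceded by an odd number of backslashes is escaped data
    columns, last = [], 0
    pos = line.find(sep, last)
    while pos != -1:
        i = 1
        while pos - i >= 0 and line[pos - i] == '\\':
            i += 1
        if (i - 1) % 2 == 0:                        # even: real separator
            columns.append(line[last:pos].replace('\\' + sep, sep))
            last = pos + 1
        pos = line.find(sep, pos + 1)
    columns.append(line[last:])                     # last column, as in _split_line
    return columns

print(split_escaped('a|b|c'))            # ['a', 'b', 'c']
print(split_escaped('foo\\|bar|baz'))    # ['foo|bar', 'baz'] : the first | is escaped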