From 8ed8219e372cb89ffa089a6f89852f4565e51eb8 Mon Sep 17 00:00:00 2001
From: dim
Date: Mon, 4 Jun 2007 12:14:18 +0000
Subject: [PATCH] CSV support; can load a subset of a data file's columns; can
 load into a table with more columns than the data file

---
 debian/changelog | 15 ++
 examples/README | 39 ++-
 examples/csv/csv.data | 6 +
 examples/csv/csv.sql | 6 +
 examples/partial/partial.data | 7 +
 examples/partial/partial.sql | 7 +
 examples/pgloader.conf | 24 +-
 pgloader.1.sgml | 280 +++++++++++++-------
 pgloader/csvreader.py | 83 ++++++
 pgloader/db.py | 12 +-
 pgloader/pgloader.py | 472 ++++++++--------------------
 pgloader/reader.py | 70 +++++
 pgloader/textreader.py | 318 +++++++++++++++++++++++
 13 files changed, 868 insertions(+), 471 deletions(-)
 create mode 100644 examples/csv/csv.data
 create mode 100644 examples/csv/csv.sql
 create mode 100644 examples/partial/partial.data
 create mode 100644 examples/partial/partial.sql
 create mode 100644 pgloader/csvreader.py
 create mode 100644 pgloader/reader.py
 create mode 100644 pgloader/textreader.py

diff --git a/debian/changelog b/debian/changelog
index f3c39db..1ad75c0 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,18 @@
+pgloader (2.2.0) unstable; urgency=low
+
+  * Support for partial loading of data (subrange(s) of columns)
+  * COPY table (col1, col2, ..., coln) systematically used
+  * Support for CSV format (with quoting)
+
+ -- Dimitri Fontaine  Mon, 04 Jun 2007 11:13:21 +0200
+
+pgloader (2.1.0) unstable; urgency=low
+
+  * Added support for partial COPY table definition
+  * Documentation and example update (see serial)
+
+ -- Dimitri Fontaine  Fri, 19 Jan 2007 12:25:39 +0100
+
 pgloader (2.0.2) unstable; urgency=low

   * configurable null and empty_string representations

diff --git a/examples/README b/examples/README
index 5889c20..e1fb48e 100644
--- a/examples/README
+++ b/examples/README
@@ -20,8 +20,10 @@ The provided examples are:

 . errors

-  Same test, but with impossible dates. Should report some errors. It does
-not report errors, check you're not using psycopg 1.1.21.
+  Same test, but with impossible dates. Should report some errors. If it
+  does not report errors, check you're not using psycopg 1.1.21.
+
+  Should report 3 errors out of 7 lines (4 updates).

 . clob

@@ -33,23 +35,38 @@ not report errors, check you're not using psycopg 1.1.21.
   A dataset with newline escaped and multi-line input (without quoting)
   Beware of data reordering, too.

+. csv
+
+  A dataset with CSV delimiter ',' and quote character '"'.
+
+. partial
+
+  A dataset from which we load only some of the provided columns.
+
+. serial
+
+  In this dataset the id field is omitted; it's a serial which will be
+  set automatically by PostgreSQL while COPYing.

 You can launch all those pgloader tests in one run, provided you created
 the necessary tables:

- $ for test in simple clob cluttured; do psql pgloader < $test/$test.sql; done
+ $ for sql in */*sql; do psql pgloader < $sql; done
 $ ../pgloader.py -Tc pgloader.conf
-
+ [...]
- + Table name | duration | size | updates | errors ==================================================================== - clob | 0.121s | 32 kB | 7 | 0 - cluttered | 0.041s | 32 kB | 3 | 0 - simple | 0.040s | 16 kB | 6 | 0 + clob | 0.041s | 32 kB | 7 | 0 + cluttered | 0.037s | 32 kB | 6 | 0 + csv | 0.019s | 16 kB | 6 | 0 + errors | 0.032s | 32 kB | 4 | 3 + partial | 0.024s | 32 kB | 7 | 0 + serial | 0.028s | 32 kB | 7 | 0 + simple | 0.029s | 32 kB | 7 | 0 ==================================================================== - Total | 0.369s | 80 kB | 16 | 0 - -And you then have a nice summary. + Total | 0.210s | 208 kB | 44 | 3 + diff --git a/examples/csv/csv.data b/examples/csv/csv.data new file mode 100644 index 0000000..d6e01b3 --- /dev/null +++ b/examples/csv/csv.data @@ -0,0 +1,6 @@ +"2.6.190.56","2.6.190.63","33996344","33996351","GB","United Kingdom" +"3.0.0.0","4.17.135.31","50331648","68257567","US","United States" +"4.17.135.32","4.17.135.63","68257568","68257599","CA","Canada" +"4.17.135.64","4.17.142.255","68257600","68259583","US","United States" +"4.17.143.0","4.17.143.15","68259584","68259599","CA","Canada" +"4.17.143.16","4.18.32.71","68259600","68296775","US","United States" diff --git a/examples/csv/csv.sql b/examples/csv/csv.sql new file mode 100644 index 0000000..ed0c5f7 --- /dev/null +++ b/examples/csv/csv.sql @@ -0,0 +1,6 @@ +CREATE TABLE csv ( + a bigint, + b bigint, + c char(2), + d text +); \ No newline at end of file diff --git a/examples/partial/partial.data b/examples/partial/partial.data new file mode 100644 index 0000000..2d15d92 --- /dev/null +++ b/examples/partial/partial.data @@ -0,0 +1,7 @@ +1%foo%bar%baz%hop +2%foo%bar%baz%hop +3%foo%bar%baz%hop +4%foo%bar%baz%hop +5%foo%bar%baz%hop +6%foo%bar%baz%hop +7%foo%bar%baz%hop diff --git a/examples/partial/partial.sql b/examples/partial/partial.sql new file mode 100644 index 0000000..b58b0fa --- /dev/null +++ b/examples/partial/partial.sql @@ -0,0 +1,7 @@ +CREATE TABLE partial ( + a integer primary key, + b text, + c text, + d text, + e text +); \ No newline at end of file diff --git a/examples/pgloader.conf b/examples/pgloader.conf index f4e398b..b80ecdb 100644 --- a/examples/pgloader.conf +++ b/examples/pgloader.conf @@ -17,6 +17,7 @@ newline_escapes = \ [simple] table = simple +format = text filename = simple/simple.data field_sep = | trailing_sep = True @@ -28,6 +29,7 @@ reject_data = /tmp/simple.rej [errors] table = errors +format = text filename = errors/errors.data field_sep = | trailing_sep = True @@ -35,6 +37,7 @@ columns = a:1, b:3, c:2 [clob] table = clob +format = text filename = clob/clob.data field_sep = | columns = a:1, b:2 @@ -43,6 +46,7 @@ blob_columns = b:2:ifx_clob [cluttered] table = cluttered +format = text filename = cluttered/cluttered.data field_sep = ^ trailing_sep = True @@ -52,7 +56,25 @@ columns = a:1, b:3, c:2 [serial] table = serial +format = text filename = serial/serial.data field_sep = ; -partial_copy = True columns = b:2, c:1 + +[partial] +table = partial +format = text +filename = partial/partial.data +field_sep = % +columns = a:1, b:2, c:3, d:4, e:5 +only_cols = 1-3, 5 + +[csv] +table = csv +format = csv +filename = csv/csv.data +field_sep = , +quotechar = " +columns = x:1, y:2, a:3, b:4, c:5, d:6 +only_cols = 3-6 + diff --git a/pgloader.1.sgml b/pgloader.1.sgml index 159d67c..cadf5d9 100644 --- a/pgloader.1.sgml +++ b/pgloader.1.sgml @@ -2,7 +2,7 @@
- dim@dalibo.com + dim@tapoueh.org
Dimitri
@@ -263,7 +263,7 @@ Import CSV data and Large Object to PostgreSQL

-  CONFIGURATION
+  GLOBAL CONFIGURATION SECTION

   The configuration file has a .ini file syntax, its first
   section has to be the pgsql one, defining how to
@@ -404,9 +404,9 @@ Import CSV data and Large Object to PostgreSQL
   local setting).

-   You can setup here a global escape caracter, to be considered on
-   each and every column of each and every table defined
-   thereafter.
+   You can set up here a global escape character, to be
+   considered on each and every column of each and every
+   text-format table defined thereafter.

@@ -439,12 +439,20 @@ Import CSV data and Large Object to PostgreSQL
+
+
+  COMMON FORMAT CONFIGURATION PARAMETERS

   You then can define any number of data section, and give them an
   arbitrary name. Some options are required, some are actually
   optionnals, in which case it is said so thereafter.

+
+   First, we'll go through the common parameters, applicable to
+   whichever data format you're referring to. Then text-format-only
+   parameters will be presented, followed by csv-only parameters.
+
@@ -455,6 +463,19 @@ Import CSV data and Large Object to PostgreSQL
+
+
+
+
+     The format the data are to be found in, either
+     text or csv.
+
+
+     See the next sections for format-specific options.
+
+
+
@@ -506,45 +527,6 @@ Import CSV data and Large Object to PostgreSQL
-
-
-
-
-     The UNLOAD command does not escape
-     newlines when they appear into table data. Hence, you may
-     obtain multi-line data files, where a single database row
-     (say tuple if you prefer to) can span multiple physical
-     lines into the unloaded file.
-
-
-     If this is your case, you may want to configure here the
-     number of columns per tuple. Then
-     pgloader will count columns and
-     buffer line input in order to re-assemble several physical
-     lines into one data row when needed.
-
-
-     This parameter is optionnal.
-
-
-
-
-
-
-
-
-     If this option is set to True, the
-     input data file is known to append a
-     field_sep as the last character of each
-     of its lines. With this option set, this last character is
-     then not considered as a field separator.
-
-
-     This parameter is optionnal and defaults to False.
-
-
-
@@ -591,16 +573,137 @@ Import CSV data and Large Object to PostgreSQL
-
+
-
-     If your columns definition does not contain all of the
-     PostgreSQL table definition, set this parameter to
-     True.
+     Table index definition, to be used in blob UPDATE'ing. You
+     define an index column by giving its name and its column
+     number (as found in your data file, and counting from 1)
+     separated by a colon. If your table has a composite key,
+     then you can define multiple columns here, separated by a
+     comma.

-     This parameter is optionnal and defaults to
-     False.
+     index = colname:3, other_colname:5
+
+
+
+
+
+
+
+
+     You can define here table columns, with the same
+     definition format as in the previous index
+     parameter.
+
+
+     Note you'll have to define here all the columns to be
+     found in the data file, whether you want to use them all or
+     not. When not using them all, use the
+     only_cols parameter to restrict the loaded set.
+
+
+     As of pgloader 2.2 the column list used
+     might not be the same as the table's column definition.
+
+
+     In case you have a lot of columns per table, you will want
+     to use multiple lines for this parameter value. The Python
+     ConfigParser module knows how to read
+     multi-line parameters; you don't have to escape anything.
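+
+     For instance, here is a minimal sketch of a multi-line value
+     (these column names are made up for illustration):
+
+     columns = a:1, b:2, c:3,
+               d:4, e:5
+
+     ConfigParser only requires continuation lines to begin with
+     whitespace.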
+
+
+
+
+
+
+
+
+
+     If you want to load only a subset of the columns found in
+     the data file, this option lets you define which columns
+     you're interested in. only_cols
+     is a comma-separated list of ranges or values, as in the
+     following example.
+
+
+     only_cols = 1-3, 5
+
+
+     This parameter is optional and defaults to the list of
+     all columns given in the columns
+     parameter, in that order.
+
+
+
+
+
+
+
+
+     The definition of the columns where to find some blob or
+     clob reference. This definition is composed of a table
+     column name, a column number (counting from one) reference
+     into the Informix UNLOAD data file, and
+     a large object type, separated by a colon. You can have
+     several columns in this field, separated by a
+     comma.
+
+
+     Supported large object types are Informix blob and clob;
+     the expected configuration strings are respectively
+     ifx_blob for binary (bytea) content
+     and ifx_clob for text type values.
+
+
+     Here's an example:
+
+
+     blob_type = clob_column:3:ifx_blob, other_clob_column:5:ifx_clob
+
+
+
+
+
+
+
+  TEXT FORMAT CONFIGURATION PARAMETERS
+
+
+
+
+
+     The UNLOAD command does not escape
+     newlines when they appear in table data. Hence, you may
+     obtain multi-line data files, where a single database row
+     (say tuple if you prefer) can span multiple physical
+     lines in the unloaded file.
+
+
+     If this is your case, you may want to configure here the
+     number of columns per tuple. Then
+     pgloader will count columns and
+     buffer line input in order to re-assemble several physical
+     lines into one data row when needed.
+
+
+     This parameter is optional.
+
+
+
+
+
+
+
+
+     If this option is set to True, the
+     input data file is known to append a
+     field_sep as the last character of each
+     of its lines. With this option set, this last character is
+     then not considered as a field separator.
+
+
+     This parameter is optional and defaults to False.
@@ -644,66 +747,59 @@ Import CSV data and Large Object to PostgreSQL
+
+
+
+
+  CSV FORMAT CONFIGURATION PARAMETERS
+
+
-
+
-
-     Table index definition, to be used in blob UPDATE'ing. You
-     define an index column by giving its name and its column
-     number (as found into your data file, and counting from 1)
-     separated by a colon. If your table has a composite key,
-     then you can define multiple columns here, separated by a
-     comma.
-
-     index = colname:3, other_colname:5
+     Controls how instances of quotechar appearing inside a
+     field should themselves be quoted. When True, the
+     character is doubled. When False, the escapechar is used
+     as a prefix to the quotechar. It defaults to True.
-
+
-
-     You can define here table columns, with the same
-     definition format as in previous index
-     parameter.
-
-
-     In case you have a lot a columns per table, you will want
-     to use ultiple lines for this parameter value. Python
-     ConfigParser module knows how to read
-     multi-line parameters, you don't have to escape anything.
+     A one-character string used by the writer to escape the
+     delimiter if quoting is set to QUOTE_NONE and the
+     quotechar if doublequote is False. On reading, the
+     escapechar removes any special meaning from the following
+     character. It defaults to None, which disables escaping.
-
+
-
-     The definition of the colums where to find some blob or
-     clob reference. This definition is composed by a table
-     column name, a column number (couting from one) reference
-     into the Informix UNLOAD data file, and
-     a large object type, separated by a colon. You can have
-     several columns in this field, separated by a
-     comma.
-
-
-     Supported large objects type are Informix blob and clob,
-     the awaited configuration string are respectively
-     ifx_blob for binary (bytea) content
-     type and ifx_clob for text type values.
-
-
-     Here's an example:
-
-
-     blob_type = clob_column:3:ifx_blob, other_clob_column:5:ifx_clob
+     A one-character string used to quote fields containing
+     special characters, such as the delimiter or quotechar, or
+     which contain new-line characters. It defaults to '"'.
+
+
+
+
+
+     When True, whitespace immediately following the delimiter
+     is ignored. The default is False.
+
+
+
+
@@ -737,7 +833,7 @@ Import CSV data and Large Object to PostgreSQL

  BUGS

-   Please report bugs to Dimitri Fontaine <dim@dalibo.com>.
+   Please report bugs to Dimitri Fontaine <dim@tapoueh.org>.

   When last line is alone on a COPY command and its
@@ -750,7 +846,7 @@ Import CSV data and Large Object to PostgreSQL

  AUTHORS

   pgloader is written by Dimitri
-   Fontaine dim@dalibo.com.
+   Fontaine dim@tapoueh.org.

diff --git a/pgloader/csvreader.py b/pgloader/csvreader.py
new file mode 100644
index 0000000..a857b44
--- /dev/null
+++ b/pgloader/csvreader.py
@@ -0,0 +1,83 @@
+# -*- coding: ISO-8859-15 -*-
+# Author: Dimitri Fontaine
+#
+# pgloader csv format reader
+#
+# handles configuration, parses data, then passes them to the database
+# module for COPY preparation
+
+import os, sys, os.path, time, codecs, csv
+from cStringIO import StringIO
+
+from tools import PGLoader_Error, Reject, parse_config_string
+from db import db
+from lo import ifx_clob, ifx_blob
+from reader import DataReader
+
+from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC
+from options import TRUNCATE, VACUUM
+from options import COUNT, FROM_COUNT, FROM_ID
+from options import INPUT_ENCODING, PG_CLIENT_ENCODING
+from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING
+from options import NEWLINE_ESCAPES
+
+class CSVReader(DataReader):
+    """
+    Read some CSV formatted data
+    """
+
+    def readconfig(self, name, config):
+        """ get this reader module configuration from config file """
+        DataReader.readconfig(self, name, config)
+
+        # optional doublequote: defaults to escaping, not doubling
+        self.doublequote = False
+        if config.has_option(name, 'doublequote'):
+            self.doublequote = config.get(name, 'doublequote') == 'True'
+
+        self.escapechar = None
+        if config.has_option(name, 'escapechar'):
+            self.escapechar = config.get(name, 'escapechar')[0]
+
+        self.quotechar = '"'
+        if config.has_option(name, 'quotechar'):
+            self.quotechar = config.get(name, 'quotechar')[0]
+
+        self.skipinitialspace = False
+        if config.has_option(name, 'skipinitialspace'):
+            self.skipinitialspace = config.get(name, 'skipinitialspace') == 'True'
+
+
+    def readlines(self):
+        """ read data from configured file, and generate (yields) for
+        each data line: line and columns """
+
+        # make a dialect, then implement a reader with it
+        class pgloader_dialect(csv.Dialect):
+            delimiter = self.field_sep
+            doublequote = self.doublequote
+            escapechar = self.escapechar
+            quotechar = self.quotechar
+            skipinitialspace = self.skipinitialspace
+
+            lineterminator = '\r\n'
+            quoting = csv.QUOTE_MINIMAL
+
+        csv.register_dialect('pgloader', pgloader_dialect)
+
+        if INPUT_ENCODING is not None:
+            try:
+                fd = codecs.open(self.filename, encoding = INPUT_ENCODING)
+            except LookupError, e:
+                # codec not found
+                raise PGLoader_Error, "Input codec: %s" % e
+        else:
+            try:
+                fd = open(self.filename, "rb")
+            except IOError, error:
+                raise PGLoader_Error, error
+
+        # now read the lines
+        for columns in csv.reader(fd, dialect = 'pgloader'):
+            line = self.field_sep.join(columns)
+            yield line, columns
diff --git a/pgloader/db.py b/pgloader/db.py
index 909cb28..c981823 100644
--- a/pgloader/db.py
+++ b/pgloader/db.py
@@ -257,16 +257,18 @@ class db:
             print "--- COPY data buffer saved in %s ---" % n
         return n

-    def copy_from(self, table, partial_coldef, columns, input_line,
+    def copy_from(self, table, table_colspec, columns, input_line,
                   reject, EOF = False):
         """ Generate some COPY SQL for PostgreSQL """
         ok = True
         if not self.copy: self.copy = True

-        if partial_coldef is not None:
-            # we prefer not having to mess table param on the caller side
-            # as it's an implementation detail concerning db class
-            table = "%s (%s) " % (table, partial_coldef)
+        ##
+        # build the table columns specs from parameters
+        # i.e. we always issue COPY table (col1, col2, ..., coln) commands
+        table = "%s (%s) " % (table, ", ".join(table_colspec))
+        if DEBUG:
+            print 'COPY %s' % table

         if EOF or self.running_commands == self.copy_every \
                and self.buffer is not None:
diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py
index 61c5a20..85765ae 100644
--- a/pgloader/pgloader.py
+++ b/pgloader/pgloader.py
@@ -49,14 +49,6 @@ class PGLoader:
             print
             print "[%s] parse configuration" % self.name

-        # some configuration elements don't have default value
-        for opt in ('table', 'filename'):
-            if config.has_option(name, opt):
-                self.__dict__[opt] = config.get(name, opt)
-            else:
-                print 'Error: please configure %s.%s' % (name, opt)
-                self.config_errors += 1
-
         ##
         # reject log and data files defaults to /tmp/
.rej[.log] if config.has_option(name, 'reject_log'): @@ -75,38 +67,6 @@ class PGLoader: # reject logging self.reject = Reject(self.reject_log, self.reject_data) - - - # optionnal number of columns per line - self.field_count = None - if config.has_option(name, 'field_count'): - self.field_count = config.getint(name, 'field_count') - - # optionnal field separator - self.field_sep = FIELD_SEP - if config.has_option(name, 'field_sep'): - self.field_sep = config.get(name, 'field_sep') - - if not DRY_RUN: - if self.db.copy_sep is None: - self.db.copy_sep = self.field_sep - - # optionnal trailing separator option - self.trailing_sep = False - if config.has_option(name, 'trailing_sep'): - self.trailing_sep = config.get(name, 'trailing_sep') == 'True' - - # optionnal null and empty_string per table parameters - if config.has_option(name, 'null'): - self.db.null = parse_config_string(config.get(name, 'null')) - else: - self.db.null = NULL - - if config.has_option(name, 'empty_string'): - self.db.empty_string = parse_config_string( - config.get(name, 'empty_string')) - else: - self.db.empty_string = EMPTY_STRING # optionnal local option client_encoding if config.has_option(name, 'client_encoding'): @@ -114,10 +74,18 @@ class PGLoader: config.get(name, 'client_encoding')) if DEBUG: - print "null: '%s'" % self.db.null - print "empty_string: '%s'" % self.db.empty_string print "client_encoding: '%s'" % self.db.client_encoding + + ## + # data filename + for opt in ('table', 'filename'): + if config.has_option(name, opt): + self.__dict__[opt] = config.get(name, opt) + else: + print 'Error: please configure %s.%s' % (name, opt) + self.config_errors += 1 + ## # we parse some columns definitions if config.has_option(name, 'index'): @@ -137,34 +105,74 @@ class PGLoader: print 'blob_columns', self.blob_cols + ## + # We have for example columns = col1:2, col2:1 + # this means the order of input columns is not the same as the + # awaited order of COPY, so we want a mapping index, here [2, 1] + if self.columns is not None: + self.col_mapping = [i for (c, i) in self.columns] + + ## # optionnal partial loading option (sequences case) - self.partial_copy = False - self.partial_coldef = None - - if config.has_option(name, 'partial_copy'): - self.partial_copy = config.get(name, 'partial_copy') == 'True' + # self.table_colspec is the column list to give to + # COPY table(...) 
command, either the cols given in
+        # the only_cols config, or the columns directly
+        self.only_cols = None
+        self.table_colspec = [n for (n, pos) in self.columns]

-        if self.partial_copy:
-            self.partial_coldef = [name for (name, pos) in self.columns]
+        if config.has_option(name, 'only_cols'):
+            self.only_cols = config.get(name, 'only_cols')

-        # optionnal newline escaped option
-        self.newline_escapes = []
-        if config.has_option(name, 'newline_escapes'):
-            if NEWLINE_ESCAPES is not None:
-                # this parameter is globally set, will ignore local
-                # definition
-                print "Warning: ignoring %s newline_escapes option" % name
-                print "         option is set to '%s' globally" \
-                      % NEWLINE_ESCAPES
-            else:
-                self._parse_fields('newline_escapes',
-                                   config.get(name, 'newline_escapes'),
-                                   argtype = 'char')
+            ##
+            # first make an index list out of configuration
+            # which contains comma-separated ranges or values
+            # as for example: only_cols = 1-3, 5
+            try:
+                only_cols = [x.strip() for x in self.only_cols.split(",")]
+                expanded = []

-        if NEWLINE_ESCAPES is not None:
-            # set NEWLINE_ESCAPES for each table column
-            self.newline_escapes = [(a, NEWLINE_ESCAPES)
-                                    for (a, x) in self.columns]
+                # expand ranges
+                for oc in only_cols:
+                    if '-' in oc:
+                        (a, b) = [int(x) for x in oc.split("-")]
+                        for i in range(a, b+1):
+                            expanded.append(i)
+                    else:
+                        expanded.append(int(oc))
+
+                self.only_cols = expanded
+                self.table_colspec = [self.columns[x-1][0] for x in expanded]
+
+            except Exception, e:
+                print 'Error: section %s, only_cols: configured range is invalid' % name
+                raise PGLoader_Error, e
+
+        if DEBUG:
+            print "only_cols", self.only_cols
+            print "table_colspec", self.table_colspec
+
+
+        ##
+        # data format, on which the data reader depends
+        self.format = None
+        if config.has_option(name, 'format'):
+            self.format = config.get(name, 'format')
+
+        if self.format is None:
+            print 'Error: %s: format parameter needed' % name
+            raise PGLoader_Error
+
+        if self.format.lower() == 'csv':
+            from csvreader import CSVReader
+            self.reader = CSVReader(self.db, self.filename, self.table, self.columns)
+
+        elif self.format.lower() == 'text':
+            from textreader import TextReader
+            self.reader = TextReader(self.db, self.filename, self.table, self.columns)
+
+        ##
+        # parse the reader specific section options
+        self.reader.readconfig(name, config)

        ##
        # How can we mix those columns definitions ?
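For clarity, the only_cols expansion above boils down to this standalone
sketch (the helper name is hypothetical, not part of the patch):

def expand_only_cols(spec):
    """ expand a spec like '1-3, 5' into [1, 2, 3, 5] """
    expanded = []
    for oc in [x.strip() for x in spec.split(",")]:
        if '-' in oc:
            # a range such as 1-3 expands to 1, 2, 3 (bounds included)
            (a, b) = [int(x) for x in oc.split("-")]
            expanded.extend(range(a, b + 1))
        else:
            expanded.append(int(oc))
    return expanded

# expand_only_cols("1-3, 5") => [1, 2, 3, 5]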
@@ -187,12 +195,6 @@ class PGLoader: "for blob importing (blob_cols), please configure index") self.config_errors += 1 - ## - # We have for example columns = col1:2, col2:1 - # this means the order of input columns is not the same as the - # awaited order of COPY, so we want a mapping index, here [2, 1] - if self.columns is not None: - self.col_mapping = [i for (c, i) in self.columns] ## # if options.fromid is not None it has to be either a value, @@ -267,55 +269,7 @@ class PGLoader: except Exception, error: # FIXME: make some errors and write some error messages raise - - def _split_line(self, line): - """ split given line and returns a columns list """ - last_sep = 0 - columns = [] - pos = line.find(self.field_sep, last_sep) - - while pos != -1: - # don't consider backslash escaped separators melted into data - # warning, we may find some data|\\|data - # that is some escaped \ then a legal | separator - i=1 - while pos-i >= 0 and line[pos-i] == '\\': - i += 1 - - # now i-1 is the number of \ preceding the separator - # it's a legal separator only if i-1 is even - if (i-1) % 2 == 0: - # there's no need to keep escaped delimiters inside a column - # and we want to avoid double-escaping them - columns.append(line[last_sep:pos] - .replace("\\%s" % self.field_sep, - self.field_sep)) - last_sep = pos + 1 - - pos = line.find(self.field_sep, pos + 1) - - # append last column - columns.append(line[last_sep:]) - return columns - - def _rowids(self, columns): - """ get rowids for given input line """ - rowids = {} - try: - for id_name, id_col in self.index: - rowids[id_name] = columns[id_col - 1] - - except IndexError, e: - messages = [ - "Warning: couldn't get id %d on column #%d" % (id_name, - id_col), - str(e) - ] - - self.reject.log(messages, line) - - return rowids - + def summary(self): """ return a (duration, updates, errors) tuple """ self.duration = time.time() - self.init_time @@ -351,7 +305,7 @@ class PGLoader: if self.columns is not None: print "Notice: COPY csv data" - self.csv_import() + self.data_import() elif self.blob_cols is not None: # elif: COPY process also blob data @@ -360,18 +314,9 @@ class PGLoader: # then show up some stats self.print_stats() - def csv_import(self): - """ import CSV data, using COPY """ - - ## - # Inform database about optionnal partial columns definition - # usage for COPY (sequences case, e.g.) 
- if self.partial_coldef is not None: - partial_copy_coldef = ", ".join(self.partial_coldef) - else: - partial_copy_coldef = None - - for line, columns in self.read_data(): + def data_import(self): + """ import CSV or TEXT data, using COPY """ + for line, columns in self.reader.readlines(): if self.blob_cols is not None: columns, rowids = self.read_blob(line, columns) @@ -379,34 +324,36 @@ class PGLoader: print self.col_mapping print len(columns), len(self.col_mapping) - if False and VERBOSE: - print line - for i in [37, 44, 52, 38]: - print len(columns[i-1]), columns[i-1] - print - ## - # Now we have to reorder the columns to match schema - c_ordered = [columns[i-1] for i in self.col_mapping] + # Now we have to reorder the columns to match schema, and only + # consider data matched by self.only_cols + if self.only_cols is not None: + c_ordered = [columns[self.col_mapping[i-1]-1] for i in self.only_cols] + else: + c_ordered = [columns[i-1] for i in self.col_mapping] if DRY_RUN or DEBUG: print line print c_ordered print len(c_ordered) - print self.db.partial_coldef + print self.table_colspec print if not DRY_RUN: - self.db.copy_from(self.table, partial_copy_coldef, + self.db.copy_from(self.table, self.table_colspec, c_ordered, line, self.reject) if not DRY_RUN: # we may need a last COPY for the rest of data - self.db.copy_from(self.table, partial_copy_coldef, + self.db.copy_from(self.table, self.table_colspec, None, None, self.reject, EOF = True) return + ## + # BLOB data reading/parsing + # probably should be moved out from this file + def lo_import(self): """ import large object data, using UPDATEs """ @@ -426,223 +373,6 @@ class PGLoader: rowids, cname, data, btype, line, self.reject) - def _chomp(self, input_line): - """ chomp end of line when necessary, and trailing_sep too """ - - if len(input_line) == 0: - if DEBUG: - print 'pgloader._chomp: skipping empty line' - return input_line - - # chomp a copy of the input_line, we will need the original one - line = input_line[:] - - if line[-2:] == "\r\n": - line = line[:-2] - - elif line[-1] == "\r": - line = line[:-1] - - elif line[-1] == "\n": - line = line[:-1] - - # trailing separator to whipe out ? 
- if self.trailing_sep \ - and line[-len(self.field_sep)] == self.field_sep: - - line = line[:-len(self.field_sep)] - - return line - - def _escape_newlines(self, columns): - """ trim out newline escapes to be found inside data columns """ - if DEBUG: - print 'Debug: escaping columns newlines' - print 'Debug:', self.newline_escapes - - for (ne_col, ne_esc) in self.newline_escapes: - # don't forget configured col references use counting from 1 - ne_colnum = dict(self.columns)[ne_col] - 1 - if DEBUG: - print 'Debug: column %s[%d] escaped with %s' \ - % (ne_col, ne_colnum+1, ne_esc) - - col_data = columns[ne_colnum] - - if self.db.is_null(col_data) or self.db.is_empty(col_data): - if DEBUG: - print 'Debug: skipping null or empty column' - continue - - escaped = [] - tmp = col_data - - for line in tmp.split('\n'): - if len(line) == 0: - if DEBUG: - print 'Debug: skipping empty line' - continue - - if DEBUG: - print 'Debug: chomping:', line - - tmpline = self._chomp(line) - if tmpline[-1] == ne_esc: - tmpline = tmpline[:-1] - - # chomp out only escaping char, not newline itself - escaped.append(line[:len(tmpline)] + \ - line[len(tmpline)+1:]) - - else: - # line does not end with escaping char, keep it - escaped.append(line) - - columns[ne_colnum] = '\n'.join(escaped) - return columns - - def read_data(self): - """ read data from configured file, and generate (yields) for - each data line: line, columns and rowid """ - - # temporary feature for controlling when to begin real inserts - # if first time launch, set to True. - input_buffer = StringIO() - nb_lines = 0 - begin_linenb = None - nb_plines = 0 - - ## - # if neither -I nor -F was used, we can state that begin = 0 - if FROM_ID is None and FROM_COUNT == 0: - if VERBOSE: - print 'Notice: beginning on first line' - begin_linenb = 1 - - if INPUT_ENCODING is not None: - try: - fd = codecs.open(self.filename, encoding = INPUT_ENCODING) - except LookupError, e: - # codec not found - raise PGLoader_Error, "Input codec: %s" % e - else: - try: - fd = open(self.filename) - except IOError, error: - raise PGLoader_Error, error - - for line in fd: - # we count real physical lines - nb_plines += 1 - - if INPUT_ENCODING is not None: - # this may not be necessary, after all - try: - line = line.encode(INPUT_ENCODING) - except UnicodeDecodeError, e: - reject.log(['Codec error', str(e)], input_line) - continue - - if self.field_count is not None: - input_buffer.write(line) - # act as if this were the last input_buffer for this line - tmp = self._chomp(input_buffer.getvalue()) - columns = self._split_line(tmp) - nb_cols = len(columns) - - # check we got them all if not and field_count was - # given, we have a multi-line input - if nb_cols < self.field_count: - continue - else: - # we have read all the logical line - line = tmp - input_buffer.close() - input_buffer = StringIO() - - if nb_cols != self.field_count: - if DEBUG: - print line - print columns - print - self.reject.log( - 'Error parsing columns on line ' +\ - '%d [row %d]: found %d columns' \ - % (nb_plines, nb_lines, nb_cols), line) - else: - # normal operation mode : one physical line is one - # logical line. 
we didn't split input line yet - line = self._chomp(line) - nb_cols = None - columns = None - - if len(line) == 0: - # skip empty lines - continue - - # we count logical lines - nb_lines += 1 - - ## - # if -F is used, count lines to skip, and skip them - if FROM_COUNT > 0: - if nb_lines < FROM_COUNT: - continue - - if nb_lines == FROM_COUNT: - begin_linenb = nb_lines - if VERBOSE: - print 'Notice: reached beginning on line %d' % nb_lines - - ## - # check for beginning if option -I was used - if FROM_ID is not None: - if columns is None: - columns = self._split_line(line) - - rowids = self._rowids(columns) - - if FROM_ID == rowids: - begin_linenb = nb_lines - if VERBOSE: - print 'Notice: reached beginning on line %d' % nb_lines - - elif begin_linenb is None: - # begin is set to 1 when we don't use neither -I nor -F - continue - - if COUNT is not None and begin_linenb is not None \ - and (nb_lines - begin_linenb + 1) > COUNT: - - if VERBOSE: - print 'Notice: reached line %d, stopping' % nb_lines - break - - if columns is None: - columns = self._split_line(line) - - if DEBUG: - print 'Debug: read data' - - # now, we may have to apply newline_escapes on configured columns - if NEWLINE_ESCAPES or self.newline_escapes != []: - columns = self._escape_newlines(columns) - - nb_cols = len(columns) - if nb_cols != len(self.columns): - if DEBUG: - print line - print columns - print - - msg = 'Error parsing columns on line ' +\ - '%d [row %d]: found %d columns' \ - % (nb_plines, nb_lines, nb_cols) - - self.reject.log(msg, line) - continue - - yield line, columns def read_blob(self, line, columns): @@ -727,3 +457,21 @@ class PGLoader: return columns, rowids + def _rowids(self, columns): + """ get rowids for given input line """ + rowids = {} + try: + for id_name, id_col in self.index: + rowids[id_name] = columns[id_col - 1] + + except IndexError, e: + messages = [ + "Warning: couldn't get id %d on column #%d" % (id_name, + id_col), + str(e) + ] + + self.reject.log(messages, line) + + return rowids + diff --git a/pgloader/reader.py b/pgloader/reader.py new file mode 100644 index 0000000..71aadf5 --- /dev/null +++ b/pgloader/reader.py @@ -0,0 +1,70 @@ +# -*- coding: ISO-8859-15 -*- +# Author: Dimitri Fontaine +# +# pgloader data reader interface and defaults + +from tools import PGLoader_Error, Reject, parse_config_string +from db import db +from lo import ifx_clob, ifx_blob + +from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC +from options import TRUNCATE, VACUUM +from options import COUNT, FROM_COUNT, FROM_ID +from options import INPUT_ENCODING, PG_CLIENT_ENCODING +from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING +from options import NEWLINE_ESCAPES + +class DataReader: + """ + Read some text formatted data, which look like CSV but are not: + - no quoting support + - multi-line support is explicit (via + """ + + def __init__(self, db, filename, table, columns): + """ init internal variables """ + self.db = db + self.filename = filename + self.table = table + self.columns = columns + + def readconfig(self, name, config): + """ read configuration section for common options + + name is configuration section name, conf the ConfigParser object + + specific option reading code is to be found on subclasses + which implements read data parsing code. 
+
+        see textreader.py and csvreader.py
+        """
+        # optional null and empty_string per table parameters
+        if config.has_option(name, 'null'):
+            self.db.null = parse_config_string(config.get(name, 'null'))
+        else:
+            self.db.null = NULL
+
+        if config.has_option(name, 'empty_string'):
+            self.db.empty_string = parse_config_string(
+                config.get(name, 'empty_string'))
+        else:
+            self.db.empty_string = EMPTY_STRING
+
+
+        # optional field separator
+        self.field_sep = FIELD_SEP
+        if config.has_option(name, 'field_sep'):
+            self.field_sep = config.get(name, 'field_sep')
+
+        if not DRY_RUN:
+            if self.db.copy_sep is None:
+                self.db.copy_sep = self.field_sep
+
+        if DEBUG:
+            print "null: '%s'" % self.db.null
+            print "empty_string: '%s'" % self.db.empty_string
+
+    def readlines(self):
+        """ read data from configured file, and generate (yields) for
+        each data line: line and columns """
+        pass
diff --git a/pgloader/textreader.py b/pgloader/textreader.py
new file mode 100644
index 0000000..34304b6
--- /dev/null
+++ b/pgloader/textreader.py
@@ -0,0 +1,318 @@
+# -*- coding: ISO-8859-15 -*-
+# Author: Dimitri Fontaine
+#
+# pgloader text format reader
+#
+# handles configuration, parses data, then passes them to the database
+# module for COPY preparation
+
+import os, sys, os.path, time, codecs
+from cStringIO import StringIO
+
+from tools import PGLoader_Error, Reject, parse_config_string
+from db import db
+from lo import ifx_clob, ifx_blob
+from reader import DataReader
+
+from options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC
+from options import TRUNCATE, VACUUM
+from options import COUNT, FROM_COUNT, FROM_ID
+from options import INPUT_ENCODING, PG_CLIENT_ENCODING
+from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING
+from options import NEWLINE_ESCAPES
+
+class TextReader(DataReader):
+    """
+    Read some text-formatted data, which looks like CSV but is not:
+    - no quoting support
+    - trailing separator support
+    - multi-line support is explicit (via the field_count parameter)
+    - newline escaping in multi-line content support
+    - ...
+    """
+
+    def readconfig(self, name, config):
+        """ get this reader module configuration from config file """
+        DataReader.readconfig(self, name, config)
+
+        # optional number of columns per line
+        self.field_count = None
+        if config.has_option(name, 'field_count'):
+            self.field_count = config.getint(name, 'field_count')
+
+        # optional trailing separator option
+        self.trailing_sep = False
+        if config.has_option(name, 'trailing_sep'):
+            self.trailing_sep = config.get(name, 'trailing_sep') == 'True'
+
+        # optional newline escaped option
+        self.newline_escapes = []
+        if config.has_option(name, 'newline_escapes'):
+            if NEWLINE_ESCAPES is not None:
+                # this parameter is globally set, will ignore local
+                # definition
+                print "Warning: ignoring %s newline_escapes option" % name
+                print "         option is set to '%s' globally" \
+                      % NEWLINE_ESCAPES
+            else:
+                self._parse_fields('newline_escapes',
+                                   config.get(name, 'newline_escapes'),
+                                   argtype = 'char')
+
+        if NEWLINE_ESCAPES is not None:
+            # set NEWLINE_ESCAPES for each table column
+            self.newline_escapes = [(a, NEWLINE_ESCAPES)
+                                    for (a, x) in self.columns]
+
+
+
+    def readlines(self):
+        """ read data from configured file, and generate (yields) for
+        each data line: line and columns """
+
+        # temporary feature for controlling when to begin real inserts
+        # if first time launch, set to True.
+ input_buffer = StringIO() + nb_lines = 0 + begin_linenb = None + nb_plines = 0 + + ## + # if neither -I nor -F was used, we can state that begin = 0 + if FROM_ID is None and FROM_COUNT == 0: + if VERBOSE: + print 'Notice: beginning on first line' + begin_linenb = 1 + + if INPUT_ENCODING is not None: + try: + fd = codecs.open(self.filename, encoding = INPUT_ENCODING) + except LookupError, e: + # codec not found + raise PGLoader_Error, "Input codec: %s" % e + else: + try: + fd = open(self.filename) + except IOError, error: + raise PGLoader_Error, error + + for line in fd: + # we count real physical lines + nb_plines += 1 + + if INPUT_ENCODING is not None: + # this may not be necessary, after all + try: + line = line.encode(INPUT_ENCODING) + except UnicodeDecodeError, e: + reject.log(['Codec error', str(e)], input_line) + continue + + if self.field_count is not None: + input_buffer.write(line) + # act as if this were the last input_buffer for this line + tmp = self._chomp(input_buffer.getvalue()) + columns = self._split_line(tmp) + nb_cols = len(columns) + + # check we got them all if not and field_count was + # given, we have a multi-line input + if nb_cols < self.field_count: + continue + else: + # we have read all the logical line + line = tmp + input_buffer.close() + input_buffer = StringIO() + + if nb_cols != self.field_count: + if DEBUG: + print line + print columns + print + self.reject.log( + 'Error parsing columns on line ' +\ + '%d [row %d]: found %d columns' \ + % (nb_plines, nb_lines, nb_cols), line) + else: + # normal operation mode : one physical line is one + # logical line. we didn't split input line yet + line = self._chomp(line) + nb_cols = None + columns = None + + if len(line) == 0: + # skip empty lines + continue + + # we count logical lines + nb_lines += 1 + + ## + # if -F is used, count lines to skip, and skip them + if FROM_COUNT > 0: + if nb_lines < FROM_COUNT: + continue + + if nb_lines == FROM_COUNT: + begin_linenb = nb_lines + if VERBOSE: + print 'Notice: reached beginning on line %d' % nb_lines + + ## + # check for beginning if option -I was used + if FROM_ID is not None: + if columns is None: + columns = self._split_line(line) + + rowids = self._rowids(columns) + + if FROM_ID == rowids: + begin_linenb = nb_lines + if VERBOSE: + print 'Notice: reached beginning on line %d' % nb_lines + + elif begin_linenb is None: + # begin is set to 1 when we don't use neither -I nor -F + continue + + if COUNT is not None and begin_linenb is not None \ + and (nb_lines - begin_linenb + 1) > COUNT: + + if VERBOSE: + print 'Notice: reached line %d, stopping' % nb_lines + break + + if columns is None: + columns = self._split_line(line) + + if DEBUG: + print 'Debug: read data' + + # now, we may have to apply newline_escapes on configured columns + if NEWLINE_ESCAPES or self.newline_escapes != []: + columns = self._escape_newlines(columns) + + nb_cols = len(columns) + if nb_cols != len(self.columns): + if DEBUG: + print line + print columns + print + + msg = 'Error parsing columns on line ' +\ + '%d [row %d]: found %d columns' \ + % (nb_plines, nb_lines, nb_cols) + + self.reject.log(msg, line) + continue + + yield line, columns + + + def _split_line(self, line): + """ split given line and returns a columns list """ + last_sep = 0 + columns = [] + pos = line.find(self.field_sep, last_sep) + + while pos != -1: + # don't consider backslash escaped separators melted into data + # warning, we may find some data|\\|data + # that is some escaped \ then a legal | separator + i=1 + while 
pos-i >= 0 and line[pos-i] == '\\':
+                i += 1
+
+            # now i-1 is the number of \ preceding the separator
+            # it's a legal separator only if i-1 is even
+            if (i-1) % 2 == 0:
+                # there's no need to keep escaped delimiters inside a column
+                # and we want to avoid double-escaping them
+                columns.append(line[last_sep:pos]
+                               .replace("\\%s" % self.field_sep,
+                                        self.field_sep))
+                last_sep = pos + 1
+
+            pos = line.find(self.field_sep, pos + 1)
+
+        # append last column
+        columns.append(line[last_sep:])
+        return columns
+
+
+    def _chomp(self, input_line):
+        """ chomp end of line when necessary, and trailing_sep too """
+
+        if len(input_line) == 0:
+            if DEBUG:
+                print 'pgloader._chomp: skipping empty line'
+            return input_line
+
+        # chomp a copy of the input_line, we will need the original one
+        line = input_line[:]
+
+        if line[-2:] == "\r\n":
+            line = line[:-2]
+
+        elif line[-1] == "\r":
+            line = line[:-1]
+
+        elif line[-1] == "\n":
+            line = line[:-1]
+
+        # trailing separator to wipe out ?
+        if self.trailing_sep \
+           and line[-len(self.field_sep):] == self.field_sep:
+
+            line = line[:-len(self.field_sep)]
+
+        return line
+
+    def _escape_newlines(self, columns):
+        """ trim out newline escapes to be found inside data columns """
+        if DEBUG:
+            print 'Debug: escaping columns newlines'
+            print 'Debug:', self.newline_escapes
+
+        for (ne_col, ne_esc) in self.newline_escapes:
+            # don't forget configured col references use counting from 1
+            ne_colnum = dict(self.columns)[ne_col] - 1
+            if DEBUG:
+                print 'Debug: column %s[%d] escaped with %s' \
+                      % (ne_col, ne_colnum+1, ne_esc)
+
+            col_data = columns[ne_colnum]
+
+            if self.db.is_null(col_data) or self.db.is_empty(col_data):
+                if DEBUG:
+                    print 'Debug: skipping null or empty column'
+                continue
+
+            escaped = []
+            tmp = col_data
+
+            for line in tmp.split('\n'):
+                if len(line) == 0:
+                    if DEBUG:
+                        print 'Debug: skipping empty line'
+                    continue
+
+                if DEBUG:
+                    print 'Debug: chomping:', line
+
+                tmpline = self._chomp(line)
+                if tmpline[-1] == ne_esc:
+                    tmpline = tmpline[:-1]
+
+                    # chomp out only escaping char, not newline itself
+                    escaped.append(line[:len(tmpline)] + \
+                                   line[len(tmpline)+1:])
+
+                else:
+                    # line does not end with escaping char, keep it
+                    escaped.append(line)
+
+            columns[ne_colnum] = '\n'.join(escaped)
+        return columns
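As a closing illustration, here is a minimal, standalone sketch of how the
options of a [csv] configuration section map onto Python's csv module,
mirroring what CSVReader.readlines() above does; the dialect name and data
file path are illustrative assumptions, not part of the patch:

import csv

class example_dialect(csv.Dialect):
    delimiter = ','            # field_sep
    quotechar = '"'            # quotechar
    doublequote = False        # doublequote: pgloader defaults to escaping
    escapechar = None          # escapechar
    skipinitialspace = False   # skipinitialspace
    lineterminator = '\r\n'
    quoting = csv.QUOTE_MINIMAL

csv.register_dialect('pgloader-example', example_dialect)

# read the sample data file shipped with this patch
fd = open('examples/csv/csv.data', 'rb')
for columns in csv.reader(fd, dialect = 'pgloader-example'):
    print columns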