diff --git a/examples/pgloader.conf b/examples/pgloader.conf index 0e9fc5e..91da5cf 100644 --- a/examples/pgloader.conf +++ b/examples/pgloader.conf @@ -5,7 +5,8 @@ base = pgloader user = dim pass = None -client_encoding = 'utf-8' +;client_encoding = 'utf-8' +client_encoding = 'latin1' copy_every = 5 commit_every = 5 #copy_delimiter = % @@ -13,8 +14,6 @@ commit_every = 5 null = "" empty_string = "\ " -newline_escapes = \ - [simple] table = simple format = text @@ -47,7 +46,7 @@ blob_columns = b:2:ifx_clob [cluttered] table = cluttered -format = text +format = text filename = cluttered/cluttered.data field_sep = ^ trailing_sep = True @@ -70,6 +69,16 @@ field_sep = % columns = a:1, b:2, c:3, d:4, e:5 only_cols = 1-3, 5 +[udc] +table = udc +format = text +filename = udc/udc.data +input_encoding = 'latin1' +field_sep = § +columns = b:2, d:1, x:3, y:4 +udc_c = constant value +copy_columns = b, c, d + [csv] table = csv format = csv diff --git a/pgloader.1.txt b/pgloader.1.txt index af68647..83d9285 100644 --- a/pgloader.1.txt +++ b/pgloader.1.txt @@ -267,6 +267,10 @@ filename:: form +[bc]lob[0-9a-f]{4}.[0-9a-f]{3}+, but this information is not used by +pgloader+. +input_encoding:: + + The encoding of the configured +filename+. + reject_log:: In case of errors processing input data, a human readable log per rejected diff --git a/pgloader.py b/pgloader.py index 8ce0755..282d342 100644 --- a/pgloader.py +++ b/pgloader.py @@ -253,6 +253,78 @@ def duration_pprint(duration): else: return '%10.3f' % duration +def print_summary(dbconn, sections, summary, td): + """ print a pretty summary """ + from pgloader.options import VERBOSE, DEBUG, QUIET, SUMMARY + from pgloader.options import DRY_RUN, PEDANTIC, VACUUM + from pgloader.pgloader import PGLoader + from pgloader.tools import PGLoader_Error + + retcode = 0 + + t= 'Table name | duration | size | copy rows | errors ' + _= '====================================================================' + + tu = te = ts = 0 # total updates, errors, size + if not DRY_RUN: + dbconn.reset() + cursor = dbconn.dbconn.cursor() + + s_ok = 0 + for s in sections: + if s not in summary: + continue + + s_ok += 1 + if s_ok == 1: + # print pretty sumary header now + print + print t + print _ + + t, d, u, e = summary[s] + d = duration_pprint(d) + + if not DRY_RUN: + sql = "select pg_total_relation_size(%s), " + \ + "pg_size_pretty(pg_total_relation_size(%s));" + cursor.execute(sql, [t, t]) + octets, sp = cursor.fetchone() + ts += octets + + if sp[5:] == 'bytes': sp = sp[:-5] + ' B' + else: + sp = '-' + + tn = s + if len(tn) > 18: + tn = s[0:15] + "..." + + print '%-18s| %ss | %7s | %10d | %10d' % (tn, d, sp, u, e) + + tu += u + te += e + + if e > 0: + retcode += 1 + + if s_ok > 1: + td = duration_pprint(td) + + # pretty size + cursor.execute("select pg_size_pretty(%s);", [ts]) + [ts] = cursor.fetchone() + if ts[5:] == 'bytes': ts = ts[:-5] + ' B' + + print _ + print 'Total | %ss | %7s | %10d | %10d' \ + % (td, ts, tu, te) + + if not DRY_RUN: + cursor.close() + + return retcode + def load_data(): """ read option line and configuration file, then process data import of given section, or all sections if no section is given on @@ -310,78 +382,23 @@ def load_data(): if PEDANTIC: pgloader.print_stats() + except UnicodeDecodeError, e: + print "Error: can't open '%s' with given input encoding '%s'" \ + % (pgloader.filename, pgloader.input_encoding) + except KeyboardInterrupt: print "Aborting on user demand (Interrupt)" # total duration td = time.time() - begin - retcode = 0 - t= 'Table name | duration | size | copy rows | errors ' - _= '====================================================================' - if SUMMARY: - # print a pretty summary - tu = te = ts = 0 # total updates, errors, size - if not DRY_RUN: - dbconn.reset() - cursor = dbconn.dbconn.cursor() - - s_ok = 0 - for s in sections: - if s not in summary: - continue - - s_ok += 1 - if s_ok == 1: - # print pretty sumary header now - print - print t - print _ - - t, d, u, e = summary[s] - d = duration_pprint(d) - - if not DRY_RUN: - sql = "select pg_total_relation_size(%s), " + \ - "pg_size_pretty(pg_total_relation_size(%s));" - cursor.execute(sql, [t, t]) - octets, sp = cursor.fetchone() - ts += octets - - if sp[5:] == 'bytes': sp = sp[:-5] + ' B' - else: - sp = '-' - - tn = s - if len(tn) > 18: - tn = s[0:15] + "..." - - print '%-18s| %ss | %7s | %10d | %10d' % (tn, d, sp, u, e) - - tu += u - te += e - - if e > 0: - retcode += 1 - - if s_ok > 1: - td = duration_pprint(td) - - # pretty size - cursor.execute("select pg_size_pretty(%s);", [ts]) - [ts] = cursor.fetchone() - if ts[5:] == 'bytes': ts = ts[:-5] + ' B' - - print _ - print 'Total | %ss | %7s | %10d | %10d' \ - % (td, ts, tu, te) - - if not DRY_RUN: - cursor.close() - - print + try: + retcode = print_summary(dbconn, sections, summary, td) + print + except PGLoader_Error, e: + print "Can't print summary: %s" % e if VACUUM and not DRY_RUN: print 'vacuumdb... ' diff --git a/pgloader/csvreader.py b/pgloader/csvreader.py index 3a155f2..29cb4fc 100644 --- a/pgloader/csvreader.py +++ b/pgloader/csvreader.py @@ -64,9 +64,9 @@ class CSVReader(DataReader): csv.register_dialect('pgloader', pgloader_dialect) - if INPUT_ENCODING is not None: + if self.input_encoding is not None: try: - fd = codecs.open(self.filename, encoding = INPUT_ENCODING) + fd = codecs.open(self.filename, encoding = self.input_encoding) except LookupError, e: # codec not found raise PGLoader_Error, "Input codec: %s" % e diff --git a/pgloader/db.py b/pgloader/db.py index 4163328..29e72cf 100644 --- a/pgloader/db.py +++ b/pgloader/db.py @@ -69,7 +69,10 @@ class db: sql = 'set session client_encoding to %s' cursor = self.dbconn.cursor() - cursor.execute(sql, [self.client_encoding]) + try: + cursor.execute(sql, [self.client_encoding]) + except psycopg.ProgrammingError, e: + raise PGLoader_Error, e cursor.close() def set_datestyle(self): @@ -368,7 +371,7 @@ class db: if self.buffer is None: self.buffer = StringIO() - self.prepare_copy_data(columns) + self.prepare_copy_data(columns, input_line, reject) self.running_commands += 1 return ok @@ -447,7 +450,7 @@ class db: return commits, ok, ko - def prepare_copy_data(self, columns): + def prepare_copy_data(self, columns, input_line, reject): """ add a data line to copy buffer """ if columns is not None: first_col = True diff --git a/pgloader/lo.py b/pgloader/lo.py index 3175a9b..7422278 100644 --- a/pgloader/lo.py +++ b/pgloader/lo.py @@ -38,15 +38,16 @@ class ifx_lo: class ifx_clob(ifx_lo): """ Informix Text Large Object file """ - def __init__(self, filename): + def __init__(self, filename, input_encoding): """ init a clob object """ self.file = None self.filename = filename if self.file is None: - if INPUT_ENCODING is not None: + if input_encoding is not None: + import codecs self.file = codecs.open(self.filename, 'r', - encoding = INPUT_ENCODING) + encoding = input_encoding) else: self.file = open(self.filename, 'r') diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index c7341b3..26954a2 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -76,6 +76,14 @@ class PGLoader: if DEBUG and not DRY_RUN: print "client_encoding: '%s'" % self.db.client_encoding + # optionnal local option input_encoding + self.input_encoding = None + if config.has_option(name, 'input_encoding'): + self.input_encoding = parse_config_string( + config.get(name, 'input_encoding')) + + if DEBUG: + print "input_encoding: '%s'" % self.input_encoding # optionnal local option datestyle if config.has_option(name, 'datestyle'): @@ -290,13 +298,13 @@ class PGLoader: if self.format.lower() == 'csv': from csvreader import CSVReader self.reader = CSVReader(self.db, self.reject, - self.filename, + self.filename, self.input_encoding, self.table, self.columns) elif self.format.lower() == 'text': from textreader import TextReader self.reader = TextReader(self.db, self.reject, - self.filename, + self.filename, self.input_encoding, self.table, self.columns, self.newline_escapes) @@ -605,7 +613,9 @@ class PGLoader: self.field_sep) elif btype == 'ifx_clob': - self.blobs[abs_blobname] = ifx_clob(abs_blobname) + self.blobs[abs_blobname] = \ + ifx_clob(abs_blobname, + self.input_encoding) blob = self.blobs[abs_blobname] diff --git a/pgloader/reader.py b/pgloader/reader.py index 989e835..2c31d06 100644 --- a/pgloader/reader.py +++ b/pgloader/reader.py @@ -20,14 +20,19 @@ class DataReader: - multi-line support is explicit (via """ - def __init__(self, db, reject, filename, table, columns): + def __init__(self, db, reject, filename, input_encoding, table, columns): """ init internal variables """ self.db = db self.filename = filename + self.input_encoding = input_encoding self.table = table self.columns = columns self.reject = reject + if self.input_encoding is None: + if INPUT_ENCODING is not None: + self.input_encoding = INPUT_ENCODING + def readconfig(self, name, config): """ read configuration section for common options diff --git a/pgloader/textreader.py b/pgloader/textreader.py index b557c9a..2c24d49 100644 --- a/pgloader/textreader.py +++ b/pgloader/textreader.py @@ -30,9 +30,11 @@ class TextReader(DataReader): - ... """ - def __init__(self, db, reject, filename, table, columns, newline_escapes): + def __init__(self, db, reject, filename, input_encoding, + table, columns, newline_escapes): """ init textreader with a newline_escapes parameter """ - DataReader.__init__(self, db, reject, filename, table, columns) + DataReader.__init__(self, db, reject, + filename, input_encoding, table, columns) self.newline_escapes = newline_escapes @@ -70,9 +72,9 @@ class TextReader(DataReader): print 'Notice: beginning on first line' begin_linenb = 1 - if INPUT_ENCODING is not None: + if self.input_encoding is not None: try: - fd = codecs.open(self.filename, encoding = INPUT_ENCODING) + fd = codecs.open(self.filename, encoding = self.input_encoding) except LookupError, e: # codec not found raise PGLoader_Error, "Input codec: %s" % e @@ -86,10 +88,10 @@ class TextReader(DataReader): # we count real physical lines nb_plines += 1 - if INPUT_ENCODING is not None: + if self.input_encoding is not None: # this may not be necessary, after all try: - line = line.encode(INPUT_ENCODING) + line = line.encode(self.input_encoding) except UnicodeDecodeError, e: reject.log(['Codec error', str(e)], input_line) continue