diff --git a/pgloader/csvreader.py b/pgloader/csvreader.py index 872c968..ecd71c6 100644 --- a/pgloader/csvreader.py +++ b/pgloader/csvreader.py @@ -75,6 +75,7 @@ class CSVReader(DataReader): # don't forget COUNT and FROM_COUNT option in CSV mode nb_lines = 0 begin_linenb = None + last_line_nb = 1 ## # if -F was not used, we can state that begin = 0 @@ -106,7 +107,13 @@ class CSVReader(DataReader): self.log.info('reached line %d, stopping', nb_lines) return - line = self.field_sep.join(columns) - yield line, columns + line = self.field_sep.join(columns) + offsets = range(last_line_nb, self.fd.line_nb) + last_line_nb = self.fd.line_nb + + if self.start: + offsets = (self.start, offsets) + + yield offsets, line, columns return diff --git a/pgloader/db.py b/pgloader/db.py index 7245212..194357e 100644 --- a/pgloader/db.py +++ b/pgloader/db.py @@ -411,7 +411,8 @@ ORDER BY attnum self.log.warning("COPY data buffer saved in %s" % n) return n - def copy_from(self, table, columnlist, columns, input_line, + def copy_from(self, table, columnlist, + columns, input_line, offsets, reject, EOF = False): """ Generate some COPY SQL for PostgreSQL """ ok = True @@ -473,8 +474,11 @@ ORDER BY attnum # copy recovery process now = time.time() - c, ok, ko = self.copy_from_buff(table, self.buffer, - self.running_commands, reject) + c, ok, ko = self.copy_from_buff(table, + self.buffer, + self.first_offsets, + self.running_commands, + reject) duration = now - self.last_commit_time self.commits += c @@ -495,16 +499,18 @@ ORDER BY attnum # prepare next run if self.buffer is None: self.buffer = StringIO() + self.first_offsets = offsets self.prepare_copy_data(columns, input_line, reject) self.running_commands += 1 return ok - def copy_from_buff(self, table, buff, count, reject): + def copy_from_buff(self, table, buff, first_offsets, count, reject): """ If copy returned an error, try to detect wrong input line(s) """ if count == 1: - reject.log('COPY error on this line', buff.getvalue()) + msg 
= self.copy_error_message(first_offsets, 0)
+            reject.log(msg, buff.getvalue())
             buff.close()
 
             self.log.debug('found one more line in error')
@@ -558,12 +564,26 @@ ORDER BY attnum
                 # if a is only one line long, reject this line
                 if xcount == 1:
                     ko += 1
-                    reject.log('COPY error: %s' % error, x.getvalue())
+
+                    linecount = 0
+                    if x == b:
+                        linecount += m
+
+                    msg = self.copy_error_message(first_offsets, linecount)
+                    msg += '\nCOPY error: %s' % error
+                    reject.log(msg, x.getvalue())
                     self.log.debug('Notice: found one more line in error')
                     self.log.debug(x.getvalue())
 
                 else:
-                    _c, _o, _k = self.copy_from_buff(table, x, xcount, reject)
+                    new_offsets = first_offsets
+                    if x == b:
+                        new_offsets = [m + o for o in first_offsets]
+
+                    _c, _o, _k = self.copy_from_buff(table,
+                                                     x,
+                                                     new_offsets,
+                                                     xcount, reject)
                     commits += _c
                     ok += _o
                     ko += _k
@@ -623,3 +643,19 @@ ORDER BY attnum
 
         # end of row, \n
         self.buffer.write('\n')
+
+
+    def copy_error_message(self, offsets, error_buff_offset):
+        """ Build the COPY pgloader error message with line numbers """
+        msg = 'COPY error on line'
+        if type(offsets) == type([]):
+            if len(offsets) > 1:
+                msg += 's'
+            msg += ' %s' % ' '.join([str(x + error_buff_offset)
+                                     for x in offsets])
+
+        else:
+            # offsets is (start position (byte, ftell()), [line, numbers])
+            msg += ' '
+
+        return msg
diff --git a/pgloader/fixedreader.py b/pgloader/fixedreader.py
index 06d9fa8..f351b07 100644
--- a/pgloader/fixedreader.py
+++ b/pgloader/fixedreader.py
@@ -65,10 +65,7 @@ class FixedReader(DataReader):
                                        start = self.start,
                                        end   = self.end)
 
-        line_nb = 0
-
         for line in self.fd:
-            line_nb += 1
             line = line.strip("\n")
             llen = len(line)
             columns = []
@@ -77,7 +74,7 @@ class FixedReader(DataReader):
                 start, length = self.positions[cname]
 
                 if llen < (start+length):
-                    self.log.error("Line %d is too short " % line_nb + 
+                    self.log.error("Line %d is too short " % self.fd.line_nb + 
                                    "(column %s requires len >= %d)" \
                                    % (cname, start+length))
 
@@ -86,4 +83,7 @@ 
columns.append(line[start:start+length])
 
-            yield line, columns
+            offsets = [self.fd.line_nb]
+            if self.start:
+                offsets = (self.start, offsets)
+            yield offsets, line, columns
diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py
index 461fa5c..1feee38 100644
--- a/pgloader/pgloader.py
+++ b/pgloader/pgloader.py
@@ -839,10 +839,11 @@ class PGLoader(threading.Thread):
             self.prepare_processing()
             self.process()
             self.finish_processing()
-            
+
         except Exception, e:
             # resources get freed in self.terminate()
             self.log.error(e)
+            raise
 
         self.terminate()
         return
@@ -1129,8 +1130,8 @@ class PGLoader(threading.Thread):
         """ return next line from either self.queue or self.reader """
         if self.queue is None:
-            for line, columns in self.reader.readlines():
-                yield line, columns
+            for offsets, line, columns in self.reader.readlines():
+                yield offsets, line, columns
 
             return
 
@@ -1139,8 +1140,8 @@ class PGLoader(threading.Thread):
             if len(self.queue) > 0:
                 self.log.debug("processing queue")
 
-                for line, columns in self.queue.readlines():
-                    yield line, columns
+                for offsets, line, columns in self.queue.readlines():
+                    yield offsets, line, columns
 
         self.lock.release()
 
@@ -1211,7 +1212,9 @@ class PGLoader(threading.Thread):
         if self.udcs:
             dudcs = dict(self.udcs)
 
-        for line, columns in self.readlines():
+        for offsets, line, columns in self.readlines():
+            self.log.debug('offsets %s', offsets)
+
             if self.blob_cols is not None:
                 columns, rowids = self.read_blob(line, columns)
 
@@ -1279,12 +1282,12 @@ class PGLoader(threading.Thread):
 
             if not DRY_RUN:
                 self.db.copy_from(self.table, self.columnlist,
-                                  data, line, self.reject)
+                                  data, line, offsets, self.reject)
 
         if not DRY_RUN:
             # we may need a last COPY for the rest of data
             self.db.copy_from(self.table, self.columnlist,
-                              None, None, self.reject, EOF = True)
+                              None, None, None, self.reject, EOF = True)
 
         return
diff --git a/pgloader/reader.py b/pgloader/reader.py
index 527bea5..4eff4bb 100644
--- a/pgloader/reader.py
+++ b/pgloader/reader.py
@@ -112,7 +112,13 @@ class DataReader:
     def 
readlines(self): """ read data from configured file, and generate (yields) for - each data line: line, columns and rowid """ + each data line: offsets, line, columns and rowid + + offsets: list of line numbers where the line was read in the file + tuple of (reader offset, [list, of, line, numbers]) + + the second case is used when in split file reading mode + """ pass def set_boundaries(self, (start, end)): @@ -142,6 +148,7 @@ class UnbufferedFileReader: self.end = end self.fd = None self.position = 0 + self.line_nb = 0 # we don't yet force buffering, but... self.bufsize = -1 @@ -196,6 +203,7 @@ class UnbufferedFileReader: while line != '': line = self.fd.readline() + self.line_nb += 1 self.position = self.fd.tell() if line == '' or last_line_read: diff --git a/pgloader/textreader.py b/pgloader/textreader.py index bdb8150..35f4718 100644 --- a/pgloader/textreader.py +++ b/pgloader/textreader.py @@ -69,6 +69,9 @@ class TextReader(DataReader): begin_linenb = None nb_plines = 0 + tmp_offsets = [] + offsets = [] + ## # if neither -I nor -F was used, we can state that begin = 0 if FROM_ID is None and FROM_COUNT == 0: @@ -84,6 +87,10 @@ class TextReader(DataReader): # we count real physical lines nb_plines += 1 + # and we store them in offsets for error messages + tmp_offsets.append(nb_plines) + self.log.debug('current offset %s' % tmp_offsets) + if self.input_encoding is not None: # this may not be necessary, after all try: @@ -128,7 +135,12 @@ class TextReader(DataReader): continue # we count logical lines - nb_lines += 1 + nb_lines += 1 + offsets = tmp_offsets[:] + tmp_offsets = [] + + if self.start: + offsets = (self.start, offsets) ## # if -F is used, count lines to skip, and skip them @@ -183,8 +195,7 @@ class TextReader(DataReader): self.reject.log(msg, line) continue - yield line, columns - + yield offsets, line, columns def _split_line(self, line): """ split given line and returns a columns list """