Preliminary support for using real line numbers in logs; works for simple cases — split_file_reading not supported yet, multi-line input still broken

This commit is contained in:
dim 2008-08-07 22:49:22 +00:00
parent 0e42971ea0
commit 094313d7ff
6 changed files with 91 additions and 26 deletions

View File

@ -75,6 +75,7 @@ class CSVReader(DataReader):
# don't forget COUNT and FROM_COUNT option in CSV mode
nb_lines = 0
begin_linenb = None
last_line_nb = 1
##
# if -F was not used, we can state that begin = 0
@ -106,7 +107,13 @@ class CSVReader(DataReader):
self.log.info('reached line %d, stopping', nb_lines)
return
line = self.field_sep.join(columns)
yield line, columns
line = self.field_sep.join(columns)
offsets = range(last_line_nb, self.fd.line_nb)
last_line_nb = self.fd.line_nb
if self.start:
offsets = (self.start, offsets)
yield offsets, line, columns
return

View File

@ -411,7 +411,8 @@ ORDER BY attnum
self.log.warning("COPY data buffer saved in %s" % n)
return n
def copy_from(self, table, columnlist, columns, input_line,
def copy_from(self, table, columnlist,
columns, input_line, offsets,
reject, EOF = False):
""" Generate some COPY SQL for PostgreSQL """
ok = True
@ -473,8 +474,11 @@ ORDER BY attnum
# copy recovery process
now = time.time()
c, ok, ko = self.copy_from_buff(table, self.buffer,
self.running_commands, reject)
c, ok, ko = self.copy_from_buff(table,
self.buffer,
self.first_offsets,
self.running_commands,
reject)
duration = now - self.last_commit_time
self.commits += c
@ -495,16 +499,18 @@ ORDER BY attnum
# prepare next run
if self.buffer is None:
self.buffer = StringIO()
self.first_offsets = offsets
self.prepare_copy_data(columns, input_line, reject)
self.running_commands += 1
return ok
def copy_from_buff(self, table, buff, count, reject):
def copy_from_buff(self, table, buff, first_offsets, count, reject):
""" If copy returned an error, try to detect wrong input line(s) """
if count == 1:
reject.log('COPY error on this line', buff.getvalue())
msg = self.copy_error_message(first_offsets, 0)
reject.log(msg, buff.getvalue())
buff.close()
self.log.debug('found one more line in error')
@ -558,12 +564,26 @@ ORDER BY attnum
# if a is only one line long, reject this line
if xcount == 1:
ko += 1
reject.log('COPY error: %s' % error, x.getvalue())
linecount = 0
if x == b:
linecount += m
msg = self.copy_error_message(first_offsets, linecount)
msg += '\nCOPY error: %s' % error
reject.log(msg, x.getvalue())
self.log.debug('Notice: found one more line in error')
self.log.debug(x.getvalue())
else:
_c, _o, _k = self.copy_from_buff(table, x, xcount, reject)
new_offsets = first_offsets
if x == b:
new_offsets = [m + o for o in first_offsets]
_c, _o, _k = self.copy_from_buff(table,
x,
new_offsets,
xcount, reject)
commits += _c
ok += _o
ko += _k
@ -623,3 +643,19 @@ ORDER BY attnum
# end of row, \n
self.buffer.write('\n')
def copy_error_message(self, offsets, error_buff_offset):
    """ Build the COPY pgloader error message with line numbers.

    offsets            -- either a list of source-file line numbers, or a
                          (start position (byte, ftell()), [line, numbers])
                          tuple when in split file reading mode
    error_buff_offset  -- number of lines already consumed from the error
                          buffer; added to each line number so the message
                          refers to positions in the original input file

    Returns the message string, e.g. 'COPY error on lines 3 4'.
    """
    msg = 'COPY error on line'
    if type(offsets) == type([]):
        if len(offsets) > 1:
            # pluralize only when several lines are involved
            msg += 's'
        # bug fix: was "msg += 's %s'", which doubled the plural 's'
        # (producing "liness 1 2") and wrongly pluralized single lines
        msg += ' %s' % ' '.join([str(x + error_buff_offset)
                                 for x in offsets])
    else:
        # offsets is (start position (byte, ftell()), [line, numbers]):
        # split file reading mode is not handled yet, so no line numbers
        # are reported in this case — TODO confirm intended output
        msg += ' '
    return msg

View File

@ -65,10 +65,7 @@ class FixedReader(DataReader):
start = self.start,
end = self.end)
line_nb = 0
for line in self.fd:
line_nb += 1
line = line.strip("\n")
llen = len(line)
columns = []
@ -77,7 +74,7 @@ class FixedReader(DataReader):
start, length = self.positions[cname]
if llen < (start+length):
self.log.error("Line %d is too short " % line_nb +
self.log.error("Line %d is too short " % self.fd.line_nb +
"(column %s requires len >= %d)" \
% (cname, start+length))
@ -86,4 +83,7 @@ class FixedReader(DataReader):
columns.append(line[start:start+length])
yield line, columns
if self.start:
offsets = (self.start, offsets)
yield offsets, line, columns

View File

@ -839,10 +839,11 @@ class PGLoader(threading.Thread):
self.prepare_processing()
self.process()
self.finish_processing()
except Exception, e:
# resources get freed in self.terminate()
self.log.error(e)
raise
self.terminate()
return
@ -1129,8 +1130,8 @@ class PGLoader(threading.Thread):
""" return next line from either self.queue or self.reader """
if self.queue is None:
for line, columns in self.reader.readlines():
yield line, columns
for offsets, line, columns in self.reader.readlines():
yield offsets, line, columns
return
@ -1139,8 +1140,8 @@ class PGLoader(threading.Thread):
if len(self.queue) > 0:
self.log.debug("processing queue")
for line, columns in self.queue.readlines():
yield line, columns
for offsets, line, columns in self.queue.readlines():
yield offsets, line, columns
self.lock.release()
@ -1211,7 +1212,9 @@ class PGLoader(threading.Thread):
if self.udcs:
dudcs = dict(self.udcs)
for line, columns in self.readlines():
for offsets, line, columns in self.readlines():
self.log.debug('offsets %s', offsets)
if self.blob_cols is not None:
columns, rowids = self.read_blob(line, columns)
@ -1279,12 +1282,12 @@ class PGLoader(threading.Thread):
if not DRY_RUN:
self.db.copy_from(self.table, self.columnlist,
data, line, self.reject)
data, line, offsets, self.reject)
if not DRY_RUN:
# we may need a last COPY for the rest of data
self.db.copy_from(self.table, self.columnlist,
None, None, self.reject, EOF = True)
None, None, None, self.reject, EOF = True)
return

View File

@ -112,7 +112,13 @@ class DataReader:
def readlines(self):
""" read data from configured file, and generate (yields) for
each data line: line, columns and rowid """
each data line: offsets, line, columns and rowid
offsets: list of line numbers where the line was read in the file
tuple of (reader offset, [list, of, line, numbers])
the second case is used when in split file reading mode
"""
pass
def set_boundaries(self, (start, end)):
@ -142,6 +148,7 @@ class UnbufferedFileReader:
self.end = end
self.fd = None
self.position = 0
self.line_nb = 0
# we don't yet force buffering, but...
self.bufsize = -1
@ -196,6 +203,7 @@ class UnbufferedFileReader:
while line != '':
line = self.fd.readline()
self.line_nb += 1
self.position = self.fd.tell()
if line == '' or last_line_read:

View File

@ -69,6 +69,9 @@ class TextReader(DataReader):
begin_linenb = None
nb_plines = 0
tmp_offsets = []
offsets = []
##
# if neither -I nor -F was used, we can state that begin = 0
if FROM_ID is None and FROM_COUNT == 0:
@ -84,6 +87,10 @@ class TextReader(DataReader):
# we count real physical lines
nb_plines += 1
# and we store them in offsets for error messages
tmp_offsets.append(nb_plines)
self.log.debug('current offset %s' % tmp_offsets)
if self.input_encoding is not None:
# this may not be necessary, after all
try:
@ -128,7 +135,12 @@ class TextReader(DataReader):
continue
# we count logical lines
nb_lines += 1
nb_lines += 1
offsets = tmp_offsets[:]
tmp_offsets = []
if self.start:
offsets = (self.start, offsets)
##
# if -F is used, count lines to skip, and skip them
@ -183,8 +195,7 @@ class TextReader(DataReader):
self.reject.log(msg, line)
continue
yield line, columns
yield offsets, line, columns
def _split_line(self, line):
""" split given line and returns a columns list """