Generalize unbuffered file reading to a file-like class and use it in csvreader and textreader. Almost OK now, or so it seems

dim 2008-02-14 22:25:25 +00:00
parent a905ac7521
commit ca54951de7
4 changed files with 124 additions and 78 deletions

View File

@@ -28,16 +28,29 @@ filename = parallel/parallel.data
field_sep = ;
columns = a, b
[split]
[split_csv]
format = csv
use_template = ptmpl
section_threads = 2
split_file_reading = True
[split_text]
format = text
use_template = ptmpl
section_threads = 2
split_file_reading = True
; round robin reader (1 reader feeds section_threads processing threads)
[rrr]
[rrr_csv]
format = csv
use_template = ptmpl
section_threads = 2
split_file_reading = False
rrqueue_size = -1
[rrr_text]
format = text
use_template = ptmpl
section_threads = 2
split_file_reading = False
rrqueue_size = -1
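
The round robin reader comment above amounts to a single reader thread feeding section_threads worker threads through a queue bounded by rrqueue_size. Below is a minimal, hypothetical sketch of that pattern in plain Python 2 threads and Queue, not pgloader's actual implementation; round_robin and process_line are placeholder names:

import threading, Queue

def round_robin(filename, section_threads, rrqueue_size, process_line):
    # rrqueue_size = -1 in the config is taken here to mean "unbounded"
    queue = Queue.Queue(maxsize = max(rrqueue_size, 0))

    def reader():
        # the single reader thread pushes every line onto the queue
        for line in open(filename):
            queue.put(line)
        # one sentinel per worker so every thread terminates
        for i in range(section_threads):
            queue.put(None)

    def worker():
        while True:
            line = queue.get()
            if line is None:
                break
            process_line(line)

    threads  = [threading.Thread(target = reader)]
    threads += [threading.Thread(target = worker)
                for i in range(section_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()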

View File

@@ -11,7 +11,7 @@ from cStringIO import StringIO
from tools import PGLoader_Error, Reject, parse_config_string
from db import db
from lo import ifx_clob, ifx_blob
from reader import DataReader
from reader import DataReader, UnbufferedFileReader
from options import DRY_RUN, PEDANTIC
from options import TRUNCATE, VACUUM
@@ -47,7 +47,8 @@ class CSVReader(DataReader):
        for opt in ['doublequote', 'escapechar',
                    'quotechar', 'skipinitialspace']:
            self.log.debug("reader.readconfig %s: '%s'" % (opt, self.__dict__[opt]))
            self.log.debug("reader.readconfig %s: '%s'" \
                           % (opt, self.__dict__[opt]))

    def readlines(self):
        """ read data from configured file, and generate (yields) for
@@ -66,25 +67,14 @@ class CSVReader(DataReader):
        csv.register_dialect('pgloader', pgloader_dialect)

        self._open()
        self.fd = UnbufferedFileReader(self.filename, self.log,
                                       encoding = self.input_encoding,
                                       start = self.start,
                                       end = self.end)

        if self.start is not None and self.start > 0:
            self.log.debug("CSV Reader starting at offset %d" % self.start)
            self.fd.seek(self.start)

        self.log.info("csvreader at position %d" % self.fd.tell())

        # now read the lines
        for columns in csv.reader(self.fd, dialect = 'pgloader'):
            self.log.info("csvreader at position %d" % self.fd.tell())
            line = self.field_sep.join(columns)
            yield line, columns

            if self.end is not None and self.fd.tell() >= self.end:
                self.log.debug("CSV Reader stopping, offset %d >= %d" \
                               % (self.fd.tell(), self.end))
                self.fd.close()
                return

        return
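
For reference, csv.reader() only needs an iterable that yields text lines, which is why the UnbufferedFileReader instance can be handed to it directly in the hunk above. A tiny standalone sketch; the dialect options here are simplified, not pgloader's actual set:

import csv

csv.register_dialect('pgloader', delimiter = ';', quotechar = '"')

lines = iter(['a;b;c\n', '1;2;3\n'])   # stand-in for an UnbufferedFileReader
for columns in csv.reader(lines, dialect = 'pgloader'):
    print ';'.join(columns)            # -> a;b;c then 1;2;3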

View File

@@ -87,43 +87,16 @@ class DataReader:
        elif template and config.has_option(template, option):
            self.__dict__[option] = config.get(template, option)
            self.log.debug("reader._getopt %s from %s is '%s'" % (option, template, self.__dict__[option]))
            self.log.debug("reader._getopt %s from %s is '%s'" \
                           % (option, template, self.__dict__[option]))

        elif option not in self.__dict__:
            self.log.debug("reader._getopt %s defaults to '%s'" % (option, default))
            self.log.debug("reader._getopt %s defaults to '%s'" \
                           % (option, default))
            self.__dict__[option] = default

        return self.__dict__[option]

    def _open(self, mode = 'rb'):
        """ open self.filename wrt self.encoding """

        # we don't yet force buffering, but...
        self.bufsize = -1

        if self.input_encoding is not None:
            try:
                import codecs
                self.fd = codecs.open(self.filename,
                                      encoding = self.input_encoding,
                                      buffering = self.bufsize)
                self.log.info("Opened '%s' with encoding '%s'" % (self.filename, self.input_encoding))
            except LookupError, e:
                # codec not found
                raise PGLoader_Error, "Input codec: %s" % e
            except IOError, e:
                # file not found, for example
                raise PGLoader_Error, "IOError: %s" % e
        else:
            try:
                self.fd = open(self.filename, mode, self.bufsize)
            except IOError, error:
                raise PGLoader_Error, error

        self.log.debug("Opened '%s' in %s (fileno %s), ftell %d" \
                       % (self.filename, self.fd,
                          self.fd.fileno(), self.fd.tell()))
        return self.fd

    def readlines(self):
        """ read data from configured file, and generate (yields) for
        each data line: line, columns and rowid """
@@ -136,3 +109,94 @@ class DataReader:
        self.log.debug("reader start=%d, end=%d" % (self.start, self.end))


class UnbufferedFileReader:
    """
    Reads a file line by line while avoiding any read-buffering
    effect, so that readers can reliably compare fd.tell() positions
    after each line is read.
    """
    def __init__(self, filename, log,
                 mode = "rb", encoding = None,
                 start = None, end = None):
        """ constructor """
        self.filename = filename
        self.log      = log
        self.mode     = mode
        self.encoding = encoding
        self.start    = start
        self.end      = end
        self.fd       = None
        self.position = 0

        # we don't yet force buffering, but...
        self.bufsize = -1

        if self.encoding is not None:
            try:
                import codecs
                self.fd = codecs.open(self.filename,
                                      encoding  = self.encoding,
                                      buffering = self.bufsize)
                self.log.info("Opened '%s' with encoding '%s'" \
                              % (self.filename, self.encoding))
            except LookupError, e:
                # codec not found
                raise PGLoader_Error, "Input codec: %s" % e
            except IOError, e:
                # file not found, for example
                raise PGLoader_Error, "IOError: %s" % e
        else:
            try:
                self.fd = open(self.filename, mode, self.bufsize)
            except IOError, error:
                raise PGLoader_Error, error

        if self.start:
            self.fd.seek(self.start)
        self.position = self.fd.tell()

        self.log.debug("Opened '%s' in %s (fileno %s), ftell %d" \
                       % (self.filename, self.fd,
                          self.fd.fileno(), self.position))
        return

    def tell(self):
        return self.position

    def seek(self, position):
        self.fd.seek(position)
        self.position = self.fd.tell()
        return self.position

    def next(self):
        """ implement the iterator protocol """
        # delegate to a single line-reading generator, so that each call
        # returns the next line instead of a fresh generator object
        if getattr(self, '_lines', None) is None:
            self._lines = self.__iter__()
        return self._lines.next()

    def __iter__(self):
        """ read a line at a time """
        line = 'dumb non-empty init value'
        last_line_read = False

        while line != '':
            line = self.fd.readline()
            self.position = self.fd.tell()

            if last_line_read:
                self.log.debug("FileReader stopping, offset %d >= %d" \
                               % (self.position, self.end))
                self.fd.close()
                return

            if self.end is not None and self.fd.tell() >= self.end:
                # we want to process the current line and stop at the
                # next iteration
                last_line_read = True

            yield line

        return
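
A minimal usage sketch of the class above; the path is hypothetical and log stands for whatever logging object the caller already has. It shows the byte-range behaviour the csv and text readers rely on: iteration yields the line that crosses end, then stops and closes the file:

fd = UnbufferedFileReader('/tmp/some.data', log,
                          start = 0, end = 1024)

for line in fd:
    # self.position is refreshed after every readline(), so tell() stays
    # reliable even though the line came through the reader object
    log.debug("read up to offset %d" % fd.tell())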

View File

@@ -11,7 +11,7 @@ from cStringIO import StringIO
from tools import PGLoader_Error, Reject, parse_config_string
from db import db
from lo import ifx_clob, ifx_blob
from reader import DataReader
from reader import DataReader, UnbufferedFileReader
from options import DRY_RUN, PEDANTIC
from options import TRUNCATE, VACUUM
@@ -72,36 +72,15 @@ class TextReader(DataReader):
            self.log.debug('beginning on first line')
            begin_linenb = 1

        self._open()

        if self.start is not None and self.start > 0:
            self.log.debug("Text Reader starting at offset %d" % self.start)
            self.fd.seek(self.start)

        self.log.debug("textreader at position %d" % self.fd.tell())

        self.fd = UnbufferedFileReader(self.filename, self.log,
                                       encoding = self.input_encoding,
                                       start = self.start,
                                       end = self.end)

        #for line in self.fd.readline():
        line = 'dumb non-empty init value'
        last_line_read = False

        while line != '':
            line = self.fd.readline()

        for line in self.fd:
            # we count real physical lines
            nb_plines += 1

            if last_line_read:
                self.log.debug("Text Reader stopping, offset %d >= %d" \
                               % (self.fd.tell(), self.end))
                self.fd.close()
                return

            if self.end is not None and self.fd.tell() >= self.end:
                # we want to process the current line and stop at the
                # next iteration
                last_line_read = True

            if self.input_encoding is not None:
                # this may not be necessary, after all
                try: