From ca54951de7ee61bad045ff299a2e1ba61e6dc92b Mon Sep 17 00:00:00 2001 From: dim Date: Thu, 14 Feb 2008 22:25:25 +0000 Subject: [PATCH] Generalize unbuffered file reading to a file-like class and use it in csvreader and textreader. Almost ok now, or so it seems --- examples/parallel.conf | 17 +++++- pgloader/csvreader.py | 26 +++------ pgloader/reader.py | 126 +++++++++++++++++++++++++++++++---------- pgloader/textreader.py | 33 ++--------- 4 files changed, 124 insertions(+), 78 deletions(-) diff --git a/examples/parallel.conf b/examples/parallel.conf index b4f5fca..8554c22 100644 --- a/examples/parallel.conf +++ b/examples/parallel.conf @@ -28,16 +28,29 @@ filename = parallel/parallel.data field_sep = ; columns = a, b -[split] +[split_csv] +format = csv +use_template = ptmpl +section_threads = 2 +split_file_reading = True + +[split_text] format = text use_template = ptmpl section_threads = 2 split_file_reading = True ; round robin reader (1 reader feed section_threads processing threads) -[rrr] +[rrr_csv] format = csv use_template = ptmpl section_threads = 2 split_file_reading = False rrqueue_size = -1 + +[rrr_text] +format = text +use_template = ptmpl +section_threads = 2 +split_file_reading = False +rrqueue_size = -1 diff --git a/pgloader/csvreader.py b/pgloader/csvreader.py index 898b1e6..89703aa 100644 --- a/pgloader/csvreader.py +++ b/pgloader/csvreader.py @@ -11,7 +11,7 @@ from cStringIO import StringIO from tools import PGLoader_Error, Reject, parse_config_string from db import db from lo import ifx_clob, ifx_blob -from reader import DataReader +from reader import DataReader, UnbufferedFileReader from options import DRY_RUN, PEDANTIC from options import TRUNCATE, VACUUM @@ -47,7 +47,8 @@ class CSVReader(DataReader): for opt in ['doublequote', 'escapechar', 'quotechar', 'skipinitialspace']: - self.log.debug("reader.readconfig %s: '%s'" % (opt, self.__dict__[opt])) + self.log.debug("reader.readconfig %s: '%s'" \ + % (opt, self.__dict__[opt])) def readlines(self): """ read data from configured file, and generate (yields) for @@ -66,25 +67,14 @@ class CSVReader(DataReader): csv.register_dialect('pgloader', pgloader_dialect) - self._open() + self.fd = UnbufferedFileReader(self.filename, self.log, + encoding = self.input_encoding, + start = self.start, + end = self.end) - if self.start is not None and self.start > 0: - self.log.debug("CSV Reader starting at offset %d" % self.start) - self.fd.seek(self.start) - - self.log.info("csvreader at position %d" % self.fd.tell()) - # now read the lines for columns in csv.reader(self.fd, dialect = 'pgloader'): - - self.log.info("csvreader at position %d" % self.fd.tell()) - line = self.field_sep.join(columns) yield line, columns - - if self.end is not None and self.fd.tell() >= self.end: - self.log.debug("CSV Reader stoping, offset %d >= %d" \ - % (self.fd.tell(), self.end)) - self.fd.close() - return + return diff --git a/pgloader/reader.py b/pgloader/reader.py index 3dde6c6..bb8a2be 100644 --- a/pgloader/reader.py +++ b/pgloader/reader.py @@ -87,43 +87,16 @@ class DataReader: elif template and config.has_option(template, option): self.__dict__[option] = config.get(template, option) - self.log.debug("reader._getopt %s from %s is '%s'" % (option, template, self.__dict__[option])) + self.log.debug("reader._getopt %s from %s is '%s'" \ + % (option, template, self.__dict__[option])) elif option not in self.__dict__: - self.log.debug("reader._getopt %s defaults to '%s'" % (option, default)) + self.log.debug("reader._getopt %s defaults to '%s'" \ + % (option, default)) self.__dict__[option] = default return self.__dict__[option] - def _open(self, mode = 'rb'): - """ open self.filename wrt self.encoding """ - # we don't yet force buffering, but... - self.bufsize = -1 - - if self.input_encoding is not None: - try: - import codecs - self.fd = codecs.open(self.filename, - encoding = self.input_encoding, - buffering = self.bufsize) - self.log.info("Opened '%s' with encoding '%s'" % (self.filename, self.input_encoding)) - except LookupError, e: - # codec not found - raise PGLoader_Error, "Input codec: %s" % e - except IOError, e: - # file not found, for example - raise PGLoader_Error, "IOError: %s" % e - else: - try: - self.fd = open(self.filename, mode, self.bufsize) - except IOError, error: - raise PGLoader_Error, error - - self.log.debug("Opened '%s' in %s (fileno %s), ftell %d" \ - % (self.filename, self.fd, - self.fd.fileno(), self.fd.tell())) - return self.fd - def readlines(self): """ read data from configured file, and generate (yields) for each data line: line, columns and rowid """ @@ -136,3 +109,94 @@ class DataReader: self.log.debug("reader start=%d, end=%d" % (self.start, self.end)) + +class UnbufferedFileReader: + """ + Allow to read a file line by line, avoiding any read-buffering + effect. This allows for readers to reliably compare fd.tell() + position after each line reading. + """ + + def __init__(self, filename, log, + mode = "rb", encoding = None, + start = None, end = None): + """ constructor """ + self.filename = filename + self.log = log + self.mode = mode + self.encoding = encoding + self.start = start + self.end = end + self.fd = None + self.position = 0 + + # we don't yet force buffering, but... + self.bufsize = -1 + + if self.encoding is not None: + try: + import codecs + self.fd = codecs.open(self.filename, + encoding = self.encoding, + buffering = self.bufsize) + self.log.info("Opened '%s' with encoding '%s'" \ + % (self.filename, self.encoding)) + except LookupError, e: + # codec not found + raise PGLoader_Error, "Input codec: %s" % e + except IOError, e: + # file not found, for example + raise PGLoader_Error, "IOError: %s" % e + + else: + try: + self.fd = open(self.filename, mode, self.bufsize) + except IOError, error: + raise PGLoader_Error, error + + if self.start: + self.fd.seek(self.start) + self.position = self.fd.tell() + + self.log.debug("Opened '%s' in %s (fileno %s), ftell %d" \ + % (self.filename, self.fd, + self.fd.fileno(), self.position)) + return + + def tell(self): + return self.position + + def seek(self, position): + self.fd.seek(position) + self.position = self.fd.tell() + + return self.position + + def next(self): + """ implement the iterator protocol """ + yield self.__iter__() + + def __iter__(self): + """ read a line at a time """ + line = 'dumb non-empty init value' + last_line_read = False + + while line != '': + line = self.fd.readline() + self.position = self.fd.tell() + + if last_line_read: + self.log.debug("FileReader stoping, offset %d >= %d" \ + % (self.position, self.end)) + self.fd.close() + return + + if self.end is not None and self.fd.tell() >= self.end: + # we want to process current line and stop at next + # iteration + last_line_read = True + + yield line + + return + diff --git a/pgloader/textreader.py b/pgloader/textreader.py index 85842d4..462cf49 100644 --- a/pgloader/textreader.py +++ b/pgloader/textreader.py @@ -11,7 +11,7 @@ from cStringIO import StringIO from tools import PGLoader_Error, Reject, parse_config_string from db import db from lo import ifx_clob, ifx_blob -from reader import DataReader +from reader import DataReader, UnbufferedFileReader from options import DRY_RUN, PEDANTIC from options import TRUNCATE, VACUUM @@ -72,36 +72,15 @@ class TextReader(DataReader): self.log.debug('beginning on first line') begin_linenb = 1 - self._open() - - if self.start is not None and self.start > 0: - self.log.debug("Text Reader starting at offset %d" % self.start) - self.fd.seek(self.start) - - self.log.debug("textreader at position %d" % self.fd.tell()) + self.fd = UnbufferedFileReader(self.filename, self.log, + encoding = self.input_encoding, + start = self.start, + end = self.end) - #for line in self.fd.readline(): - - line = 'dumb non-empty init value' - last_line_read = False - - while line != '': - line = self.fd.readline() - + for line in self.fd: # we count real physical lines nb_plines += 1 - if last_line_read: - self.log.debug("Text Reader stoping, offset %d >= %d" \ - % (self.fd.tell(), self.end)) - self.fd.close() - return - - if self.end is not None and self.fd.tell() >= self.end: - # we want to process current line and stop at next - # iteration - last_line_read = True - if self.input_encoding is not None: # this may not be necessary, after all try: