From 41f1a40e49004e5a441006d9d5113ffe6facec77 Mon Sep 17 00:00:00 2001 From: dim Date: Thu, 14 Feb 2008 12:18:49 +0000 Subject: [PATCH] Factorize readers file opening code --- examples/parallel.conf | 2 +- pgloader/csvreader.py | 25 ++++++++----------------- pgloader/reader.py | 24 ++++++++++++++++++++++++ pgloader/textreader.py | 26 +++++++------------------- 4 files changed, 40 insertions(+), 37 deletions(-) diff --git a/examples/parallel.conf b/examples/parallel.conf index 3da983c..2518ca4 100644 --- a/examples/parallel.conf +++ b/examples/parallel.conf @@ -29,7 +29,7 @@ field_sep = ; columns = a, b [split] -format = text +format = csv use_template = parallel_template section_threads = 2 split_file_reading = True diff --git a/pgloader/csvreader.py b/pgloader/csvreader.py index f617270..66fb512 100644 --- a/pgloader/csvreader.py +++ b/pgloader/csvreader.py @@ -66,29 +66,20 @@ class CSVReader(DataReader): csv.register_dialect('pgloader', pgloader_dialect) - if self.input_encoding is not None: - try: - fd = codecs.open(self.filename, encoding = self.input_encoding) - except LookupError, e: - # codec not found - raise PGLoader_Error, "Input codec: %s" % e - else: - try: - fd = open(self.filename, "rb") - except IOError, error: - raise PGLoader_Error, error - + self._open() + if self.start is not None and self.start > 0: self.log.info("CSV Reader starting at offset %d" % self.start) - fd.seek(self.start) + self.fd.seek(self.start) # now read the lines - for columns in csv.reader(fd, dialect = 'pgloader'): + for columns in csv.reader(self.fd, dialect = 'pgloader'): line = self.field_sep.join(columns) yield line, columns - if self.end is not None and fd.tell() >= self.end: - self.log.info("CSV Reader stoping, offset %d >= %d" % (fd.tell(), self.end)) - fd.close() + if self.end is not None and self.fd.tell() >= self.end: + self.log.info("CSV Reader stoping, offset %d >= %d" \ + % (self.fd.tell(), self.end)) + self.fd.close() return diff --git a/pgloader/reader.py b/pgloader/reader.py index 0757ef7..6ccb962 100644 --- a/pgloader/reader.py +++ b/pgloader/reader.py @@ -93,6 +93,30 @@ class DataReader: return self.__dict__[option] + def _open(self, mode = 'rb'): + """ open self.filename wrt self.encoding """ + # we don't yet force buffering, but... + self.bufsize = -1 + + if self.input_encoding is not None: + try: + self.fd = codecs.open(self.filename, + encoding = self.input_encoding, + buffering = self.bufsize) + except LookupError, e: + # codec not found + raise PGLoader_Error, "Input codec: %s" % e + except IOError, e: + # file not found, for example + raise PGLoader_Error, "IOError: %s" % e + else: + try: + self.fd = open(self.filename, mode, self.bufsize) + except IOError, error: + raise PGLoader_Error, error + + return self.fd + def readlines(self): """ read data from configured file, and generate (yields) for each data line: line, columns and rowid """ diff --git a/pgloader/textreader.py b/pgloader/textreader.py index 882fc4b..fc79631 100644 --- a/pgloader/textreader.py +++ b/pgloader/textreader.py @@ -72,32 +72,20 @@ class TextReader(DataReader): self.log.info('beginning on first line') begin_linenb = 1 - if self.input_encoding is not None: - try: - fd = codecs.open(self.filename, encoding = self.input_encoding) - except LookupError, e: - # codec not found - raise PGLoader_Error, "Input codec: %s" % e - except IOError, e: - # file not found, for example - raise PGLoader_Error, "IOError: %s" % e - else: - try: - fd = open(self.filename) - except IOError, error: - raise PGLoader_Error, error + self._open() if self.start is not None and self.start > 0: self.log.info("Text Reader starting at offset %d" % self.start) - fd.seek(self.start) + self.fd.seek(self.start) - for line in fd: + for line in self.fd: # we count real physical lines nb_plines += 1 - if self.end is not None and fd.tell() >= self.end: - self.log.info("Text Reader stoping, offset %d >= %d" % (fd.tell(), self.end)) - fd.close() + if self.end is not None and self.fd.tell() >= self.end: + self.log.info("Text Reader stoping, offset %d >= %d" \ + % (self.fd.tell(), self.end)) + self.fd.close() break if self.input_encoding is not None: