mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-04 10:31:02 +02:00
Factorize readers file opening code
This commit is contained in:
parent
4bfaea7d83
commit
41f1a40e49
@ -29,7 +29,7 @@ field_sep = ;
|
||||
columns = a, b
|
||||
|
||||
[split]
|
||||
format = text
|
||||
format = csv
|
||||
use_template = parallel_template
|
||||
section_threads = 2
|
||||
split_file_reading = True
|
||||
|
||||
@ -66,29 +66,20 @@ class CSVReader(DataReader):
|
||||
|
||||
csv.register_dialect('pgloader', pgloader_dialect)
|
||||
|
||||
if self.input_encoding is not None:
|
||||
try:
|
||||
fd = codecs.open(self.filename, encoding = self.input_encoding)
|
||||
except LookupError, e:
|
||||
# codec not found
|
||||
raise PGLoader_Error, "Input codec: %s" % e
|
||||
else:
|
||||
try:
|
||||
fd = open(self.filename, "rb")
|
||||
except IOError, error:
|
||||
raise PGLoader_Error, error
|
||||
|
||||
self._open()
|
||||
|
||||
if self.start is not None and self.start > 0:
|
||||
self.log.info("CSV Reader starting at offset %d" % self.start)
|
||||
fd.seek(self.start)
|
||||
self.fd.seek(self.start)
|
||||
|
||||
# now read the lines
|
||||
for columns in csv.reader(fd, dialect = 'pgloader'):
|
||||
for columns in csv.reader(self.fd, dialect = 'pgloader'):
|
||||
line = self.field_sep.join(columns)
|
||||
yield line, columns
|
||||
|
||||
if self.end is not None and fd.tell() >= self.end:
|
||||
self.log.info("CSV Reader stoping, offset %d >= %d" % (fd.tell(), self.end))
|
||||
fd.close()
|
||||
if self.end is not None and self.fd.tell() >= self.end:
|
||||
self.log.info("CSV Reader stoping, offset %d >= %d" \
|
||||
% (self.fd.tell(), self.end))
|
||||
self.fd.close()
|
||||
return
|
||||
|
||||
|
||||
@ -93,6 +93,30 @@ class DataReader:
|
||||
|
||||
return self.__dict__[option]
|
||||
|
||||
def _open(self, mode = 'rb'):
|
||||
""" open self.filename wrt self.encoding """
|
||||
# we don't yet force buffering, but...
|
||||
self.bufsize = -1
|
||||
|
||||
if self.input_encoding is not None:
|
||||
try:
|
||||
self.fd = codecs.open(self.filename,
|
||||
encoding = self.input_encoding,
|
||||
buffering = self.bufsize)
|
||||
except LookupError, e:
|
||||
# codec not found
|
||||
raise PGLoader_Error, "Input codec: %s" % e
|
||||
except IOError, e:
|
||||
# file not found, for example
|
||||
raise PGLoader_Error, "IOError: %s" % e
|
||||
else:
|
||||
try:
|
||||
self.fd = open(self.filename, mode, self.bufsize)
|
||||
except IOError, error:
|
||||
raise PGLoader_Error, error
|
||||
|
||||
return self.fd
|
||||
|
||||
def readlines(self):
|
||||
""" read data from configured file, and generate (yields) for
|
||||
each data line: line, columns and rowid """
|
||||
|
||||
@ -72,32 +72,20 @@ class TextReader(DataReader):
|
||||
self.log.info('beginning on first line')
|
||||
begin_linenb = 1
|
||||
|
||||
if self.input_encoding is not None:
|
||||
try:
|
||||
fd = codecs.open(self.filename, encoding = self.input_encoding)
|
||||
except LookupError, e:
|
||||
# codec not found
|
||||
raise PGLoader_Error, "Input codec: %s" % e
|
||||
except IOError, e:
|
||||
# file not found, for example
|
||||
raise PGLoader_Error, "IOError: %s" % e
|
||||
else:
|
||||
try:
|
||||
fd = open(self.filename)
|
||||
except IOError, error:
|
||||
raise PGLoader_Error, error
|
||||
self._open()
|
||||
|
||||
if self.start is not None and self.start > 0:
|
||||
self.log.info("Text Reader starting at offset %d" % self.start)
|
||||
fd.seek(self.start)
|
||||
self.fd.seek(self.start)
|
||||
|
||||
for line in fd:
|
||||
for line in self.fd:
|
||||
# we count real physical lines
|
||||
nb_plines += 1
|
||||
|
||||
if self.end is not None and fd.tell() >= self.end:
|
||||
self.log.info("Text Reader stoping, offset %d >= %d" % (fd.tell(), self.end))
|
||||
fd.close()
|
||||
if self.end is not None and self.fd.tell() >= self.end:
|
||||
self.log.info("Text Reader stoping, offset %d >= %d" \
|
||||
% (self.fd.tell(), self.end))
|
||||
self.fd.close()
|
||||
break
|
||||
|
||||
if self.input_encoding is not None:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user