mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-04 10:31:02 +02:00
Split file reading works with text format (problem was file iterator buffering)
This commit is contained in:
parent
a068a5ea6a
commit
610d3d6c2e
@ -29,7 +29,7 @@ field_sep = ;
|
||||
columns = a, b
|
||||
|
||||
[split]
|
||||
format = csv
|
||||
format = text
|
||||
use_template = ptmpl
|
||||
section_threads = 2
|
||||
split_file_reading = True
|
||||
|
||||
@ -85,7 +85,7 @@ table = udc
|
||||
format = text
|
||||
filename = udc/udc.data
|
||||
input_encoding = 'latin1'
|
||||
field_sep = §
|
||||
field_sep = %
|
||||
columns = b:2, d:1, x:3, y:4
|
||||
udc_c = constant value
|
||||
copy_columns = b, c, d
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
1§5§foo§bar
|
||||
2§10§bar§toto
|
||||
3§4§toto§titi
|
||||
4§18§titi§baz
|
||||
5§2§baz§foo
|
||||
1%5%foo%bar
|
||||
2%10%bar%toto
|
||||
3%4%toto%titi
|
||||
4%18%titi%baz
|
||||
5%2%baz%foo
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
# handles configuration, parse data, then pass them to database module for
|
||||
# COPY preparation
|
||||
|
||||
import os, sys, os.path, time, codecs, csv
|
||||
import os, sys, os.path, time, csv
|
||||
from cStringIO import StringIO
|
||||
|
||||
from tools import PGLoader_Error, Reject, parse_config_string
|
||||
@ -69,17 +69,22 @@ class CSVReader(DataReader):
|
||||
self._open()
|
||||
|
||||
if self.start is not None and self.start > 0:
|
||||
self.log.info("CSV Reader starting at offset %d" % self.start)
|
||||
self.log.debug("CSV Reader starting at offset %d" % self.start)
|
||||
self.fd.seek(self.start)
|
||||
|
||||
self.log.info("csvreader at position %d" % self.fd.tell())
|
||||
|
||||
# now read the lines
|
||||
for columns in csv.reader(self.fd, dialect = 'pgloader'):
|
||||
|
||||
self.log.info("csvreader at position %d" % self.fd.tell())
|
||||
|
||||
line = self.field_sep.join(columns)
|
||||
yield line, columns
|
||||
|
||||
if self.end is not None and self.fd.tell() >= self.end:
|
||||
self.log.info("CSV Reader stoping, offset %d >= %d" \
|
||||
% (self.fd.tell(), self.end))
|
||||
self.log.debug("CSV Reader stoping, offset %d >= %d" \
|
||||
% (self.fd.tell(), self.end))
|
||||
self.fd.close()
|
||||
return
|
||||
|
||||
|
||||
@ -137,9 +137,11 @@ class PGLoader(threading.Thread):
|
||||
self.db.log = self.log
|
||||
self.db.reset()
|
||||
|
||||
if not self.template and not DRY_RUN:
|
||||
# check we have properly configured the copy separator
|
||||
if self.db.copy_sep is None:
|
||||
self.log.debug("%s" % self.db)
|
||||
self.log.error("COPY sep is %s" % self.db.copy_sep)
|
||||
msg = "BUG: pgloader couldn't configure its COPY separator"
|
||||
raise PGLoader_Error, msg
|
||||
|
||||
@ -567,7 +569,7 @@ class PGLoader(threading.Thread):
|
||||
self.table, self.columns,
|
||||
self.newline_escapes)
|
||||
|
||||
self.log.info('reader.readconfig()')
|
||||
self.log.debug('reader.readconfig()')
|
||||
self.reader.readconfig(config, name, self.tsection)
|
||||
|
||||
|
||||
@ -776,8 +778,8 @@ class PGLoader(threading.Thread):
|
||||
|
||||
if self.section_threads == 1:
|
||||
if 'reader' in self.__dict__ and self.reader.start is not None:
|
||||
self.log.info("Loading from offset %d to %d" \
|
||||
% (self.reader.start, self.reader.end))
|
||||
self.log.debug("Loading from offset %d to %d" \
|
||||
% (self.reader.start, self.reader.end))
|
||||
|
||||
try:
|
||||
# catch worker exception
|
||||
@ -817,8 +819,12 @@ class PGLoader(threading.Thread):
|
||||
more thread is running
|
||||
"""
|
||||
from tools import running_threads
|
||||
|
||||
n = running_threads(workers)
|
||||
self.log.info("Waiting for %d/%d threads to terminate" \
|
||||
% (n, len(workers)))
|
||||
|
||||
for i in range(len(workers)):
|
||||
for i in range(n):
|
||||
sem.acquire()
|
||||
self.log.debug("Acquired %d times, " % (i+1) + \
|
||||
"still waiting for %d threads to terminate" \
|
||||
@ -893,9 +899,6 @@ class PGLoader(threading.Thread):
|
||||
# be started
|
||||
time.sleep(2)
|
||||
|
||||
n = running_threads(threads)
|
||||
log.info("Waiting for %d threads to terminate" % n)
|
||||
|
||||
self.wait_for_workers(sem, threads)
|
||||
self.finish_processing()
|
||||
self.duration = time.time() - self.init_time
|
||||
@ -1020,10 +1023,6 @@ class PGLoader(threading.Thread):
|
||||
self.log.debug("locks[%d].release (done set)" % c)
|
||||
locks[c].release()
|
||||
|
||||
from tools import running_threads
|
||||
n = running_threads(threads)
|
||||
self.log.info("Waiting for %d threads to terminate" % n)
|
||||
|
||||
self.wait_for_workers(sem, threads)
|
||||
self.finish_processing()
|
||||
self.duration = time.time() - self.init_time
|
||||
|
||||
@ -102,9 +102,11 @@ class DataReader:
|
||||
|
||||
if self.input_encoding is not None:
|
||||
try:
|
||||
import codecs
|
||||
self.fd = codecs.open(self.filename,
|
||||
encoding = self.input_encoding,
|
||||
buffering = self.bufsize)
|
||||
self.log.info("Opened '%s' with encoding '%s'" % (self.filename, self.input_encoding))
|
||||
except LookupError, e:
|
||||
# codec not found
|
||||
raise PGLoader_Error, "Input codec: %s" % e
|
||||
@ -117,7 +119,9 @@ class DataReader:
|
||||
except IOError, error:
|
||||
raise PGLoader_Error, error
|
||||
|
||||
self.log.info("Opened '%s' in %s" % (self.filename, self.fd))
|
||||
self.log.debug("Opened '%s' in %s (fileno %s), ftell %d" \
|
||||
% (self.filename, self.fd,
|
||||
self.fd.fileno(), self.fd.tell()))
|
||||
return self.fd
|
||||
|
||||
def readlines(self):
|
||||
@ -130,5 +134,5 @@ class DataReader:
|
||||
self.start = start
|
||||
self.end = end
|
||||
|
||||
self.log.info("reader start=%d, end=%d" % (self.start, self.end))
|
||||
self.log.debug("reader start=%d, end=%d" % (self.start, self.end))
|
||||
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
# handles configuration, parse data, then pass them to database module for
|
||||
# COPY preparation
|
||||
|
||||
import os, sys, os.path, time, codecs
|
||||
import os, sys, os.path, time
|
||||
from cStringIO import StringIO
|
||||
|
||||
from tools import PGLoader_Error, Reject, parse_config_string
|
||||
@ -69,24 +69,38 @@ class TextReader(DataReader):
|
||||
##
|
||||
# if neither -I nor -F was used, we can state that begin = 0
|
||||
if FROM_ID is None and FROM_COUNT == 0:
|
||||
self.log.info('beginning on first line')
|
||||
self.log.debug('beginning on first line')
|
||||
begin_linenb = 1
|
||||
|
||||
self._open()
|
||||
|
||||
if self.start is not None and self.start > 0:
|
||||
self.log.info("Text Reader starting at offset %d" % self.start)
|
||||
self.log.debug("Text Reader starting at offset %d" % self.start)
|
||||
self.fd.seek(self.start)
|
||||
|
||||
for line in self.fd:
|
||||
self.log.debug("textreader at position %d" % self.fd.tell())
|
||||
|
||||
#for line in self.fd.readline():
|
||||
|
||||
line = 'dumb non-empty init value'
|
||||
last_line_read = False
|
||||
|
||||
while line != '':
|
||||
line = self.fd.readline()
|
||||
|
||||
# we count real physical lines
|
||||
nb_plines += 1
|
||||
|
||||
if self.end is not None and self.fd.tell() >= self.end:
|
||||
self.log.info("Text Reader stoping, offset %d >= %d" \
|
||||
% (self.fd.tell(), self.end))
|
||||
if last_line_read:
|
||||
self.log.debug("Text Reader stoping, offset %d >= %d" \
|
||||
% (self.fd.tell(), self.end))
|
||||
self.fd.close()
|
||||
break
|
||||
return
|
||||
|
||||
if self.end is not None and self.fd.tell() >= self.end:
|
||||
# we want to process current line and stop at next
|
||||
# iteration
|
||||
last_line_read = True
|
||||
|
||||
if self.input_encoding is not None:
|
||||
# this may not be necessary, after all
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user