Better handling of split_file_reading case, Still In Progress

This commit is contained in:
dim 2008-02-12 12:24:15 +00:00
parent b534962192
commit a2195ec2e7
6 changed files with 55 additions and 44 deletions

View File

@ -29,12 +29,13 @@ filename = parallel/parallel.data
field_sep = ;
columns = a, b
[parallel_greg]
[split]
use_template = parallel_template
section_threads = 4
section_threads = 2
split_file_reading = True
[parallel_simon]
; round robin reader (1 reader feed section_threads processing threads)
[rrr]
use_template = parallel_template
section_threads = 2
split_file_reading = False

View File

@ -393,16 +393,6 @@ def print_summary(dbconn, sections, summary, td):
return retcode
def running_threads(threads):
""" count running threads """
running = 0
for s in threads:
if threads[s].isAlive():
running += 1
return running
def load_data():
""" read option line and configuration file, then process data
import of given section, or all sections if no section is given on
@ -523,13 +513,15 @@ def load_data():
current += 1
if not interrupted:
from pgloader.tools import running_threads
n = running_threads(threads)
log.info("Waiting for %d threads to terminate" % n)
# Try to acquire all semaphore entries
for i in range(max_running):
sem.acquire()
log.debug("Acquired %d times, " % i + \
log.debug("Acquired %d times, " % (i+1) + \
"still waiting for %d threads to terminate" \
% running_threads(threads))

View File

@ -82,11 +82,11 @@ class CSVReader(DataReader):
# now read the lines
for columns in csv.reader(fd, dialect = 'pgloader'):
if self.end is not None and fd.tell() >= self.end:
self.log.info("CSV Reader stoping, offset %d >= %d" % (fd.tell(), self.end()))
fd.close()
break
line = self.field_sep.join(columns)
yield line, columns
if self.end is not None and fd.tell() >= self.end:
self.log.info("CSV Reader stoping, offset %d >= %d" % (fd.tell(), self.end))
fd.close()
return

View File

@ -743,9 +743,9 @@ class PGLoader(threading.Thread):
dummy_str = fd.readline()
# update both current boundary end and next start
boundaries[b] = (start, fd.tell())
boundaries[b] = (start, fd.tell()-1)
if (b+1) < len(boundaries):
boundaries[b+1] = (fd.tell()+1, boundaries[b+1][1])
boundaries[b+1] = (fd.tell(), boundaries[b+1][1])
fd.close()
@ -755,7 +755,6 @@ class PGLoader(threading.Thread):
self.prepare_processing()
# now create self.section_threads PGLoader threads
# the semaphore here is not really usefull, but is part of the API
sem = threading.BoundedSemaphore(self.section_threads)
summary = {}
threads = {}
@ -776,29 +775,37 @@ class PGLoader(threading.Thread):
threads[current_name] = loader
threads[current_name].start()
running += 1
except Exception, e:
raise
# wait for loaders completion
while running > 0:
try:
for cn in threads:
if not threads[cn].isAlive():
running -= 1
# wait for loaders completion, first let them some time to
# be started
time.sleep(2)
from tools import running_threads
n = running_threads(threads)
log.info("Waiting for %d threads to terminate" % n)
if running > 0:
self.log.info('waiting for %d threads, sleeping %gs' % (running, 1))
time.sleep(1)
except KeyboardInterrupt:
self.log.warning("Aborting %d threads in section %s "\
% (running, self.name))
break
# Try to acquire all semaphore entries
for i in range(self.section_threads):
sem.acquire()
log.info("Acquired %d times, " % (i+1) + \
"still waiting for %d threads to terminate" \
% running_threads(threads))
self.finish_processing()
self.duration = time.time() - self.init_time
self.log.info('No more threads are running, %s done' % self.name)
stats = [0, 0]
for s in summary:
for i in range(2, len(summary[s])):
stats[i-2] += summary[s][i]
for x in [self.table, self.duration] + stats:
self.stats.append(x)
else:
# here we need a special thread reading the file
pass
@ -830,7 +837,11 @@ class PGLoader(threading.Thread):
# then show up some stats
self.print_stats()
# update the main summary
self.log.info("loading done")
return
def update_summary(self):
""" update the main summary """
self.duration = time.time() - self.init_time
if self.reject is not None:
@ -839,10 +850,6 @@ class PGLoader(threading.Thread):
for x in [self.table, self.duration, self.db.commited_rows, self.errors]:
self.stats.append(x)
self.log.info("loading done")
return
def process(self):
""" depending on configuration, do given job """
@ -854,6 +861,8 @@ class PGLoader(threading.Thread):
# elif: COPY process also blob data
self.log.info("UPDATE blob data")
self.update_summary()
def data_import(self):
""" import CSV or TEXT data, using COPY """

View File

@ -105,7 +105,7 @@ class TextReader(DataReader):
nb_plines += 1
if self.end is not None and fd.tell() >= self.end:
self.log.info("Text Reader stoping, offset %d >= %d" % (fd.tell(), self.end()))
self.log.info("Text Reader stoping, offset %d >= %d" % (fd.tell(), self.end))
fd.close()
break

View File

@ -167,3 +167,12 @@ def check_dirname(path):
return True, None
def running_threads(threads):
""" count running threads """
running = 0
for s in threads:
if threads[s].isAlive():
running += 1
return running