diff --git a/examples/parallel.conf b/examples/parallel.conf index b1a5072..faa6ebc 100644 --- a/examples/parallel.conf +++ b/examples/parallel.conf @@ -29,12 +29,13 @@ filename = parallel/parallel.data field_sep = ; columns = a, b -[parallel_greg] +[split] use_template = parallel_template -section_threads = 4 +section_threads = 2 split_file_reading = True -[parallel_simon] +; round robin reader (1 reader feed section_threads processing threads) +[rrr] use_template = parallel_template section_threads = 2 split_file_reading = False diff --git a/pgloader.py b/pgloader.py index 3892f3d..da311b7 100644 --- a/pgloader.py +++ b/pgloader.py @@ -393,16 +393,6 @@ def print_summary(dbconn, sections, summary, td): return retcode -def running_threads(threads): - """ count running threads """ - running = 0 - for s in threads: - if threads[s].isAlive(): - running += 1 - - return running - - def load_data(): """ read option line and configuration file, then process data import of given section, or all sections if no section is given on @@ -523,13 +513,15 @@ def load_data(): current += 1 if not interrupted: + from pgloader.tools import running_threads + n = running_threads(threads) log.info("Waiting for %d threads to terminate" % n) # Try to acquire all semaphore entries for i in range(max_running): sem.acquire() - log.debug("Acquired %d times, " % i + \ + log.debug("Acquired %d times, " % (i+1) + \ "still waiting for %d threads to terminate" \ % running_threads(threads)) diff --git a/pgloader/csvreader.py b/pgloader/csvreader.py index 07953f3..4366bf1 100644 --- a/pgloader/csvreader.py +++ b/pgloader/csvreader.py @@ -82,11 +82,11 @@ class CSVReader(DataReader): # now read the lines for columns in csv.reader(fd, dialect = 'pgloader'): - - if self.end is not None and fd.tell() >= self.end: - self.log.info("CSV Reader stoping, offset %d >= %d" % (fd.tell(), self.end())) - fd.close() - break - line = self.field_sep.join(columns) yield line, columns + + if self.end is not None and fd.tell() >= self.end: + self.log.info("CSV Reader stoping, offset %d >= %d" % (fd.tell(), self.end)) + fd.close() + return + diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index a5d7c10..0ee5a2b 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -743,9 +743,9 @@ class PGLoader(threading.Thread): dummy_str = fd.readline() # update both current boundary end and next start - boundaries[b] = (start, fd.tell()) + boundaries[b] = (start, fd.tell()-1) if (b+1) < len(boundaries): - boundaries[b+1] = (fd.tell()+1, boundaries[b+1][1]) + boundaries[b+1] = (fd.tell(), boundaries[b+1][1]) fd.close() @@ -755,7 +755,6 @@ class PGLoader(threading.Thread): self.prepare_processing() # now create self.section_threads PGLoader threads - # the semaphore here is not really usefull, but is part of the API sem = threading.BoundedSemaphore(self.section_threads) summary = {} threads = {} @@ -776,29 +775,37 @@ class PGLoader(threading.Thread): threads[current_name] = loader threads[current_name].start() running += 1 - + except Exception, e: raise - # wait for loaders completion - while running > 0: - try: - for cn in threads: - if not threads[cn].isAlive(): - running -= 1 + # wait for loaders completion, first let them some time to + # be started + time.sleep(2) + + from tools import running_threads + n = running_threads(threads) + log.info("Waiting for %d threads to terminate" % n) - if running > 0: - self.log.info('waiting for %d threads, sleeping %gs' % (running, 1)) - time.sleep(1) - - except KeyboardInterrupt: - self.log.warning("Aborting %d threads in section %s "\ - % (running, self.name)) - break + # Try to acquire all semaphore entries + for i in range(self.section_threads): + sem.acquire() + log.info("Acquired %d times, " % (i+1) + \ + "still waiting for %d threads to terminate" \ + % running_threads(threads)) self.finish_processing() + self.duration = time.time() - self.init_time self.log.info('No more threads are running, %s done' % self.name) + stats = [0, 0] + for s in summary: + for i in range(2, len(summary[s])): + stats[i-2] += summary[s][i] + + for x in [self.table, self.duration] + stats: + self.stats.append(x) + else: # here we need a special thread reading the file pass @@ -830,7 +837,11 @@ class PGLoader(threading.Thread): # then show up some stats self.print_stats() - # update the main summary + self.log.info("loading done") + return + + def update_summary(self): + """ update the main summary """ self.duration = time.time() - self.init_time if self.reject is not None: @@ -839,10 +850,6 @@ class PGLoader(threading.Thread): for x in [self.table, self.duration, self.db.commited_rows, self.errors]: self.stats.append(x) - self.log.info("loading done") - return - - def process(self): """ depending on configuration, do given job """ @@ -854,6 +861,8 @@ class PGLoader(threading.Thread): # elif: COPY process also blob data self.log.info("UPDATE blob data") + self.update_summary() + def data_import(self): """ import CSV or TEXT data, using COPY """ diff --git a/pgloader/textreader.py b/pgloader/textreader.py index cf92778..f7a70df 100644 --- a/pgloader/textreader.py +++ b/pgloader/textreader.py @@ -105,7 +105,7 @@ class TextReader(DataReader): nb_plines += 1 if self.end is not None and fd.tell() >= self.end: - self.log.info("Text Reader stoping, offset %d >= %d" % (fd.tell(), self.end())) + self.log.info("Text Reader stoping, offset %d >= %d" % (fd.tell(), self.end)) fd.close() break diff --git a/pgloader/tools.py b/pgloader/tools.py index f13d06c..6d786d0 100644 --- a/pgloader/tools.py +++ b/pgloader/tools.py @@ -167,3 +167,12 @@ def check_dirname(path): return True, None + +def running_threads(threads): + """ count running threads """ + running = 0 + for s in threads: + if threads[s].isAlive(): + running += 1 + + return running