diff --git a/pgloader.py b/pgloader.py index 494e57d..246b65a 100755 --- a/pgloader.py +++ b/pgloader.py @@ -752,23 +752,29 @@ def load_data(): log.info("All threads are started, wait for them to terminate") check_events(finished, log, "processing is over") + # check whether any thread failed + for section, loader in threads.iteritems(): + if not loader.success: + return 1 + # total duration td = time.time() - begin - retcode = 0 if SUMMARY and not interrupted: try: - retcode = print_summary(None, sections, summary, td) + print_summary(None, sections, summary, td) print except PGLoader_Error, e: log.error("Can't print summary: %s" % e) + return 1 except KeyboardInterrupt: - pass + return 1 - return retcode + return 0 if __name__ == "__main__": + ret = 1 try: ret = load_data() except Exception, e: diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index 5b1becd..7326748 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -826,36 +826,42 @@ class PGLoader(threading.Thread): self.sem.acquire() self.log.debug("%s acquired starting semaphore" % self.logname) - # postinit (will call db.reset() which will get us connected) - self.log.debug("%s postinit" % self.logname) - self._postinit() - # tell parent thread we are running now self.started.set() self.init_time = time.time() + try: + # postinit (will call db.reset() which will get us connected) + self.log.debug("%s postinit" % self.logname) + self._postinit() + + # do the actual processing in do_run and handle all exceptions + self.do_run() + + except Exception, e: + self.log.error(e) + self.terminate(False) + return + + self.terminate() + return + + def do_run(self): + # Announce the beginning of the work self.log.info("%s processing" % self.logname) if self.section_threads == 1: - try: - # when "No space left on device" where logs are sent, - # we want to catch the exception - if 'reader' in self.__dict__ and self.reader.start is not None: - self.log.debug("Loading from offset %d to %d" \ - % (self.reader.start, self.reader.end)) + # when "No space left on device" where logs are sent, + # we want to catch the exception + if 'reader' in self.__dict__ and self.reader.start is not None: + self.log.debug("Loading from offset %d to %d" \ + % (self.reader.start, self.reader.end)) - self.prepare_processing() - self.process() - self.finish_processing() + self.prepare_processing() + self.process() + self.finish_processing() - except Exception, e: - # resources get freed in self.terminate() - self.terminate() - self.log.error(e) - raise - - self.terminate() return # Mutli-Threaded processing of current section @@ -873,10 +879,9 @@ class PGLoader(threading.Thread): # here we need a special thread reading the file self.round_robin_read() - self.terminate() return - def terminate(self): + def terminate(self, success = True): """ Announce it's over and free the concurrency control semaphore """ # force PostgreSQL connection closing, do not wait for garbage @@ -898,6 +903,7 @@ class PGLoader(threading.Thread): except IOError, e: pass + self.success = success self.finished.set() return