From 9393180c0c2f3555043d7bee049e2370167b2227 Mon Sep 17 00:00:00 2001 From: Davlet Panech Date: Mon, 20 Sep 2010 15:07:31 -0400 Subject: [PATCH] Better error handling in loader threads. The main script currently hangs if any loader fails to connect to the database. This happens because connection exceptions from PGLoader._postinit are not handled, and the main script keeps waiting for "finished" events that never happen. This patch ensures that exceptions in the loader threads are noted by the main script, and that an appropriate exit status is returned to the OS. Changes in pgloader/pgloader.py: - Handle all exceptions in PGLoader.run - Add a boolean "success" flag to PGLoader class that indicates whether any errors occurred within that object's main thread - Whenever an exception happens within a thread, set the "success" flag to false and call "terminate" Changes in pgloader.py: - check "success" flag on all threads after they finish - return 0 on success and 1 on any error - the script no longer returns the result of "print_summary", just 0 or 1. 
--- pgloader.py | 14 +++++++++---- pgloader/pgloader.py | 50 +++++++++++++++++++++++++------------------- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/pgloader.py b/pgloader.py index 494e57d..246b65a 100755 --- a/pgloader.py +++ b/pgloader.py @@ -752,23 +752,29 @@ def load_data(): log.info("All threads are started, wait for them to terminate") check_events(finished, log, "processing is over") + # check whether any thread failed + for section, loader in threads.iteritems(): + if not loader.success: + return 1 + # total duration td = time.time() - begin - retcode = 0 if SUMMARY and not interrupted: try: - retcode = print_summary(None, sections, summary, td) + print_summary(None, sections, summary, td) print except PGLoader_Error, e: log.error("Can't print summary: %s" % e) + return 1 except KeyboardInterrupt: - pass + return 1 - return retcode + return 0 if __name__ == "__main__": + ret = 1 try: ret = load_data() except Exception, e: diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index 5b1becd..7326748 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -826,36 +826,42 @@ class PGLoader(threading.Thread): self.sem.acquire() self.log.debug("%s acquired starting semaphore" % self.logname) - # postinit (will call db.reset() which will get us connected) - self.log.debug("%s postinit" % self.logname) - self._postinit() - # tell parent thread we are running now self.started.set() self.init_time = time.time() + try: + # postinit (will call db.reset() which will get us connected) + self.log.debug("%s postinit" % self.logname) + self._postinit() + + # do the actual processing in do_run and handle all exceptions + self.do_run() + + except Exception, e: + self.log.error(e) + self.terminate(False) + return + + self.terminate() + return + + def do_run(self): + # Announce the beginning of the work self.log.info("%s processing" % self.logname) if self.section_threads == 1: - try: - # when "No space left on device" where logs are sent, - # we 
want to catch the exception - if 'reader' in self.__dict__ and self.reader.start is not None: - self.log.debug("Loading from offset %d to %d" \ - % (self.reader.start, self.reader.end)) + # when "No space left on device" where logs are sent, + # we want to catch the exception + if 'reader' in self.__dict__ and self.reader.start is not None: + self.log.debug("Loading from offset %d to %d" \ + % (self.reader.start, self.reader.end)) - self.prepare_processing() - self.process() - self.finish_processing() + self.prepare_processing() + self.process() + self.finish_processing() - except Exception, e: - # resources get freed in self.terminate() - self.terminate() - self.log.error(e) - raise - - self.terminate() return # Mutli-Threaded processing of current section @@ -873,10 +879,9 @@ class PGLoader(threading.Thread): # here we need a special thread reading the file self.round_robin_read() - self.terminate() return - def terminate(self): + def terminate(self, success = True): """ Announce it's over and free the concurrency control semaphore """ # force PostgreSQL connection closing, do not wait for garbage @@ -898,6 +903,7 @@ class PGLoader(threading.Thread): except IOError, e: pass + self.success = success self.finished.set() return