diff --git a/examples/README b/examples/README index 00104ac..bc73abd 100644 --- a/examples/README +++ b/examples/README @@ -6,7 +6,7 @@ the tables it needs, then issue the pgloader command: $ createdb --encoding=utf-8 pgloader $ cd examples $ psql pgloader < simple/simple.sql - $ ../pgloader.py -Tvc examples/pgloader.conf simple + $ ../pgloader.py -Tvc pgloader.conf simple If you want to load data from all examples, create tables for all of them first, then run pgloader without argument. diff --git a/pgloader.py b/pgloader.py index 7d70e2e..cae58be 100644 --- a/pgloader.py +++ b/pgloader.py @@ -534,6 +534,10 @@ def load_data(): finished[s].set() log.error(e) + except IOError, e: + # No space left on device? can't log it + break + if loader: if not loader.template: filename = loader.filename @@ -542,7 +546,7 @@ def load_data(): # .start() will sem.aquire(), so we won't have more # than max_running threads running at any time. - log.debug("Starting thread for %s" % s) + log.debug("Starting a thread for %s" % s) threads[s].start() else: log.info("Skipping section %s, which is a template" % s) @@ -568,6 +572,11 @@ def load_data(): except KeyboardInterrupt: interrupted = True log.warning("Aborting on user demand (Interrupt)") + break + + except IOError, e: + # typically, No Space Left On Device, can't report nor continue + break current += 1 @@ -595,5 +604,19 @@ def load_data(): return retcode if __name__ == "__main__": - sys.exit(load_data()) + try: + ret = load_data() + except Exception, e: + print >>STDERR, e + sys.exit(1) + + except IOError, e: + print >>STDERR, e + sys.exit(1) + + except KeyboardInterrupt, e: + print >>STDERR, e + sys.exit(1) + + sys.exit(ret) diff --git a/pgloader/db.py b/pgloader/db.py index 0331423..7245212 100644 --- a/pgloader/db.py +++ b/pgloader/db.py @@ -94,9 +94,13 @@ class db: def close(self): """ close self.dbconn PostgreSQL connection """ if self.dbconn is not None: - self.log.debug('closing current connection') + try: + self.log.info('closing current database connection') + except IOError, e: + # Ignore no space left on device etc here + pass + self.dbconn.close() - self.dbconn = None def set_encoding(self): diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index 6c8adc6..461fa5c 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -102,7 +102,7 @@ class PGLoader(threading.Thread): if config.has_option(name, 'use_template'): self.tsection = config.get(name, 'use_template') - self._dbconnect(config) + self._dbconfig(config) if self.tsection is not None: self.template = config.get(name, 'use_template') @@ -138,25 +138,14 @@ class PGLoader(threading.Thread): self.log.debug("Reading configuration from section [%s]", name) self._read_conf(name, config, db) - # Now reset database connection - if not DRY_RUN: - self.db.reset() - - if not self.template and not DRY_RUN: - # check we have properly configured the copy separator - if self.db.copy_sep is None: - self.log.debug("%s" % self.db) - self.log.error("COPY sep is %s" % self.db.copy_sep) - msg = "BUG: pgloader couldn't configure its COPY separator" - raise PGLoader_Error, msg - self.log.debug('%s init done' % name) def __del__(self): """ PGLoader destructor, we close the db connection """ - self.db.close() + if not DRY_RUN: + self.db.close() - def _dbconnect(self, config): + def _dbconfig(self, config): """ connects to database """ section = 'pgsql' @@ -215,6 +204,20 @@ class PGLoader(threading.Thread): except Exception, error: log.error("Could not initialize PostgreSQL connection") raise PGLoader_Error, error + + def _postinit(self): + """ This has to be called while self.sem is acquired """ + # Now reset database connection + if not DRY_RUN: + self.db.reset() + + if not self.template and not DRY_RUN: + # check we have properly configured the copy separator + if self.db.copy_sep is None: + self.log.debug("%s" % self.db) + self.log.error("COPY sep is %s" % self.db.copy_sep) + msg = "BUG: pgloader couldn't configure its COPY separator" + raise PGLoader_Error, msg def _read_conf(self, name, config, db, want_template = False): """ init self from config section name """ @@ -810,26 +813,35 @@ class PGLoader(threading.Thread): """ controling thread which dispatch the job """ # care about number of threads launched + self.log.debug("%s.sem.acquire()" % self.logname) self.sem.acquire() + self.log.debug("%s acquired starting semaphore" % self.logname) + + # postinit (will call db.reset() which will get us connected) + self.log.debug("%s postinit" % self.logname) + self._postinit() # tell parent thread we are running now self.started.set() self.init_time = time.time() # Announce the beginning of the work - self.log.debug("%s processing" % self.logname) + self.log.info("%s processing" % self.logname) if self.section_threads == 1: - if 'reader' in self.__dict__ and self.reader.start is not None: - self.log.debug("Loading from offset %d to %d" \ - % (self.reader.start, self.reader.end)) - try: - # catch worker exception + # when "No space left on device" where logs are sent, + # we want to catch the exception + if 'reader' in self.__dict__ and self.reader.start is not None: + self.log.debug("Loading from offset %d to %d" \ + % (self.reader.start, self.reader.end)) + self.prepare_processing() self.process() self.finish_processing() + except Exception, e: + # resources get freed in self.terminate() self.log.error(e) self.terminate() @@ -858,15 +870,24 @@ class PGLoader(threading.Thread): # force PostgreSQL connection closing, do not wait for garbage # collector - self.db.close() + if not DRY_RUN: + self.db.close() - self.log.debug("releasing %s semaphore" % self.logname) + try: + self.log.info("releasing %s semaphore" % self.logname) + except IOError, e: + # ignore "No space left on device" or other errors here + pass + self.sem.release() # tell parent thread processing is now over, here - self.log.debug("Announce it's over") + try: + self.log.info("Announce it's over") + except IOError, e: + pass + self.finished.set() - return def split_file_read(self): diff --git a/pgloader/reader.py b/pgloader/reader.py index 4a8de0f..527bea5 100644 --- a/pgloader/reader.py +++ b/pgloader/reader.py @@ -65,6 +65,11 @@ class DataReader: self.db.empty_string = parse_config_string(self.empty_string) self._getopt('field_sep', config, name, template, FIELD_SEP) + self.field_sep = self.field_sep.decode('string-escape') + + if len(self.field_sep) != 1: + raise PGLoader_Error, "field_sep must be 1 char, not %d (%s)" \ + % (len(self.field_sep), self.field_sep) if not DRY_RUN: if self.db.copy_sep is None: