Protect the database connection establishing into the parralel sections semaphore, allow field_sep to be configured to \t, and some fixes

This commit is contained in:
dim 2008-05-29 09:16:13 +00:00
parent d068b05ae2
commit 5a63f4f609
5 changed files with 83 additions and 30 deletions

View File

@ -6,7 +6,7 @@ the tables it needs, then issue the pgloader command:
$ createdb --encoding=utf-8 pgloader
$ cd examples
$ psql pgloader < simple/simple.sql
$ ../pgloader.py -Tvc examples/pgloader.conf simple
$ ../pgloader.py -Tvc pgloader.conf simple
If you want to load data from all examples, create tables for all of them
first, then run pgloader without argument.

View File

@ -534,6 +534,10 @@ def load_data():
finished[s].set()
log.error(e)
except IOError, e:
# No space left on device? can't log it
break
if loader:
if not loader.template:
filename = loader.filename
@ -542,7 +546,7 @@ def load_data():
# .start() will sem.aquire(), so we won't have more
# than max_running threads running at any time.
log.debug("Starting thread for %s" % s)
log.debug("Starting a thread for %s" % s)
threads[s].start()
else:
log.info("Skipping section %s, which is a template" % s)
@ -568,6 +572,11 @@ def load_data():
except KeyboardInterrupt:
interrupted = True
log.warning("Aborting on user demand (Interrupt)")
break
except IOError, e:
# typically, No Space Left On Device, can't report nor continue
break
current += 1
@ -595,5 +604,19 @@ def load_data():
return retcode
if __name__ == "__main__":
sys.exit(load_data())
try:
ret = load_data()
except Exception, e:
print >>STDERR, e
sys.exit(1)
except IOError, e:
print >>STDERR, e
sys.exit(1)
except KeyboardInterrupt, e:
print >>STDERR, e
sys.exit(1)
sys.exit(ret)

View File

@ -94,9 +94,13 @@ class db:
def close(self):
""" close self.dbconn PostgreSQL connection """
if self.dbconn is not None:
self.log.debug('closing current connection')
try:
self.log.info('closing current database connection')
except IOError, e:
# Ignore no space left on device etc here
pass
self.dbconn.close()
self.dbconn = None
def set_encoding(self):

View File

@ -102,7 +102,7 @@ class PGLoader(threading.Thread):
if config.has_option(name, 'use_template'):
self.tsection = config.get(name, 'use_template')
self._dbconnect(config)
self._dbconfig(config)
if self.tsection is not None:
self.template = config.get(name, 'use_template')
@ -138,25 +138,14 @@ class PGLoader(threading.Thread):
self.log.debug("Reading configuration from section [%s]", name)
self._read_conf(name, config, db)
# Now reset database connection
if not DRY_RUN:
self.db.reset()
if not self.template and not DRY_RUN:
# check we have properly configured the copy separator
if self.db.copy_sep is None:
self.log.debug("%s" % self.db)
self.log.error("COPY sep is %s" % self.db.copy_sep)
msg = "BUG: pgloader couldn't configure its COPY separator"
raise PGLoader_Error, msg
self.log.debug('%s init done' % name)
def __del__(self):
""" PGLoader destructor, we close the db connection """
self.db.close()
if not DRY_RUN:
self.db.close()
def _dbconnect(self, config):
def _dbconfig(self, config):
""" connects to database """
section = 'pgsql'
@ -215,6 +204,20 @@ class PGLoader(threading.Thread):
except Exception, error:
log.error("Could not initialize PostgreSQL connection")
raise PGLoader_Error, error
def _postinit(self):
""" This has to be called while self.sem is acquired """
# Now reset database connection
if not DRY_RUN:
self.db.reset()
if not self.template and not DRY_RUN:
# check we have properly configured the copy separator
if self.db.copy_sep is None:
self.log.debug("%s" % self.db)
self.log.error("COPY sep is %s" % self.db.copy_sep)
msg = "BUG: pgloader couldn't configure its COPY separator"
raise PGLoader_Error, msg
def _read_conf(self, name, config, db, want_template = False):
""" init self from config section name """
@ -810,26 +813,35 @@ class PGLoader(threading.Thread):
""" controling thread which dispatch the job """
# care about number of threads launched
self.log.debug("%s.sem.acquire()" % self.logname)
self.sem.acquire()
self.log.debug("%s acquired starting semaphore" % self.logname)
# postinit (will call db.reset() which will get us connected)
self.log.debug("%s postinit" % self.logname)
self._postinit()
# tell parent thread we are running now
self.started.set()
self.init_time = time.time()
# Announce the beginning of the work
self.log.debug("%s processing" % self.logname)
self.log.info("%s processing" % self.logname)
if self.section_threads == 1:
if 'reader' in self.__dict__ and self.reader.start is not None:
self.log.debug("Loading from offset %d to %d" \
% (self.reader.start, self.reader.end))
try:
# catch worker exception
# when "No space left on device" where logs are sent,
# we want to catch the exception
if 'reader' in self.__dict__ and self.reader.start is not None:
self.log.debug("Loading from offset %d to %d" \
% (self.reader.start, self.reader.end))
self.prepare_processing()
self.process()
self.finish_processing()
except Exception, e:
# resources get freed in self.terminate()
self.log.error(e)
self.terminate()
@ -858,15 +870,24 @@ class PGLoader(threading.Thread):
# force PostgreSQL connection closing, do not wait for garbage
# collector
self.db.close()
if not DRY_RUN:
self.db.close()
self.log.debug("releasing %s semaphore" % self.logname)
try:
self.log.info("releasing %s semaphore" % self.logname)
except IOError, e:
# ignore "No space left on device" or other errors here
pass
self.sem.release()
# tell parent thread processing is now over, here
self.log.debug("Announce it's over")
try:
self.log.info("Announce it's over")
except IOError, e:
pass
self.finished.set()
return
def split_file_read(self):

View File

@ -65,6 +65,11 @@ class DataReader:
self.db.empty_string = parse_config_string(self.empty_string)
self._getopt('field_sep', config, name, template, FIELD_SEP)
self.field_sep = self.field_sep.decode('string-escape')
if len(self.field_sep) != 1:
raise PGLoader_Error, "field_sep must be 1 char, not %d (%s)" \
% (len(self.field_sep), self.field_sep)
if not DRY_RUN:
if self.db.copy_sep is None: