diff --git a/examples/parallel.conf b/examples/parallel.conf index 2518ca4..40e6b82 100644 --- a/examples/parallel.conf +++ b/examples/parallel.conf @@ -21,7 +21,7 @@ empty_string = "\ " max_parallel_sections = 1 -[parallel_template] +[ptmpl] template = True table = parallel filename = parallel/parallel.data @@ -30,14 +30,14 @@ columns = a, b [split] format = csv -use_template = parallel_template +use_template = ptmpl section_threads = 2 split_file_reading = True ; round robin reader (1 reader feed section_threads processing threads) [rrr] format = csv -use_template = parallel_template +use_template = ptmpl section_threads = 2 split_file_reading = False rrqueue_size = -1 diff --git a/pgloader.py b/pgloader.py index 939a7df..8349e2a 100644 --- a/pgloader.py +++ b/pgloader.py @@ -465,6 +465,7 @@ def load_data(): threads = {} current = 0 interrupted = False + got_errors = False max_running = MAX_PARALLEL_SECTIONS if max_running == -1: @@ -492,6 +493,8 @@ def load_data(): summary.pop(s) except PGLoader_Error, e: + got_errors = True + if e == '': log.error('[%s] Please correct previous errors' % s) else: @@ -503,6 +506,7 @@ def load_data(): pass except UnicodeDecodeError, e: + got_errors = True log.error("can't open '%s' with given input encoding '%s'" \ % (filename, input_encoding)) @@ -530,7 +534,7 @@ def load_data(): td = time.time() - begin retcode = 0 - if SUMMARY and not interrupted: + if SUMMARY and not interrupted and not got_errors: try: retcode = print_summary(None, sections, summary, td) print diff --git a/pgloader/db.py b/pgloader/db.py index ff214f1..ff272eb 100644 --- a/pgloader/db.py +++ b/pgloader/db.py @@ -514,7 +514,13 @@ class db: ('\r', '\\r'), ('\t', '\\t'), ('\v', '\\v')]: - c = c.replace(orig, escaped) + try: + c = c.replace(orig, escaped) + except TypeError, e: + self.log.error("db.prepare_copy_data columns %s" % str(columns)) + self.log.error("db.prepare_copy_data input_line %s" % str(input_line)) + self.log.error("TypeError: '%s'.replace(%s, 
%s)" % (c, orig, escaped)) + raise PGLoader_Error, e self.buffer.write(c) diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index 12f8c43..d513458 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -33,34 +33,39 @@ class PGLoader(threading.Thread): def __init__(self, name, config, sem, stats, logname = None, - reject = None, queue = None, lock = None): + reject = None, queue = None, lock = None, copy_sep = None): """ Init with a configuration section """ - threading.Thread.__init__(self, name = name) + + # logname is given when we use several Threads for reading + # input file + self.logname = logname + if self.logname is None: + self.logname = name + self.log = getLogger(self.logname) - # sem, stats and queue (if not None) are global objects: - # sem is shared by all threads at the same level - # stats is a private entry of a shared dict - # queue is given when reading in round robin mode - # lock is a threading.Lock for reading sync - # reject is a common reject object, protected by a parent-thread rlock + threading.Thread.__init__(self, name = self.logname) + + # sem, stats and queue (if not None) are global objects + # sem is shared by all threads at the same level + # stats is a private entry of a shared dict + # queue is given when reading in round robin mode + # lock is a threading.Lock for reading sync + # reject is a common reject object, protected by a parent-thread rlock # - self.sem = sem - self.stats = stats - self.queue = queue - self.lock = lock - self.reject = reject + self.sem = sem + self.stats = stats + self.queue = queue + self.lock = lock + self.reject = reject # thereafter parameters are local self.name = name self.config = config - # logname is given when we use several Threads for reading - # input file - if logname is None: - logname = name - self.log = getLogger(logname) - - self._dbconnect(config) + # in the round-robin reader case, workers won't have + # initialized a reader, thus won't read the configuration of + # 
copy_sep. + self.copy_sep = copy_sep self.template = None self.use_template = None @@ -90,44 +95,54 @@ class PGLoader(threading.Thread): # check if the section wants to use a template if config.has_option(name, 'use_template'): self.tsection = config.get(name, 'use_template') - self.template = config.get(name, 'use_template') - if not config.has_section(self.template): - m = '%s refers to unknown template section %s' \ - % (name, self.template) - - raise PGLoader_Error, m + self._dbconnect(config) - # first load template configuration - self.log.info("Reading configuration from template " +\ - "section [%s]", self.template) + if self.tsection is not None: + self.template = config.get(name, 'use_template') - self.real_log = self.log - self.log = getLogger(self.template) + if not config.has_section(self.template): + m = '%s refers to unknown template section %s' \ + % (name, self.template) - try: - self._read_conf(self.template, config, db, - want_template = True) - except PGLoader_Error, e: - self.log.error(e) - m = "%s.use_template does not refer to a template section"\ - % name - raise PGLoader_Error, m + raise PGLoader_Error, m - # reinit self.template now its relative config section is read - self.template = None - self.log = self.real_log + # first load template configuration + self.log.info("Reading configuration from template " +\ + "section [%s]", self.template) + self.real_log = self.log + self.log = getLogger(self.template) + + try: + self._read_conf(self.template, config, db, + want_template = True) + except PGLoader_Error, e: + self.log.error(e) + m = "%s.use_template does not refer to a template section"\ + % name + raise PGLoader_Error, m + + # reinit self.template now its relative config section is read + self.template = None + self.log = self.real_log + + if not self.template: # now load specific configuration - self.log.info("Reading configuration from section [%s]", name) - + self.log.info("Reading configuration from section [%s]", name) 
self._read_conf(name, config, db) # Now reset database connection if not DRY_RUN: - self.db.log = self.log + self.db.log = self.log self.db.reset() + # check we have properly configured the copy separator + if self.db.copy_sep is None: + self.log.debug("%s" % self.db) + msg = "BUG: pgloader couldn't configure its COPY separator" + raise PGLoader_Error, msg + self.log.debug('%s init done' % name) def _dbconnect(self, config): @@ -166,8 +181,22 @@ class PGLoader(threading.Thread): self.db.commit_every = config.getint( section, 'commit_every') + if self.copy_sep is not None: + self.db.copy_sep = self.copy_sep + self.log.debug("got copy_sep '%s' from self.copy_sep" \ + % self.copy_sep) + if config.has_option(section, 'copy_delimiter'): self.db.copy_sep = config.get(section, 'copy_delimiter') + self.log.debug("got copy_sep '%s' from %s" \ + % (self.db.copy_sep, section)) + + elif config.has_option(self.tsection, 'copy_delimiter'): + self.db.copy_sep = config.get(self.tsection, 'copy_delimiter') + self.log.debug("got copy_sep '%s' from %s" \ + % (self.db.copy_sep, self.tsection)) + + self.log.debug("_dbconnect copy_sep %s " % self.db.copy_sep) except Exception, error: log.error("Could not initialize PostgreSQL connection") @@ -508,11 +537,14 @@ class PGLoader(threading.Thread): if config.has_option(name, 'format'): self.format = config.get(name, 'format') - if not self.template: + if not self.template and self.queue is None: # Only init self.reader in real section, not from # template. self.reader.readconfig() will care about # reading its configuration from template and current # section. + # + # If self.queue is not None, we'll read data from it and + # not from self.reader, so we don't care about it. 
if 'format' not in self.__dict__: raise PGLoader_Error, "Please configure %s.format" % name @@ -535,7 +567,7 @@ class PGLoader(threading.Thread): self.table, self.columns, self.newline_escapes) - self.log.debug('reader.readconfig()') + self.log.info('reader.readconfig()') self.reader.readconfig(config, name, self.tsection) @@ -740,18 +772,22 @@ class PGLoader(threading.Thread): self.sem.acquire() # Announce the beginning of the work - self.log.info("%s launched" % self.name) + self.log.info("%s launched" % self.logname) if self.section_threads == 1: - if self.reader.start is not None: + if 'reader' in self.__dict__ and self.reader.start is not None: self.log.info("Loading from offset %d to %d" \ % (self.reader.start, self.reader.end)) - self.prepare_processing() - self.process() - self.finish_processing() + try: + # catch worker exception + self.prepare_processing() + self.process() + self.finish_processing() + except Exception, e: + self.log.error(e) - self.log.info("Releasing %s" % self.name) + self.log.info("Releasing %s" % self.logname) self.sem.release() return @@ -771,8 +807,22 @@ class PGLoader(threading.Thread): # here we need a special thread reading the file self.round_robin_read() + self.log.info("releasing %s" % self.logname) self.sem.release() - self.log.info("%s released" % self.name) + return + + def wait_for_workers(self, sem, workers): + """ + Try to acquire all semaphore entries --- success means no + more thread is running + """ + from tools import running_threads + + for i in range(len(workers)): + sem.acquire() + self.log.debug("Acquired %d times, " % (i+1) + \ + "still waiting for %d threads to terminate" \ + % running_threads(workers)) return def split_file_read(self): @@ -843,18 +893,10 @@ class PGLoader(threading.Thread): # be started time.sleep(2) - from tools import running_threads n = running_threads(threads) log.info("Waiting for %d threads to terminate" % n) - # Try to acquire all semaphore entries --- success means no - # more thread 
is running - for i in range(self.section_threads): - sem.acquire() - self.log.info("Acquired %d times, " % (i+1) + \ - "still waiting for %d threads to terminate" \ - % running_threads(threads)) - + self.wait_for_workers(sem, threads) self.finish_processing() self.duration = time.time() - self.init_time self.log.info('No more threads are running, %s done' % self.name) @@ -897,10 +939,11 @@ class PGLoader(threading.Thread): loader = PGLoader(self.name, self.config, sem, summary[current], - logname = current_name, - reject = self.reject, - queue = queues[current], - lock = locks [current]) + logname = current_name, + reject = self.reject, + queue = queues[current], + lock = locks [current], + copy_sep = self.db.copy_sep) loader.section_threads = 1 loader.dont_prepare_nor_finish = True @@ -910,7 +953,13 @@ class PGLoader(threading.Thread): threads[current_name].start() except Exception, e: - raise + self.log.error("Couldn't start all workers thread") + self.log.error(e) + + if len(threads) != self.section_threads: + self.log.error("Couldn't start all threads, check previous errors") + self.wait_for_workers(sem, threads) + return # wait for loaders completion, first let them some time to # be started @@ -975,14 +1024,7 @@ class PGLoader(threading.Thread): n = running_threads(threads) self.log.info("Waiting for %d threads to terminate" % n) - # Try to acquire all semaphore entries --- success means no - # more thread is running - for i in range(self.section_threads): - sem.acquire() - self.log.debug("Acquired %d times, " % (i+1) + \ - "still waiting for %d threads to terminate" \ - % running_threads(threads)) - + self.wait_for_workers(sem, threads) self.finish_processing() self.duration = time.time() - self.init_time self.log.info('%s done' % self.name) diff --git a/pgloader/reader.py b/pgloader/reader.py index 6ccb962..4198cf8 100644 --- a/pgloader/reader.py +++ b/pgloader/reader.py @@ -65,6 +65,7 @@ class DataReader: self.db.empty_string = 
parse_config_string(self.empty_string) self._getopt('field_sep', config, name, template, FIELD_SEP) + if not DRY_RUN: if self.db.copy_sep is None: self.db.copy_sep = self.field_sep @@ -73,6 +74,7 @@ class DataReader: self.log.debug("reader.readconfig null: '%s'" % self.db.null) self.log.debug("reader.readconfig empty_string: '%s'", self.db.empty_string) + self.log.debug("reader.db %s copy_sep %s" % (self.db, self.db.copy_sep)) self.log.debug("reader.readconfig field_sep: '%s'", self.field_sep) @@ -115,6 +117,7 @@ class DataReader: except IOError, error: raise PGLoader_Error, error + self.log.info("Opened '%s' in %s" % (self.filename, self.fd)) return self.fd def readlines(self):