diff --git a/examples/parallel.conf b/examples/parallel.conf index 6f16c08..3da983c 100644 --- a/examples/parallel.conf +++ b/examples/parallel.conf @@ -12,7 +12,7 @@ lc_messages = C ;client_encoding = 'utf-8' client_encoding = 'latin1' -copy_every = 5000 +copy_every = 50 commit_every = 5 #copy_delimiter = % @@ -24,19 +24,20 @@ max_parallel_sections = 1 [parallel_template] template = True table = parallel -format = csv filename = parallel/parallel.data field_sep = ; columns = a, b [split] -use_template = parallel_template -section_threads = 2 -split_file_reading = True +format = text +use_template = parallel_template +section_threads = 2 +split_file_reading = True ; round robin reader (1 reader feed section_threads processing threads) [rrr] -use_template = parallel_template -section_threads = 2 -split_file_reading = False -rrqueue_size = -1 +format = csv +use_template = parallel_template +section_threads = 2 +split_file_reading = False +rrqueue_size = -1 diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index 6300f9a..67553f4 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -60,7 +60,7 @@ class PGLoader(threading.Thread): logname = name self.log = getLogger(logname) - self.__dbconnect__(config) + self._dbconnect(config) self.template = None self.use_template = None @@ -104,8 +104,8 @@ class PGLoader(threading.Thread): self.log = getLogger(self.template) try: - self.__read_conf__(self.template, config, db, - want_template = True) + self._read_conf(self.template, config, db, + want_template = True) except PGLoader_Error, e: self.log.error(e) m = "%s.use_template does not refer to a template section"\ @@ -119,7 +119,7 @@ class PGLoader(threading.Thread): # now load specific configuration self.log.info("Reading configuration from section [%s]", name) - self.__read_conf__(name, config, db) + self._read_conf(name, config, db) # force reinit of self.reader, which depends on template and # specific options @@ -135,7 +135,7 @@ class PGLoader(threading.Thread): self.log.debug('%s init done' % name) - def __dbconnect__(self, config): + def _dbconnect(self, config): """ connects to database """ section = 'pgsql' @@ -177,7 +177,7 @@ class PGLoader(threading.Thread): log.error("Could not initialize PostgreSQL connection") raise PGLoader_Error, error - def __read_conf__(self, name, config, db, want_template = False): + def _read_conf(self, name, config, db, want_template = False): """ init self from config section name """ # we'll need both of them from the globals @@ -503,18 +503,6 @@ class PGLoader(threading.Thread): raise PGLoader_Error, \ "Conflict: can't use both '%s' and '--from-id'" % opt - if not self.template \ - and self.format.lower() == 'text' \ - and self.field_count is not None: - - # this option is not compatible with text mode when - # field_count is used (meaning end of line could be found - # in the data) - - raise PGLoader_Error, \ - "Can't use split_file_reading with text " +\ - "format when 'field_count' is used" - ## # Reader's init if config.has_option(name, 'format'): @@ -533,6 +521,25 @@ class PGLoader(threading.Thread): self.table, self.columns, self.newline_escapes) + if not self.template \ + and self.format.lower() == 'text' \ + and ('field_count' in self.reader.__dict__ \ + and self.reader.field_count) \ + and ('trailing_sep' in self.reader.__dict__ \ + and self.reader.trailing_sep): + + # this option is not compatible with text mode when + # field_count is used (meaning end of line could be found + # in the data) + + raise PGLoader_Error, \ + "Can't use split_file_reading with text " +\ + "format when 'field_count' is used" + + if not self.template: + self.log.info("File '%s' will be read in %s format" \ + % (self.filename, self.format)) + if 'reader' in self.__dict__: self.log.debug('reader.readconfig()') self.reader.readconfig(name, config) @@ -650,7 +657,7 @@ class PGLoader(threading.Thread): """ parse the user string str for fields definition to store into self.attr """ - def __getarg(arg, argtype): + def _getarg(arg, argtype): """ return arg depending on its type """ if argtype == 'int': # arg is the target column index @@ -685,13 +692,13 @@ class PGLoader(threading.Thread): if not btype: # normal column definition, for COPY usage colname, arg = properties - f.append((colname, __getarg(arg, argtype))) + f.append((colname, _getarg(arg, argtype))) else: # blob column definition, with blob type, for # UPDATE usage colname, arg, btype = properties - f.append((colname, __getarg(arg, argtype), btype)) + f.append((colname, _getarg(arg, argtype), btype)) # update serial if argtype == 'int': @@ -954,12 +961,12 @@ class PGLoader(threading.Thread): k = threads.keys() for c in range(self.section_threads): - self.log.info("locks[%d].acquire to set %s.done = True" % (c, k[c])) + self.log.debug("locks[%d].acquire to set %s.done = True" % (c, k[c])) locks[c].acquire() threads[k[c]].done = True - self.log.info("locks[%d].release (done set)" % c) + self.log.debug("locks[%d].release (done set)" % c) locks[c].release() from tools import running_threads @@ -1050,7 +1057,6 @@ class PGLoader(threading.Thread): """ depending on configuration, do given job """ if self.columns is not None: - self.log.info("COPY csv data") self.data_import() elif self.blob_cols is not None: diff --git a/pgloader/reader.py b/pgloader/reader.py index 270bb24..b198b53 100644 --- a/pgloader/reader.py +++ b/pgloader/reader.py @@ -95,3 +95,5 @@ class DataReader: self.start = start self.end = end + self.log.info("reader start=%d, end=%d" % (self.start, self.end)) +