diff --git a/examples/parallel.conf b/examples/parallel.conf index 40e6b82..b4f5fca 100644 --- a/examples/parallel.conf +++ b/examples/parallel.conf @@ -29,7 +29,7 @@ field_sep = ; columns = a, b [split] -format = csv +format = text use_template = ptmpl section_threads = 2 split_file_reading = True diff --git a/examples/pgloader.conf b/examples/pgloader.conf index c2bfaee..99372d8 100644 --- a/examples/pgloader.conf +++ b/examples/pgloader.conf @@ -85,7 +85,7 @@ table = udc format = text filename = udc/udc.data input_encoding = 'latin1' -field_sep = § +field_sep = % columns = b:2, d:1, x:3, y:4 udc_c = constant value copy_columns = b, c, d diff --git a/examples/udc/udc.data b/examples/udc/udc.data index 37d377e..666f5a1 100644 --- a/examples/udc/udc.data +++ b/examples/udc/udc.data @@ -1,5 +1,5 @@ -1§5§foo§bar -2§10§bar§toto -3§4§toto§titi -4§18§titi§baz -5§2§baz§foo +1%5%foo%bar +2%10%bar%toto +3%4%toto%titi +4%18%titi%baz +5%2%baz%foo diff --git a/pgloader/csvreader.py b/pgloader/csvreader.py index 66fb512..898b1e6 100644 --- a/pgloader/csvreader.py +++ b/pgloader/csvreader.py @@ -5,7 +5,7 @@ # handles configuration, parse data, then pass them to database module for # COPY preparation -import os, sys, os.path, time, codecs, csv +import os, sys, os.path, time, csv from cStringIO import StringIO from tools import PGLoader_Error, Reject, parse_config_string @@ -69,17 +69,22 @@ class CSVReader(DataReader): self._open() if self.start is not None and self.start > 0: - self.log.info("CSV Reader starting at offset %d" % self.start) + self.log.debug("CSV Reader starting at offset %d" % self.start) self.fd.seek(self.start) + self.log.info("csvreader at position %d" % self.fd.tell()) + # now read the lines for columns in csv.reader(self.fd, dialect = 'pgloader'): + + self.log.info("csvreader at position %d" % self.fd.tell()) + line = self.field_sep.join(columns) yield line, columns if self.end is not None and self.fd.tell() >= self.end: - self.log.info("CSV Reader stoping, offset %d >= %d" \ - % (self.fd.tell(), self.end)) + self.log.debug("CSV Reader stoping, offset %d >= %d" \ + % (self.fd.tell(), self.end)) self.fd.close() return diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index d513458..fe0b344 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -137,9 +137,11 @@ class PGLoader(threading.Thread): self.db.log = self.log self.db.reset() + if not self.template and not DRY_RUN: # check we have properly configured the copy separator if self.db.copy_sep is None: self.log.debug("%s" % self.db) + self.log.error("COPY sep is %s" % self.db.copy_sep) msg = "BUG: pgloader couldn't configure its COPY separator" raise PGLoader_Error, msg @@ -567,7 +569,7 @@ class PGLoader(threading.Thread): self.table, self.columns, self.newline_escapes) - self.log.info('reader.readconfig()') + self.log.debug('reader.readconfig()') self.reader.readconfig(config, name, self.tsection) @@ -776,8 +778,8 @@ class PGLoader(threading.Thread): if self.section_threads == 1: if 'reader' in self.__dict__ and self.reader.start is not None: - self.log.info("Loading from offset %d to %d" \ - % (self.reader.start, self.reader.end)) + self.log.debug("Loading from offset %d to %d" \ + % (self.reader.start, self.reader.end)) try: # catch worker exception @@ -817,8 +819,12 @@ class PGLoader(threading.Thread): more thread is running """ from tools import running_threads + + n = running_threads(workers) + self.log.info("Waiting for %d/%d threads to terminate" \ + % (n, len(workers))) - for i in range(len(workers)): + for i in range(n): sem.acquire() self.log.debug("Acquired %d times, " % (i+1) + \ "still waiting for %d threads to terminate" \ @@ -893,9 +899,6 @@ class PGLoader(threading.Thread): # be started time.sleep(2) - n = running_threads(threads) - log.info("Waiting for %d threads to terminate" % n) - self.wait_for_workers(sem, threads) self.finish_processing() self.duration = time.time() - self.init_time @@ -1020,10 +1023,6 @@ class PGLoader(threading.Thread): self.log.debug("locks[%d].release (done set)" % c) locks[c].release() - from tools import running_threads - n = running_threads(threads) - self.log.info("Waiting for %d threads to terminate" % n) - self.wait_for_workers(sem, threads) self.finish_processing() self.duration = time.time() - self.init_time diff --git a/pgloader/reader.py b/pgloader/reader.py index 4198cf8..3dde6c6 100644 --- a/pgloader/reader.py +++ b/pgloader/reader.py @@ -102,9 +102,11 @@ class DataReader: if self.input_encoding is not None: try: + import codecs self.fd = codecs.open(self.filename, encoding = self.input_encoding, buffering = self.bufsize) + self.log.info("Opened '%s' with encoding '%s'" % (self.filename, self.input_encoding)) except LookupError, e: # codec not found raise PGLoader_Error, "Input codec: %s" % e @@ -117,7 +119,9 @@ class DataReader: except IOError, error: raise PGLoader_Error, error - self.log.info("Opened '%s' in %s" % (self.filename, self.fd)) + self.log.debug("Opened '%s' in %s (fileno %s), ftell %d" \ + % (self.filename, self.fd, + self.fd.fileno(), self.fd.tell())) return self.fd def readlines(self): @@ -130,5 +134,5 @@ class DataReader: self.start = start self.end = end - self.log.info("reader start=%d, end=%d" % (self.start, self.end)) + self.log.debug("reader start=%d, end=%d" % (self.start, self.end)) diff --git a/pgloader/textreader.py b/pgloader/textreader.py index fc79631..85842d4 100644 --- a/pgloader/textreader.py +++ b/pgloader/textreader.py @@ -5,7 +5,7 @@ # handles configuration, parse data, then pass them to database module for # COPY preparation -import os, sys, os.path, time, codecs +import os, sys, os.path, time from cStringIO import StringIO from tools import PGLoader_Error, Reject, parse_config_string @@ -69,24 +69,38 @@ class TextReader(DataReader): ## # if neither -I nor -F was used, we can state that begin = 0 if FROM_ID is None and FROM_COUNT == 0: - self.log.info('beginning on first line') + self.log.debug('beginning on first line') begin_linenb = 1 self._open() if self.start is not None and self.start > 0: - self.log.info("Text Reader starting at offset %d" % self.start) + self.log.debug("Text Reader starting at offset %d" % self.start) self.fd.seek(self.start) - for line in self.fd: + self.log.debug("textreader at position %d" % self.fd.tell()) + + #for line in self.fd.readline(): + + line = 'dumb non-empty init value' + last_line_read = False + + while line != '': + line = self.fd.readline() + # we count real physical lines nb_plines += 1 - if self.end is not None and self.fd.tell() >= self.end: - self.log.info("Text Reader stoping, offset %d >= %d" \ - % (self.fd.tell(), self.end)) + if last_line_read: + self.log.debug("Text Reader stoping, offset %d >= %d" \ + % (self.fd.tell(), self.end)) self.fd.close() - break + return + + if self.end is not None and self.fd.tell() >= self.end: + # we want to process current line and stop at next + # iteration + last_line_read = True if self.input_encoding is not None: # this may not be necessary, after all