diff --git a/debian/changelog b/debian/changelog index f135322..287861b 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +pgloader (2.3.0~dev-2) unstable; urgency=low + + * columns = * is now supported + + -- Dimitri Fontaine Mon, 25 Feb 2008 14:58:46 +0100 + pgloader (2.3.0~dev-1) unstable; urgency=low * Mutli-threaded pgloader (see options max_parallel_sections, diff --git a/debian/control b/debian/control index b25175c..4b75c17 100644 --- a/debian/control +++ b/debian/control @@ -13,5 +13,6 @@ Description: loads flat data files into PostgreSQL pgloader imports data from flat files and insert it into a database table. You have to provide a configuration file containing sections, which each defines how to load a table. CSV and text format are - supported, the latter being able to parse multi-line non-quoted with - trailing delimiter quite CSV input, such as given by Informix UNLOAD. + supported, the former is able to import MySQL dumps when -T is used, + the latter is able to parse multi-line non-quoted with trailing + delimiter quite CSV input, such as given by Informix UNLOAD. diff --git a/examples/allcols/allcols.data b/examples/allcols/allcols.data new file mode 100644 index 0000000..bdc72ab --- /dev/null +++ b/examples/allcols/allcols.data @@ -0,0 +1,8 @@ +1:2008-02-18:first entry +2:2008-02-19:second one +3:2008-02-20:another +4:2008-02-21:still running +5:2008-02-22:well, some more +6:2008-02-23:antepenultima +7:2008-02-24:next to last +8:2008-02-25:hey, it's today! diff --git a/examples/allcols/allcols.sql b/examples/allcols/allcols.sql new file mode 100644 index 0000000..bfdc3f7 --- /dev/null +++ b/examples/allcols/allcols.sql @@ -0,0 +1,5 @@ +CREATE TABLE allcols ( + a integer primary key, + b date, + c text +); diff --git a/examples/pgloader.conf b/examples/pgloader.conf index 99372d8..4a5a828 100644 --- a/examples/pgloader.conf +++ b/examples/pgloader.conf @@ -12,7 +12,7 @@ lc_messages = C ;client_encoding = 'utf-8' client_encoding = 'latin1' -copy_every = 5000 +copy_every = 5 commit_every = 5 #copy_delimiter = % @@ -38,6 +38,13 @@ columns = a:1, b:3, c:2 reject_log = /tmp/simple.rej.log reject_data = /tmp/simple.rej +[allcols] +table = allcols +format = csv +filename = allcols/allcols.data +field_sep = : +columns = * + [errors] table = errors format = text @@ -77,7 +84,7 @@ table = partial format = text filename = partial/partial.data field_sep = % -columns = a:1, b:2, c:3, d:4, e:5 +columns = * only_cols = 1-3, 5 [udc] diff --git a/pgloader/db.py b/pgloader/db.py index d6c0ef9..0ab1b58 100644 --- a/pgloader/db.py +++ b/pgloader/db.py @@ -45,6 +45,10 @@ class db: self.empty_string = EMPTY_STRING self.lc_messages = None + # this allows to specify configuration has columns = * + # when true, we don't include column list in COPY statements + self.all_cols = None + if connect: self.reset() @@ -107,6 +111,50 @@ class db: raise PGLoader_Error, e cursor.close() + def get_all_columns(self, tablename): + """ select the columns name list from catalog """ + + if tablename.find('.') == -1: + schemaname = 'public' + else: + try: + schemaname, tablename = tablename.split('.') + except ValueError, e: + self.log.warning("db.get_all_columns: " + \ + "%s has more than one '.' separator" \ + % tablename) + raise PGLoader_Error, e + + sql = """ + SELECT attname, attnum + FROM pg_attribute + WHERE attrelid = (SELECT oid + FROM pg_class + WHERE relname = %s + AND relnamespace = (SELECT oid + FROM pg_namespace + WHERE nspname = %s) + ) + AND attnum > 0 AND NOT attisdropped +ORDER BY attnum +""" + self.log.debug("get_all_columns: %s %s %s" % (tablename, schemaname, sql)) + + columns = [] + cursor = self.dbconn.cursor() + try: + cursor.execute(sql, [tablename, schemaname]) + + for row in cursor.fetchall(): + columns.append(row) + + except psycopg.ProgrammingError, e: + raise PGLoader_Error, e + + cursor.close() + + return columns + def reset(self): """ reset internal counters and open a new database connection """ self.buffer = None @@ -141,7 +189,7 @@ class db: d = time.time() - self.first_commit_time u = self.commited_rows c = self.commits - self.log.info(" %d updates in %d commits took %5.3f seconds", u, c, d) + self.log.info(" %d rows copied in %d commits took %5.3f seconds", u, c, d) if self.errors > 0: self.log.error("%d database errors occured", self.errors) @@ -335,7 +383,12 @@ class db: # build the table colomns specs from parameters # ie. we always issue COPY table (col1, col2, ..., coln) commands tablename = table - table = "%s (%s) " % (table, ", ".join(columnlist)) + if self.all_cols: + table = table + else: + table = "%s (%s) " % (table, ", ".join(columnlist)) + + self.log.debug("COPY will use table definition: '%s'" % table) if EOF or self.running_commands == self.copy_every \ and self.buffer is not None: diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index fa7060d..b3a1946 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -77,6 +77,7 @@ class PGLoader(threading.Thread): self.tsection = None self.index = None + self.all_cols = None self.columns = None self.blob_cols = None @@ -139,7 +140,6 @@ class PGLoader(threading.Thread): # Now reset database connection if not DRY_RUN: - self.db.log = self.log self.db.reset() if not self.template and not DRY_RUN: @@ -205,6 +205,9 @@ class PGLoader(threading.Thread): self.log.debug("_dbconnect copy_sep %s " % self.db.copy_sep) + # we want self.db messages to get printed as from our section + self.db.log = self.log + except Exception, error: log.error("Could not initialize PostgreSQL connection") raise PGLoader_Error, error @@ -295,7 +298,18 @@ class PGLoader(threading.Thread): self._parse_fields('index', config.get(name, 'index')) if config.has_option(name, 'columns'): - self._parse_fields('columns', config.get(name, 'columns')) + conf_cols = config.get(name, 'columns') + if conf_cols == '*': + self.all_cols = True + self.db.all_cols = True + + # force db to connect now + self.db.reset() + self.columns = self.db.get_all_columns(self.table) + self.log.info("columns = *, got %s", str(self.columns)) + + else: + self._parse_fields('columns', config.get(name, 'columns')) if config.has_option(name, 'blob_columns'): self._parse_fields('blob_cols', @@ -478,6 +492,9 @@ class PGLoader(threading.Thread): self.log.debug("only_cols %s", str(self.only_cols)) self.log.debug("columnlist %s", str(self.columnlist)) + if not self.template and self.only_cols is not None: + self.db.all_cols = False + ## # This option is textreader specific, but being lazy and # short-timed, I don't make self._parse_fields() callable from