diff --git a/debian/changelog b/debian/changelog index 2de05e1..9c8349b 100644 --- a/debian/changelog +++ b/debian/changelog @@ -8,6 +8,7 @@ pgloader (2.3.3~dev2-1) unstable; urgency=low * Have --debug show a traceback * Fix a bug where pgloader would freeze on early error (no such file) * Implement an option to set csv field size limit + * Implement --load-from-stdin -- Dimitri Fontaine Sun, 4 Apr 2010 19:34:39 +0200 diff --git a/examples/pgloader.conf b/examples/pgloader.conf index fe07651..d1f5b82 100644 --- a/examples/pgloader.conf +++ b/examples/pgloader.conf @@ -120,6 +120,11 @@ columns = * fixed_specs = a:0:10, b:10:8, c:18:8, d:26:17 reformat = c:pgtime:time +[stdin] +table = stdin +format = csv +columns = * + [csv] table = csv format = csv diff --git a/examples/stdin/stdin.data b/examples/stdin/stdin.data new file mode 100644 index 0000000..be9251e --- /dev/null +++ b/examples/stdin/stdin.data @@ -0,0 +1,8 @@ +1|first entry +2|second one +3|another +4|still running +5|well, some more +6|antepenultima +7|next to last +8|hey, it's the last! \ No newline at end of file diff --git a/examples/stdin/stdin.sql b/examples/stdin/stdin.sql new file mode 100644 index 0000000..e46e5b3 --- /dev/null +++ b/examples/stdin/stdin.sql @@ -0,0 +1,4 @@ +CREATE TABLE stdin ( + a integer, + b text +); diff --git a/pgloader.1.txt b/pgloader.1.txt index bcca478..8794e71 100644 --- a/pgloader.1.txt +++ b/pgloader.1.txt @@ -216,6 +216,24 @@ Example: -o standard_conforming_strings=on -o client_encoding=utf8 Force +pgloader+ to use given version of psycopg, either +1+ or +2+. +== INTERNAL USAGE OPTIONS == + +Those have been developed for internal +pgloader+ usage only, but still +need to be documented. Also, they are maintained and you could find a use +for them. + +--load-from-stdin:: + + Consider standard input as the data file. 
When using this option, + either give a section name from which to apply all the setup except for + the +filename+ to load from, or use +--load-to-table+. + +--load-to-table:: + + This option's argument must be the name of the PostgreSQL table you're + loading the data to; it's useful when you want to load from +stdin+ and + avoid editing a full configuration section. + == GLOBAL CONFIGURATION SECTION == The configuration file has a +.ini+ file syntax, its first section has diff --git a/pgloader.py b/pgloader.py index 85d2bcc..a424412 100755 --- a/pgloader.py +++ b/pgloader.py @@ -143,6 +143,19 @@ def parse_options(): default = None, help = "Force usage of given version of psycopg") + parser.add_option("--load-from-stdin", action = "store_true", + dest = "stdin", + default = False, + help = "Load standard input data into given table name") + + parser.add_option("--load-to-table", dest = "table", + default = None, + help = "Load to given table when --load-from-stdin") + + parser.add_option("--boundaries", dest = "boundaries", + default = None, + help = "Load only in given boundaries, Start..End") + (opts, args) = parser.parse_args() if opts.version: @@ -214,6 +227,9 @@ def parse_options(): pgloader.options.SECTION_THREADS = opts.section_threads pgloader.options.MAX_PARALLEL_SECTIONS = opts.parallel + pgloader.options.LOAD_FROM_STDIN = opts.stdin + pgloader.options.LOAD_TO_TABLE = opts.table + if pgloader.options.MAX_PARALLEL_SECTIONS is None: from pgloader.options import DEFAULT_MAX_PARALLEL_SECTIONS pgloader.options.MAX_PARALLEL_SECTIONS = DEFAULT_MAX_PARALLEL_SECTIONS @@ -279,6 +295,15 @@ def parse_options(): else: pgloader.options.PSYCOPG_VERSION = opts.psycopg_version + if opts.boundaries: + try: + start, end = [int(x) for x in opts.boundaries.split("..")] + pgloader.options.FILE_BOUNDARIES = (start, end) + except ValueError, e: + print >>sys.stderr, \ + "Error: boundaries should be an integer range written X..Y" + sys.exit(1) + return opts.config, args def 
parse_config(conffile): @@ -548,14 +573,43 @@ def load_data(): from pgloader.options import VERBOSE, DEBUG, QUIET, SUMMARY from pgloader.options import DRY_RUN, PEDANTIC, VACUUM from pgloader.options import MAX_PARALLEL_SECTIONS + from pgloader.options import LOAD_FROM_STDIN, LOAD_TO_TABLE + from pgloader.options import FILE_BOUNDARIES from pgloader.pgloader import PGLoader from pgloader.tools import PGLoader_Error sections = [] summary = {} - # args are meant to be configuration sections, or filenames - if len(args) > 0: + # args are meant to be configuration sections, or filenames, or stdin + if LOAD_FROM_STDIN: + if FILE_BOUNDARIES is not None: + log.warning("Can't use --boundaries on stdin") + + if len(args) == 0: + s = '' + config.add_section(s) + config.set(s, 'table', LOAD_TO_TABLE) + config.set(s, 'filename', 'sys.stdin') + config.set(s, 'columns', '*') + config.set(s, 'format', 'csv') + sections.append(s) + + elif len(args) == 1: + if config.has_section(args[0]): + # apply given section parameters, then load from stdin + config.set(args[0], 'filename', 'sys.stdin') + sections.append(args[0]) + else: + print >>sys.stderr, \ + "Error: Please provide a [%s] section" % args[0] + sys.exit(5) + else: + print >>sys.stderr, \ + "Error: can't read several sections all from stdin" + sys.exit(5) + + elif len(args) > 0: for s in args: if config.has_section(s): sections.append(s) @@ -577,14 +631,21 @@ def load_data(): sections.append(s) else: - log.debug("No argument on CLI, will consider all sections") - for s in config.sections(): - if s != 'pgsql': - sections.append(s) + if not LOAD_FROM_STDIN: + # don't load all sections first when asked to load stdin + log.debug("No argument on CLI, will consider all sections") + for s in config.sections(): + if s != 'pgsql': + sections.append(s) - # we run through sorted section list, unless we got the section list - # from command line - sections.sort() + # we run through sorted section list, unless we got the section list + # 
from command line + sections.sort() + + if FILE_BOUNDARIES is not None and len(sections) > 1: + print >>sys.stderr, \ + "Error: will not apply boundaries on more than one file" + sys.exit(5) log.info('Will consider following sections:') for line in myprint(sections): @@ -634,6 +695,8 @@ def load_data(): if loader: if not loader.template: + if FILE_BOUNDARIES is not None and len(sections) == 1: + loader.reader.set_boundaries(FILE_BOUNDARIES) filename = loader.filename input_encoding = loader.input_encoding threads[s] = loader diff --git a/pgloader/options.py b/pgloader/options.py index 3373cfc..be46767 100644 --- a/pgloader/options.py +++ b/pgloader/options.py @@ -53,3 +53,6 @@ LOG_FILE = None REJECT_LOG_FILE = '%s.rej.log' REJECT_DATA_FILE = '%s.rej' + +LOAD_FROM_STDIN = None +LOAD_TO_TABLE = None diff --git a/pgloader/reader.py b/pgloader/reader.py index bcef947..6b7cd30 100644 --- a/pgloader/reader.py +++ b/pgloader/reader.py @@ -2,6 +2,7 @@ # # pgloader data reader interface and defaults +import sys from tools import PGLoader_Error, Reject, parse_config_string from db import db from lo import ifx_clob, ifx_blob @@ -192,9 +193,13 @@ class UnbufferedFileReader: if self.encoding is not None: try: import codecs - self.fd = codecs.open(self.filename, - encoding = self.encoding, - buffering = self.bufsize) + if self.filename == 'sys.stdin': + f = sys.stdin + else: + f = open(self.filename, self.mode, self.bufsize) + + self.log.warning('PHOQUE "%s"', f) + self.fd = codecs.getreader(self.encoding)(f) self.log.info("Opened '%s' with encoding '%s'" \ % (self.filename, self.encoding)) except LookupError, e: @@ -206,7 +211,10 @@ class UnbufferedFileReader: else: try: - self.fd = open(self.filename, mode, self.bufsize) + if self.filename == 'sys.stdin': + self.fd = sys.stdin + else: + self.fd = open(self.filename, mode, self.bufsize) except IOError, error: raise PGLoader_Error, error @@ -240,7 +248,12 @@ class UnbufferedFileReader: while line != '': line = self.fd.readline() 
self.line_nb += 1 - self.position = self.fd.tell() + try: + self.position = self.fd.tell() + except IOError, error: + #IOError: [Errno 29] Illegal seek --- when stdin reaches EOF + self.log.info(error) + return ## # if -F is used, count lines to skip, and skip them