aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/refcat/base.py337
-rw-r--r--python/refcat/cli.py14
-rw-r--r--python/refcat/tasks.py7
-rw-r--r--python/setup.py4
4 files changed, 344 insertions, 18 deletions
diff --git a/python/refcat/base.py b/python/refcat/base.py
new file mode 100644
index 0000000..240e4a0
--- /dev/null
+++ b/python/refcat/base.py
@@ -0,0 +1,337 @@
+"""
+Default task
+============
+
+A default task, that covers file system layout.
+"""
+
+import datetime
+import hashlib
+import os
+import random
+import re
+import string
+import subprocess
+import tempfile
+
+import luigi
+
+__all__ = [
+ 'BaseTask',
+ 'ClosestDateParameter',
+ 'Gzip',
+ 'TSV',
+ 'Zstd',
+ 'random_string',
+ 'shellout',
+]
+
+logger = logging.getLogger('refcat')
+
+
+class ClosestDateParameter(luigi.DateParameter):
+ """
+ A marker parameter to replace date parameter value with whatever
+ self.closest() returns. Use in conjunction with `gluish.task.BaseTask`.
+ """
+ use_closest_date = True
+
+
+def is_closest_date_parameter(task, param_name):
+ """ Return the parameter class of param_name on task. """
+ for name, obj in task.get_params():
+ if name == param_name:
+ return hasattr(obj, 'use_closest_date')
+ return False
+
+
+def delistify(x):
+ """ A basic slug version of a given parameter list. """
+ if isinstance(x, list):
+ x = [e.replace("'", "") for e in x]
+ return '-'.join(sorted(x))
+ return x
+
+
+class BaseTask(luigi.Task):
+ """
+ A base task with a `path` method. BASE should be set to the root
+ directory of all tasks. TAG is a shard for a group of related tasks.
+ """
+ BASE = tempfile.gettempdir()
+ TAG = 'default'
+
+ def closest(self):
+ """ Return the closest date for a given date.
+ Defaults to the same date. """
+ if not hasattr(self, 'date'):
+ raise AttributeError('Task has no date attribute.')
+ return self.date
+
+ def effective_task_id(self):
+ """ Replace date in task id with closest date. """
+ params = self.param_kwargs
+ if 'date' in params and is_closest_date_parameter(self, 'date'):
+ params['date'] = self.closest()
+ task_id_parts = sorted(['%s=%s' % (k, str(v)) for k, v in params.items()])
+ return '%s(%s)' % (self.task_family, ', '.join(task_id_parts))
+ else:
+ return self.task_id
+
+ def taskdir(self):
+ """ Return the directory under which all artefacts are stored. """
+ return os.path.join(self.BASE, self.TAG, self.task_family)
+
+ def path(self, filename=None, ext='tsv', digest=False, shard=False, encoding='utf-8'):
+ """
+ Return the path for this class with a certain set of parameters.
+ `ext` sets the extension of the file.
+ If `hash` is true, the filename (w/o extenstion) will be hashed.
+ If `shard` is true, the files are placed in shards, based on the first
+ two chars of the filename (hashed).
+ """
+ if self.BASE is NotImplemented:
+ raise RuntimeError('BASE directory must be set.')
+
+ params = dict(self.get_params())
+
+ if filename is None:
+ parts = []
+
+ for name, param in self.get_params():
+ if not param.significant:
+ continue
+ if name == 'date' and is_closest_date_parameter(self, 'date'):
+ parts.append('date-%s' % self.closest())
+ continue
+ if hasattr(param, 'is_list') and param.is_list:
+ es = '-'.join([str(v) for v in getattr(self, name)])
+ parts.append('%s-%s' % (name, es))
+ continue
+
+ val = getattr(self, name)
+
+ if isinstance(val, datetime.datetime):
+ val = val.strftime('%Y-%m-%dT%H%M%S')
+ elif isinstance(val, datetime.date):
+ val = val.strftime('%Y-%m-%d')
+
+ parts.append('%s-%s' % (name, val))
+
+ name = '-'.join(sorted(parts))
+ if len(name) == 0:
+ name = 'output'
+ if digest:
+ name = hashlib.sha1(name.encode(encoding)).hexdigest()
+ if not ext:
+ filename = '{fn}'.format(ext=ext, fn=name)
+ else:
+ filename = '{fn}.{ext}'.format(ext=ext, fn=name)
+ if shard:
+ prefix = hashlib.sha1(filename.encode(encoding)).hexdigest()[:2]
+ return os.path.join(self.BASE, self.TAG, self.task_family, prefix, filename)
+
+ return os.path.join(self.BASE, self.TAG, self.task_family, filename)
+
+
+def shellout(template,
+ preserve_whitespace=False,
+ executable='/bin/bash',
+ ignoremap=None,
+ encoding=None,
+ pipefail=True,
+ **kwargs):
+ """
+
+ Takes a shell command template and executes it. The template must use the
+ new (2.6+) format mini language. `kwargs` must contain any defined
+ placeholder, only `output` is optional and will be autofilled with a
+ temporary file if it used, but not specified explicitly.
+
+ If `pipefail` is `False` no subshell environment will be spawned, where a
+ failed pipe will cause an error as well. If `preserve_whitespace` is `True`,
+ no whitespace normalization is performed. A custom shell executable name can
+ be passed in `executable` and defaults to `/bin/bash`.
+
+ Raises RuntimeError on nonzero exit codes. To ignore certain errors, pass a
+ dictionary in `ignoremap`, with the error code to ignore as key and a string
+ message as value.
+
+ Simple template:
+
+ wc -l < {input} > {output}
+
+ Quoted curly braces:
+
+ ps ax|awk '{{print $1}}' > {output}
+
+ Usage with luigi:
+
+ ...
+ tmp = shellout('wc -l < {input} > {output}', input=self.input().path)
+ luigi.LocalTarget(tmp).move(self.output().path)
+ ....
+
+ """
+ if not 'output' in kwargs:
+ kwargs.update({'output': tempfile.mkstemp(prefix='refcat-')[1]})
+ if ignoremap is None:
+ ignoremap = {}
+ if encoding:
+ command = template.decode(encoding).format(**kwargs)
+ else:
+ command = template.format(**kwargs)
+ if not preserve_whitespace:
+ command = re.sub('[ \t\n]+', ' ', command)
+ if pipefail:
+ command = '(set -o pipefail && %s)' % command
+ logger.debug(command)
+ code = subprocess.call([command], shell=True, executable=executable)
+ if not code == 0:
+ if code in ignoremap:
+ logger.info("Ignoring error via ignoremap: %s" % ignoremap.get(code))
+ else:
+ logger.error('%s: %s' % (command, code))
+ error = RuntimeError('%s exitcode: %s' % (command, code))
+ error.code = code
+ raise error
+ return kwargs.get('output')
+
+
+def random_string(length=16):
+ """
+ Return a random string (upper and lowercase letters) of length `length`,
+ defaults to 16.
+ """
+ return ''.join(random.choice(string.ascii_letters) for _ in range(length))
+
+
+def which(program):
+ """
+ Search for program in PATH.
+ """
+ def is_exe(fpath):
+ return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
+
+ fpath, fname = os.path.split(program)
+ if fpath:
+ if is_exe(program):
+ return program
+ else:
+ for path in os.environ["PATH"].split(os.pathsep):
+ path = path.strip('"')
+ exe_file = os.path.join(path, program)
+ if is_exe(exe_file):
+ return exe_file
+
+ return None
+
+
+def write_tsv(output_stream, *tup, **kwargs):
+ """
+ Write argument list in `tup` out as a tab-separeated row to the stream.
+ """
+ encoding = kwargs.get('encoding') or 'utf-8'
+ value = u'\t'.join([s for s in tup]) + '\n'
+ if encoding is None:
+ if isinstance(value, str):
+ output_stream.write(value.encode('utf-8'))
+ else:
+ output_stream.write(value)
+ else:
+ output_stream.write(value.encode(encoding))
+
+
+def iter_tsv(input_stream, cols=None, encoding='utf-8'):
+ """
+ If a tuple is given in cols, use the elements as names to construct
+ a namedtuple.
+ Columns can be marked as ignored by using ``X`` or ``0`` as column name.
+ Example (ignore the first four columns of a five column TSV):
+ ::
+ def run(self):
+ with self.input().open() as handle:
+ for row in handle.iter_tsv(cols=('X', 'X', 'X', 'X', 'iln')):
+ print(row.iln)
+ """
+ if cols:
+ cols = [c if not c in ('x', 'X', 0, None) else random_string(length=5) for c in cols]
+ Record = collections.namedtuple('Record', cols)
+ for line in input_stream:
+ yield Record._make(line.decode(encoding).rstrip('\n').split('\t'))
+ else:
+ for line in input_stream:
+ yield tuple(line.decode(encoding).rstrip('\n').split('\t'))
+
+
+class TSVFormat(luigi.format.Format):
+ """
+ A basic CSV/TSV format.
+ Discussion: https://groups.google.com/forum/#!topic/luigi-user/F813st16xqw
+ """
+ def hdfs_reader(self, input_pipe):
+ raise NotImplementedError()
+
+ def hdfs_writer(self, output_pipe):
+ raise NotImplementedError()
+
+ def pipe_reader(self, input_pipe):
+ input_pipe.iter_tsv = functools.partial(iter_tsv, input_pipe)
+ return input_pipe
+
+ def pipe_writer(self, output_pipe):
+ output_pipe.write_tsv = functools.partial(write_tsv, output_pipe)
+ return output_pipe
+
+
+class GzipFormat(luigi.format.Format):
+ """
+ A gzip format, that upgrades itself to pigz, if it's installed.
+ """
+ input = 'bytes'
+ output = 'bytes'
+
+ def __init__(self, compression_level=None):
+ self.compression_level = compression_level
+ self.gzip = ["gzip"]
+ self.gunzip = ["gunzip"]
+
+ if which('pigz'):
+ self.gzip = ["pigz"]
+ self.gunzip = ["unpigz"]
+
+ def pipe_reader(self, input_pipe):
+ return luigi.format.InputPipeProcessWrapper(self.gunzip, input_pipe)
+
+ def pipe_writer(self, output_pipe):
+ args = self.gzip
+ if self.compression_level is not None:
+ args.append('-' + str(int(self.compression_level)))
+ return luigi.format.OutputPipeProcessWrapper(args, output_pipe)
+
+
+class ZstdFormat(luigi.format.Format):
+ """
+ The zstandard format.
+ """
+ input = 'bytes'
+ output = 'bytes'
+
+ def __init__(self, compression_level=None):
+ self.compression_level = compression_level
+ self.zstd = ["zstd"]
+ self.unzstd = ["unzstd"]
+
+ def pipe_reader(self, input_pipe):
+ return luigi.format.InputPipeProcessWrapper(self.unzstd, input_pipe)
+
+ def pipe_writer(self, output_pipe):
+ args = self.zstd
+ if self.compression_level is not None:
+ args.append('-' + str(int(self.compression_level)))
+ return luigi.format.OutputPipeProcessWrapper(args, output_pipe)
+
+
+TSV = TSVFormat()
+Gzip = GzipFormat()
+Zstd = ZstdFormat()
diff --git a/python/refcat/cli.py b/python/refcat/cli.py
index f47c63c..534eee6 100644
--- a/python/refcat/cli.py
+++ b/python/refcat/cli.py
@@ -22,7 +22,6 @@ import subprocess
import sys
import tempfile
-import gluish
import luigi
from luigi.cmdline_parser import CmdlineParser
from luigi.parameter import MissingParameterException
@@ -35,17 +34,13 @@ from refcat.settings import LOGGING_CONF_FILE, settings
from refcat.tasks import *
from refcat.utils import columnize
-# XXX: get rid of gluish dep, include them in refcat
+# These are utility classes of luigi.
suppress_task_names = [
"Available",
"BaseTask",
"Config",
"Executable",
"ExternalTask",
- "FillSolrIndex",
- "GitCloneRepository",
- "GitUpdateRepository",
- "MockTask",
"RangeBase",
"RangeByMinutes",
"RangeByMinutesBase",
@@ -237,7 +232,7 @@ def main():
else:
print("not implemented: {}".format(sys.argv[1]))
except KeyError:
- print("sub-command not implemented: {}".format(sys.argv[1]), file=sys.stderr)
+ print("subcommand not implemented: {}".format(sys.argv[1]), file=sys.stderr)
except (AttributeError, ValueError, RuntimeError) as exc:
print(exc, file=sys.stderr)
sys.exit(1)
@@ -252,10 +247,7 @@ def main():
print("TMPDIR {}".format(settings.TMPDIR))
print("SHIV_ROOT {}".format(os.environ.get("SHIV_ROOT") or shiv_root_default))
print()
- names = [
- name for name in sorted(Register.task_names()) if name not in suppress_task_names and not name.islower()
- ]
- print(columnize(names))
+ print(columnize(list(effective_task_names())))
sys.exit(0)
# If we found no subcommand, assume task name.
diff --git a/python/refcat/tasks.py b/python/refcat/tasks.py
index 2d98f5c..c798272 100644
--- a/python/refcat/tasks.py
+++ b/python/refcat/tasks.py
@@ -232,10 +232,8 @@ import sys
import tempfile
import luigi
-from gluish.format import Zstd
-from gluish.task import BaseTask
-from gluish.utils import shellout
+from refcat.base import BaseTask, Zstd, shellout
from refcat.settings import settings
@@ -1516,7 +1514,8 @@ class CDXURL(Refcat):
tmpdir=self.tmpdir,
limit=self.limit,
input=self.input().path,
- cache=self.cache, ignoremap={141: "todo: root cause"})
+ cache=self.cache,
+ ignoremap={141: "todo: root cause"})
luigi.LocalTarget(output).move(self.output().path)
diff --git a/python/setup.py b/python/setup.py
index 4b743d7..901180f 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -13,7 +13,7 @@ with open("README.md", "r") as fh:
description="Reference data munging tasks and utilities",
long_description=long_description,
long_description_content_type="text/markdown",
- url="https://github.com/internetarchive/cgraph",
+ url="https://gitlab.com/internetarchive/cgraph",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
@@ -26,8 +26,6 @@ with open("README.md", "r") as fh:
]},
install_requires=[
"dynaconf[ini]",
- "fuzzycat",
- "gluish",
],
extras_require={"dev": [
"ipython",