diff options
-rw-r--r-- | fuzzycat/cluster.py | 218 | ||||
-rw-r--r-- | fuzzycat/main.py | 33 | ||||
-rw-r--r-- | tests/test_cluster.py | 2 |
3 files changed, 92 insertions, 161 deletions
"""
Clustering stage: derive a key per release entity and group identifiers
that share a key.
"""

import fileinput
import functools
import itertools
import json
import operator
import os
import re
import subprocess
import sys
import tempfile

import fuzzy

__all__ = [
    "release_key_title",
    "release_key_title_normalized",
    "release_key_title_nysiis",
    # BUG FIX: was "sort_file_by_column", which does not exist.
    "sort_by_column",
]

# Extract ("ident", "title") from a release entity dict; raises KeyError
# when either field is absent.
get_ident_title = operator.itemgetter("ident", "title")

# BUG FIX: str.maketrans accepts at most three arguments; the original
# str.maketrans("\t", " ", "\n", " ") raised TypeError at import time.
# A single dict argument maps each character to its replacement.
ws_replacer = str.maketrans({"\t": " ", "\n": " "})

# One or more non-word characters (underscore included), unicode-aware.
non_word_re = re.compile(r"[\W_]+", re.UNICODE)


def cut(value, f=0, sep="\t"):
    """
    Split value by separator and return a single column (zero-indexed),
    similar to cut(1). Raises IndexError when the column is missing.
    """
    return value.split(sep)[f]


def release_key_title(release):
    """
    Return an (ident, title) key for a release entity dict, with tabs and
    newlines in the title replaced by spaces.

    Raises ValueError when the title is missing or empty, KeyError when
    the entity lacks "ident" or "title".
    """
    # NOTE(review): parameter was named `re`, shadowing the re module;
    # renamed — all call sites pass the entity positionally.
    ident, title = get_ident_title(release)
    if not title:
        raise ValueError('title missing')
    title = title.translate(ws_replacer).strip()
    return (ident, title)
def release_key_title_normalized(release):
    """
    Key by a normalized title: lowercased, all non-word characters
    removed, e.g. "Analysis of Heritability" -> "analysisofheritability".
    """
    ident, title = release_key_title(release)
    # NOTE(review): the pre-refactor implementation lowercased before
    # stripping non-word chars; restored here so keys stay case-insensitive.
    return (ident, non_word_re.sub('', title.lower()))


def release_key_title_nysiis(release):
    """
    Key by the NYSIIS phonetic encoding of the title.

    NOTE(review): fuzzy.nysiis presumably expects ASCII-ish input —
    confirm behavior on non-latin titles.
    """
    ident, title = release_key_title(release)
    return (ident, fuzzy.nysiis(title))


def sort_by_column(filename, opts="-k 2", fast=True, mode="w", prefix="fuzzycat-"):
    """
    Sort a tabular file with sort(1); returns the filename of the sorted
    temporary file. The caller is responsible for removing it.

    TODO: use separate /fast/tmp for sort.
    """
    with tempfile.NamedTemporaryFile(delete=False, mode=mode, prefix=prefix) as tf:
        env = os.environ.copy()
        if fast:
            # Byte-wise collation is faster and locale-independent.
            env["LC_ALL"] = "C"
        subprocess.run(["sort"] + opts.split() + [filename], stdout=tf, env=env)
    return tf.name


def group_by(filename, key=None, value=None, comment=""):
    """
    Group a sorted file by key(line); yield one document per group:
    {"k": <stripped key>, "v": [value(line), ...], "c": comment}.

    The file must already be sorted by the same key, since
    itertools.groupby only merges consecutive equal keys.
    """
    with open(filename) as handle:
        for k, grp in itertools.groupby(handle, key=key):
            yield {
                "k": k.strip(),
                "v": [value(line) for line in grp],
                "c": comment,
            }
- """ - files = args.files if len(args.files) > 0 else ('-', ) - fg = operator.itemgetter("ident", "title") - - with tempfile.NamedTemporaryFile(delete=False, mode="w", prefix=args.tmp_prefix) as tf: - for line in fileinput.input(files=files) - try: - doc = json.loads(line) - id, title = fg(doc) - if not title: - continue - title = title.replace("\t", " ").replace("\n", " ").strip() - except KeyError as err: - continue - print("%s\t%s" % (id, title), file=tf) - - sbc = sort_by_column(tf.name, opts="-k 2", prefix=args.tmp_prefix) - for doc in group_by_column(sbc, key=cut(f=1), value=cut(f=0), comment="t"): - print(json.dumps(doc).decode("utf-8")) - - os.remove(sbc) - os.remove(tf.name) - -def cluster_by_title_normalized(args): - """ - Normalize title, e.g. analysisofheritability. 17k/s. - """ - files = args.files if len(args.files) > 0 else ('-', ) - fg = operator.itemgetter("ident", "title") - pattern = re.compile('[\W_]+', re.UNICODE) - - with tempfile.NamedTemporaryFile(delete=False, mode="w", prefix=args.tmp_prefix) as tf: - for line in fileinput.input(files=files): - try: - doc = json.loads(line) - id, title = fg(doc) - if not title: + def __init__(self, files=None, output=None, keyfunc=lambda v: v, tmp_prefix='fuzzycat-'): + self.files = files + self.tmp_prefix = tmp_prefix + self.keyfunc = keyfunc + self.output = output + if self.output is None: + self.output = sys.stdout + + def run(self): + with tempfile.NamedTemporaryFile(delete=False, mode="w", prefix=self.tmp_prefix) as tf: + for line in fileinput.input(files=files): + try: + id, key = self.keyfunc(json.loads(line)) + except (KeyError, ValueError): continue - title = title.replace("\t", " ").replace("\n", " ").strip().lower() - title = pattern.sub('', title) - except KeyError as err: - continue - print("%s\t%s" % (id, title), file=tf) - - sbc = sort_by_column(tf.name, opts="-k 2") - for doc in group_by_column(sbc, key=cut(f=1), value=cut(f=0), comment="tn"): - print(json.dumps(doc).decode("utf-8")) - - 
os.remove(sbc) - os.remove(tf.name) - -def cluster_by_title_nysiis(args): - """ - Soundex on title. - """ - files = args.files if len(args.files) > 0 else ('-', ) - fg = operator.itemgetter("ident", "title") - - with tempfile.NamedTemporaryFile(delete=False, mode="w", prefix=args.tmp_prefix) as tf: - for line in fileinput.input(files=files): - try: - doc = json.loads(line) - id, title = fg(doc) - if not title: - continue - title = fuzzy.nysiis(title) - except KeyError as err: - continue - print("%s\t%s" % (id, title), file=tf) - - sbc = sort_by_column(tf.name, opts="-k 2") - for doc in group_by_column(sbc, key=cut(f=1), value=cut(f=0), comment="nysiis"): - print(json.dumps(doc).decode("utf-8")) - - os.remove(sbc) - os.remove(tf.name) - -def main(): - types = { - "title": cluster_by_title, - "title_normalized": cluster_by_title_normalized, - "title_nysiis": cluster_by_title_nysiis, - } - parser = argparse.ArgumentParser(prog='fuzzycat-cluster', - usage='%(prog)s [options]', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument("-t", "--type", default="title", help="clustering variant to use") - parser.add_argument("-l", "--list", action="store_true", help="list cluster variants") - parser.add_argument("--tmp-prefix", default="fuzzycat-", help="prefix for tmp file") - parser.add_argument("--tmpdir", default=tempfile.gettempdir(), help="temp directory") - parser.add_argument('files', metavar='FILE', nargs='*', help='files to read, if empty, stdin is used') - args = parser.parse_args() + else: + print("{}\t{}".format(id, key), file=tf) - tempfile.tempdir = args.tmpdir + sbc = sort_by_column(tf.name, opts='-k 2', prefix=self.tmp_prefix) + for doc in group_by(sbc, key=cut(f=1), value=cut(f=0), comment=keyfunc.__name__): + json.dump(doc, self.output) - if args.list: - print("\n".join(types.keys())) - return - func = types.get(args.type) - if func is None: - print("invalid type: {}".format(args.type)) - return - func(args) + os.remove(sbc) + 
import argparse
import sys
import tempfile

# NOTE(review): unused in this module so far; kept pending the verify
# implementation that presumably needs it — confirm before removing.
import elasticsearch


def run_cluster(args):
    """Run the clustering stage. TODO: wire up fuzzycat.cluster.Cluster."""
    print('cluster')


def run_verify(args):
    """Run group verification. TODO: implement."""
    print('verify')


if __name__ == '__main__':
    parser = argparse.ArgumentParser(prog='fuzzycat',
                                     usage='%(prog)s command [options]',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument('--tmp-prefix', default='fuzzycat-', help='temp file prefix')
    parser.add_argument('--tmpdir', default=tempfile.gettempdir(), help='temporary directory')
    subparsers = parser.add_subparsers()

    sub_cluster = subparsers.add_parser('cluster', help='group entities')
    sub_cluster.set_defaults(func=run_cluster)
    sub_cluster.add_argument('-t', '--type', default='title', help='cluster algorithm')

    sub_verify = subparsers.add_parser('verify', help='verify groups')
    sub_verify.set_defaults(func=run_verify)

    args = parser.parse_args()
    # No subcommand given: subparsers are optional, so `func` is unset.
    if getattr(args, 'func', None) is None:
        print('fuzzycat: use -h or --help for usage', file=sys.stderr)
        sys.exit(1)

    args.func(args)