"""
Clustering part of matching.

We want a generic and fast way to derive various clusters. Input is JSON
lines of release entities, e.g. from a database dump.

Map and reduce.

* input (json) blob -> (ident, value) -> group by value -> emit idents per group

Example output:

    {
      "v": [
        "7uvh4z6zsjcptia5ig6swu4fre",
        "chlthrumyfg23aqw4r477j3vge",
        "yuo4smv4bzefdjsudbbzka3qv4"
      ],
      "k": "124-5_0137.dcm",
      "c": "t"
    }

Performance data points:

$ time zstdcat -T0 release_export_expanded.json.zst | pv -l | \
    parallel --roundrobin --pipe -j 16 fuzzycat-cluster /bigger/tmp -t title > cluster_title.json

Takes 607 min (around 3800 docs/s).
"""

import argparse
import fileinput
import itertools
import operator
import os
import re
import subprocess
import tempfile

import fuzzy
import orjson as json

DEFAULT_CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache", "fuzzycat")


def sort_by_column(filename, mode="w", opts="-k 2", fast=True, prefix="fuzzycat-"):
    """
    Sort a tabular file with sort(1); returns the filename of the sorted file.
    """
    with tempfile.NamedTemporaryFile(delete=False, mode=mode, prefix=prefix) as tf:
        env = os.environ.copy()
        if fast:
            # Bytewise comparison is much faster than locale-aware sorting.
            env["LC_ALL"] = "C"
        subprocess.run(["sort"] + opts.split() + [filename], stdout=tf, env=env, check=True)

    return tf.name
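
# A minimal usage sketch for sort_by_column (the file name is hypothetical):
# the call below is roughly `LC_ALL=C sort -k 2 idents.tsv`, with the output
# going to a temporary file whose name is returned; the caller removes it.
#
#   sorted_file = sort_by_column("idents.tsv", opts="-k 2")
#   with open(sorted_file) as f:
#       for line in f:
#           ...
#   os.remove(sorted_file)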

def group_by_column(filename, key=None, value=None, comment=""):
    """
    Group a file (already sorted by key) with a given key function; a second
    function extracts the value from each line.
    """
    with open(filename) as f:
        for k, g in itertools.groupby(f, key=key):
            doc = {
                "v": [value(v) for v in g],
                "c": comment,
                "k": k.strip(),
            }
            yield doc
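
# A sketch with hypothetical data: over a sorted TSV containing the lines
# "id1\tsame-key" and "id2\tsame-key", grouping on column 1 and extracting
# column 0 yields {"v": ["id1", "id2"], "c": "x", "k": "same-key"}.
#
#   for doc in group_by_column("sorted.tsv", key=cut(f=1), value=cut(f=0), comment="x"):
#       print(json.dumps(doc).decode("utf-8"))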

# XXX: LineOps

def cut(f=0, sep='\t'):
    """
    Similar to cut(1), but zero-indexed.
    """
    return lambda v: v.split(sep)[f]
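
# For example, cut(f=1) applied to "ident\ttitle\n" returns "title\n"; note
# the trailing newline stays on the last field (group_by_column strips it
# from the key via k.strip()).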

def cluster_by_title(args):
    """
    Basic example of a three-stage process: extract, sort, group. Runs at
    about 20k docs/s (json roundtrip, sorting, grouping).
    """
    files = args.files if len(args.files) > 0 else ('-', )
    fg = operator.itemgetter("ident", "title")

    with tempfile.NamedTemporaryFile(delete=False, mode="w", prefix=args.tmp_prefix) as tf:
        for line in fileinput.input(files=files):
            try:
                doc = json.loads(line)
                ident, title = fg(doc)
                if not title:
                    continue
                title = title.replace("\t", " ").replace("\n", " ").strip()
            except KeyError:
                continue
            print("%s\t%s" % (ident, title), file=tf)

    sbc = sort_by_column(tf.name, opts="-k 2", prefix=args.tmp_prefix)
    for doc in group_by_column(sbc, key=cut(f=1), value=cut(f=0), comment="t"):
        print(json.dumps(doc).decode("utf-8"))

    os.remove(sbc)
    os.remove(tf.name)
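
# End-to-end sketch (hypothetical input): a release entity such as
# {"ident": "7uvh4z6zsjcptia5ig6swu4fre", "title": "Some Title"} becomes the
# TSV line "7uvh4z6zsjcptia5ig6swu4fre\tSome Title", is sorted on the title
# column, and is grouped so that all idents sharing a title land in one
# output document.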

def cluster_by_title_normalized(args):
    """
    Cluster on a normalized title, e.g. "Analysis of Heritability" becomes
    "analysisofheritability". About 17k docs/s.
    """
    files = args.files if len(args.files) > 0 else ('-', )
    fg = operator.itemgetter("ident", "title")
    pattern = re.compile(r'[\W_]+', re.UNICODE)  # non-word chars and underscores

    with tempfile.NamedTemporaryFile(delete=False, mode="w", prefix=args.tmp_prefix) as tf:
        for line in fileinput.input(files=files):
            try:
                doc = json.loads(line)
                ident, title = fg(doc)
                if not title:
                    continue
                title = title.replace("\t", " ").replace("\n", " ").strip().lower()
                title = pattern.sub('', title)
            except KeyError:
                continue
            print("%s\t%s" % (ident, title), file=tf)

    sbc = sort_by_column(tf.name, opts="-k 2", prefix=args.tmp_prefix)
    for doc in group_by_column(sbc, key=cut(f=1), value=cut(f=0), comment="tn"):
        print(json.dumps(doc).decode("utf-8"))

    os.remove(sbc)
    os.remove(tf.name)

def cluster_by_title_nysiis(args):
    """
    Cluster on the NYSIIS phonetic encoding of the title.
    """
    files = args.files if len(args.files) > 0 else ('-', )
    fg = operator.itemgetter("ident", "title")

    with tempfile.NamedTemporaryFile(delete=False, mode="w", prefix=args.tmp_prefix) as tf:
        for line in fileinput.input(files=files):
            try:
                doc = json.loads(line)
                ident, title = fg(doc)
                if not title:
                    continue
                title = fuzzy.nysiis(title)
            except KeyError:
                continue
            print("%s\t%s" % (ident, title), file=tf)

    sbc = sort_by_column(tf.name, opts="-k 2", prefix=args.tmp_prefix)
    for doc in group_by_column(sbc, key=cut(f=1), value=cut(f=0), comment="nysiis"):
        print(json.dumps(doc).decode("utf-8"))

    os.remove(sbc)
    os.remove(tf.name)
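
# NYSIIS is a phonetic code, so spelling variants that sound alike tend to
# map to the same key (e.g. "color" and "colour"), grouping titles that the
# plain normalization above would keep apart.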

def main():
    types = {
        "title": cluster_by_title,
        "title_normalized": cluster_by_title_normalized,
        "title_nysiis": cluster_by_title_nysiis,
    }
    parser = argparse.ArgumentParser(prog='fuzzycat-cluster',
                                     usage='%(prog)s [options]',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("-t", "--type", default="title", help="clustering variant to use")
    parser.add_argument("-l", "--list", action="store_true", help="list cluster variants")
    parser.add_argument("--tmp-prefix", default="fuzzycat-", help="prefix for tmp file")
    parser.add_argument("--tmpdir", default=tempfile.gettempdir(), help="temp directory")
    parser.add_argument('files', metavar='FILE', nargs='*', help='files to read, if empty, stdin is used')
    args = parser.parse_args()

    tempfile.tempdir = args.tmpdir

    if args.list:
        print("\n".join(types.keys()))
        return
    func = types.get(args.type)
    if func is None:
        parser.error("invalid type: {} (use --list to see variants)".format(args.type))
    func(args)
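

# Allow running the module directly; presumably the package also exposes this
# as the fuzzycat-cluster console script entry point.
if __name__ == "__main__":
    main()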