author     Martin Czygan <martin.czygan@gmail.com>    2021-02-02 00:40:41 +0100
committer  Martin Czygan <martin.czygan@gmail.com>    2021-02-02 00:40:41 +0100
commit     d5ade2a068e2f420b5376f07e13db66c5b43a01e (patch)
tree       ee9b9fba3e0aa49d0d6254d06e93e2ecdb4603b7
parent     727f44887e0612b54010704dc997fd2ebd8b0344 (diff)
download   fuzzycat-d5ade2a068e2f420b5376f07e13db66c5b43a01e.tar.gz
           fuzzycat-d5ade2a068e2f420b5376f07e13db66c5b43a01e.zip
add compress kwarg to cluster
Will compress intermediate results with zstd (https://git.io/Jt00y9).
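In practice the new keyword is passed at construction time. A minimal sketch, assuming the same toy records the updated test below uses (the sample docs are illustrative):

```python
import io
import json

from fuzzycat.cluster import Cluster, release_key_title_normalized

# Illustrative input: any iterable of JSON lines works.
lines = [json.dumps(doc) for doc in [
    {"title": "hello world", "ident": 1},
    {"title": "hello world!", "ident": 2},
]]

sio = io.StringIO()
# compress=True keeps the intermediate TSV zstd-compressed on disk,
# both while writing it and while sorting it.
cluster = Cluster(lines, release_key_title_normalized, output=sio, compress=True)
stats = cluster.run()
print(stats["num_clusters"])
```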
-rw-r--r--  fuzzycat/cluster.py             58
-rw-r--r--  fuzzycat/utils.py               22
-rw-r--r--  tests/data/zstd/empty.txt        0
-rw-r--r--  tests/data/zstd/empty.txt.zst    bin 0 -> 13 bytes
-rw-r--r--  tests/data/zstd/lines.txt        9
-rw-r--r--  tests/data/zstd/lines.txt.zst    bin 0 -> 31 bytes
-rw-r--r--  tests/data/zstd/single.txt       1
-rw-r--r--  tests/data/zstd/single.txt.zst   bin 0 -> 18 bytes
-rw-r--r--  tests/test_cluster.py           13
-rw-r--r--  tests/test_utils.py             16
10 files changed, 97 insertions, 22 deletions
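Besides compressing the temporary file, the sort step changes: with compress enabled, cluster.py shells out to a zstdcat | sort | zstd pipeline instead of running sort on an uncompressed file. Roughly, and only as a sketch (the helper name and paths here are made up, not fuzzycat API; the zstd CLI tools must be on PATH):

```python
import os
import subprocess
import tempfile


def zstd_sort(input_path, opts="-k 2"):
    """Sort a zstd-compressed TSV while keeping it compressed on disk.

    Approximates the shellout template in the diff below.
    """
    fd, output_path = tempfile.mkstemp(suffix=".zst")
    os.close(fd)
    # Decompress -> sort by the key column -> recompress.
    cmd = "zstdcat -T0 {} | LC_ALL=C sort {} | zstd -c9 > {}".format(
        input_path, opts, output_path)
    subprocess.run(cmd, shell=True, check=True)
    return output_path
```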
diff --git a/fuzzycat/cluster.py b/fuzzycat/cluster.py
index 10bb431..c211d34 100644
--- a/fuzzycat/cluster.py
+++ b/fuzzycat/cluster.py
@@ -62,6 +62,7 @@ import fileinput
 import itertools
 import json
 import logging
+import multiprocessing
 import operator
 import os
 import re
@@ -75,8 +76,9 @@ from typing import IO, Any, Callable, Dict, Generator, List, Optional, Tuple
 
 import fuzzy
 import regex
+from zstandard import ZstdCompressor
 
-from fuzzycat.utils import cut, slugify_string
+from fuzzycat.utils import cut, slugify_string, zstdlines
 
 __all__ = [
     "release_key_title",
@@ -302,6 +304,8 @@ class Cluster:
 
     Two main options are iterable (TODO: work on parsed docs), and the key
     function to apply to value to group by.
+
+    TODO: We want compression.
     """
     def __init__(self,
                  iterable: collections.abc.Iterable,
@@ -313,6 +317,7 @@ class Cluster:
                  strict: bool = False,
                  min_cluster_size: int = 2,
                  max_cluster_size: int = 100,
+                 compress=False,
                  verbose=True):
         self.iterable: collections.abc.Iterable = iterable
         self.key: Callable[[Any], Tuple[str, str]] = key
@@ -324,6 +329,7 @@ class Cluster:
         self.min_cluster_size = min_cluster_size
         self.max_cluster_size = max_cluster_size
         self.verbose = verbose
+        self.compress = compress
         self.counter: Dict[str, int] = collections.Counter({
             "key_fail": 0,
             "key_ok": 0,
@@ -337,9 +343,15 @@ class Cluster:
         First map documents to keys, then group by keys, outline: json ->
         tsv -> sort -> group -> json.
         """
-        with tempfile.NamedTemporaryFile(delete=False, mode="w", prefix=self.prefix) as tf:
+        with tempfile.NamedTemporaryFile(delete=False, mode="wb", prefix=self.prefix) as tf:
+            if self.compress:
+                zc = ZstdCompressor(level=9, threads=multiprocessing.cpu_count())
+                writer = zc.stream_writer(tf)
+            else:
+                writer = tf
+            print(self.iterable)
             for i, line in enumerate(self.iterable):
-                if i % 100000 == 0 and self.verbose:
+                if self.verbose and i % 100000 == 0:
                     print("@{}".format(i), file=sys.stderr)
                 try:
                     doc = json.loads(line)
@@ -358,16 +370,26 @@ class Cluster:
                     self.counter["key_ok"] += 1
                 # XXX: if the line itself contains tabs, we need to remove
                 # them here; maybe offer TSV and JSON output and extra flag
-                print("{}\t{}\t{}".format(id, key, line.replace("\t", " ")), file=tf)
+                # XXX: this needs to be compressed (e.g. with 2B records, we
+                # fill up disk too quickly)
+                data = bytes("{}\t{}\t{}\n".format(id, key, line.replace("\t", " ")),
+                             encoding="utf-8")
+                writer.write(data)
+            if self.compress:
+                writer.flush()
 
         sf = self.sort(tf.name, opts='-k 2')
-        with open(sf) as f:
-            for doc in self.group_by(f, key=cut(f=1)):
-                if len(doc["v"]) < self.min_cluster_size:
-                    continue
-                self.counter["num_clusters"] += 1
-                json.dump(doc, self.output)
-                self.output.write("\n")
+        if self.compress:
+            f = zstdlines(sf)
+        else:
+            f = open(sf)
+
+        for doc in self.group_by(f, key=cut(f=1)):
+            if len(doc["v"]) < self.min_cluster_size:
+                continue
+            self.counter["num_clusters"] += 1
+            json.dump(doc, self.output)
+            self.output.write("\n")
 
         os.remove(sf)
         os.remove(tf.name)
@@ -383,9 +405,17 @@ class Cluster:
             env["TMPDIR"] = self.tmpdir
         if fast:
             env["LC_ALL"] = "C"
-        subprocess.run(["sort"] + opts.split() + [filename], stdout=tf, env=env, check=True)
-
-        return tf.name
+        if self.compress:
+            render_env = " ".join(["{}={}".format(k, v) for k, v in os.environ.copy().items()])
+            output = shellout("zstdcat -T0 {input} | {env} sort {opts} | zstd -c9 > {output}",
+                              input=filename,
+                              env=render_env,
+                              opts=opts)
+        else:
+            subprocess.run(["sort"] + opts.split() + [filename], stdout=tf, env=env, check=True)
+            output = tf.name
+
+        return output
 
     def group_by(self,
                  seq: collections.abc.Iterable,
diff --git a/fuzzycat/utils.py b/fuzzycat/utils.py
index 84db5ec..55729a1 100644
--- a/fuzzycat/utils.py
+++ b/fuzzycat/utils.py
@@ -8,6 +8,7 @@ import string
 
 import requests
 from glom import PathAccessError, glom
+from zstandard import ZstdDecompressor
 
 printable_no_punct = string.digits + string.ascii_letters + string.whitespace
 
@@ -178,3 +179,24 @@ def random_idents_from_query(query="*",
         raise RuntimeError('to few documents')
     idents = [doc["_source"]["ident"] for doc in payload["hits"]["hits"]]
     return random.sample(idents, r)
+
+
+def zstdlines(filename):
+    """
+    Generator over lines from a zstd compressed file.
+    """
+    decomp = ZstdDecompressor()
+    with open(filename, "rb") as f:
+        with decomp.stream_reader(f) as reader:
+            prev_line = ""
+            while True:
+                chunk = reader.read(65536)
+                if not chunk:
+                    break
+                string_data = chunk.decode('utf-8')
+                lines = string_data.split("\n")
+                for i, line in enumerate(lines[:-1]):
+                    if i == 0:
+                        line = prev_line + line
+                    yield line
+                prev_line = lines[-1]
diff --git a/tests/data/zstd/empty.txt b/tests/data/zstd/empty.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/data/zstd/empty.txt
diff --git a/tests/data/zstd/empty.txt.zst b/tests/data/zstd/empty.txt.zst
Binary files differ
new file mode 100644
index 0000000..e58c09d
--- /dev/null
+++ b/tests/data/zstd/empty.txt.zst
diff --git a/tests/data/zstd/lines.txt b/tests/data/zstd/lines.txt
new file mode 100644
index 0000000..0719398
--- /dev/null
+++ b/tests/data/zstd/lines.txt
@@ -0,0 +1,9 @@
+1
+2
+3
+4
+5
+6
+7
+8
+9
diff --git a/tests/data/zstd/lines.txt.zst b/tests/data/zstd/lines.txt.zst
Binary files differ
new file mode 100644
index 0000000..bc9be49
--- /dev/null
+++ b/tests/data/zstd/lines.txt.zst
diff --git a/tests/data/zstd/single.txt b/tests/data/zstd/single.txt
new file mode 100644
index 0000000..4b37d57
--- /dev/null
+++ b/tests/data/zstd/single.txt
@@ -0,0 +1 @@
+zzzz
diff --git a/tests/data/zstd/single.txt.zst b/tests/data/zstd/single.txt.zst
Binary files differ
new file mode 100644
index 0000000..47e377f
--- /dev/null
+++ b/tests/data/zstd/single.txt.zst
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index 3ad32a7..793798b 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -109,20 +109,19 @@ def test_release_key_title_nysiis():
 
 def test_cluster():
     sio = io.StringIO()
-    cluster = Cluster([
-        json.dumps(line) for line in [
+    lines = [
+        json.dumps(doc) for doc in [
             {
                 "title": "hello world",
-                "ident": 1
+                "ident": 1,
             },
             {
                 "title": "hello world!",
-                "ident": 2
+                "ident": 2,
             },
         ]
-    ],
-                      release_key_title_normalized,
-                      output=sio)
+    ]
+    cluster = Cluster(lines, release_key_title_normalized, output=sio)
     stats = cluster.run()
     assert stats == {
         "key_fail": 0,
diff --git a/tests/test_utils.py b/tests/test_utils.py
index fa930fe..29b125b 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,7 +1,9 @@
 import pytest
+import os
 
 from fuzzycat.utils import (author_similarity_score, cut, jaccard, nwise, slugify_string,
-                            token_n_grams, tokenize_string, parse_page_string, dict_key_exists)
+                            token_n_grams, tokenize_string, parse_page_string, dict_key_exists,
+                            zstdlines)
 
 
 def test_slugify_string():
@@ -84,3 +86,15 @@ def test_page_page_string():
     assert parse_page_string("123-125") == (123, 125, 3)
     assert parse_page_string("123-124a") == (123, 124, 2)
     assert parse_page_string("1-1000") == (1, 1000, 1000)
+
+
+def test_zstdlines():
+    test_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data/zstd")
+    examples = (
+        (os.path.join(test_dir, "lines.txt.zst"), os.path.join(test_dir, "lines.txt")),
+        (os.path.join(test_dir, "empty.txt.zst"), os.path.join(test_dir, "empty.txt")),
+        (os.path.join(test_dir, "single.txt.zst"), os.path.join(test_dir, "single.txt")),
+    )
+    for zfn, fn in examples:
+        with open(fn) as f:
+            assert [s.strip() for s in f.readlines()] == list(zstdlines(zfn))