-rw-r--r--  fuzzycat/cluster.py              58
-rw-r--r--  fuzzycat/utils.py                22
-rw-r--r--  tests/data/zstd/empty.txt         0
-rw-r--r--  tests/data/zstd/empty.txt.zst    bin 0 -> 13 bytes
-rw-r--r--  tests/data/zstd/lines.txt         9
-rw-r--r--  tests/data/zstd/lines.txt.zst    bin 0 -> 31 bytes
-rw-r--r--  tests/data/zstd/single.txt        1
-rw-r--r--  tests/data/zstd/single.txt.zst   bin 0 -> 18 bytes
-rw-r--r--  tests/test_cluster.py            13
-rw-r--r--  tests/test_utils.py              16
10 files changed, 97 insertions(+), 22 deletions(-)
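
This commit threads an optional zstd compression flag through Cluster's
temporary-file pipeline and adds a zstdlines helper, fixtures, and tests.
A minimal usage sketch, assuming the import locations used in
tests/test_cluster.py (the input file name is hypothetical):

    import io

    from fuzzycat.cluster import Cluster, release_key_title_normalized

    sio = io.StringIO()
    with open("releases.jsonl") as f:  # hypothetical: one JSON document per line
        cluster = Cluster(f, release_key_title_normalized, compress=True, output=sio)
        stats = cluster.run()  # intermediate files are zstd-compressed on disk
    print(stats)
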
diff --git a/fuzzycat/cluster.py b/fuzzycat/cluster.py
index 10bb431..c211d34 100644
--- a/fuzzycat/cluster.py
+++ b/fuzzycat/cluster.py
@@ -62,6 +62,7 @@ import fileinput
import itertools
import json
import logging
+import multiprocessing
import operator
import os
import re
@@ -75,8 +76,9 @@ from typing import IO, Any, Callable, Dict, Generator, List, Optional, Tuple
import fuzzy
import regex
+from zstandard import FLUSH_FRAME, ZstdCompressor
-from fuzzycat.utils import cut, slugify_string
+from fuzzycat.utils import cut, slugify_string, zstdlines
__all__ = [
"release_key_title",
@@ -302,6 +304,8 @@ class Cluster:
Two main options are the iterable (TODO: work on parsed docs) and the key
function applied to each value to group by.
+
+ TODO: We want compression.
"""
def __init__(self,
iterable: collections.abc.Iterable,
@@ -313,6 +317,7 @@ class Cluster:
strict: bool = False,
min_cluster_size: int = 2,
max_cluster_size: int = 100,
+                 compress: bool = False,
verbose=True):
self.iterable: collections.abc.Iterable = iterable
self.key: Callable[[Any], Tuple[str, str]] = key
@@ -324,6 +329,7 @@ class Cluster:
self.min_cluster_size = min_cluster_size
self.max_cluster_size = max_cluster_size
self.verbose = verbose
+ self.compress = compress
self.counter: Dict[str, int] = collections.Counter({
"key_fail": 0,
"key_ok": 0,
@@ -337,9 +343,15 @@ class Cluster:
First map documents to keys, then group by keys, outline: json -> tsv
-> sort -> group -> json.
"""
- with tempfile.NamedTemporaryFile(delete=False, mode="w", prefix=self.prefix) as tf:
+ with tempfile.NamedTemporaryFile(delete=False, mode="wb", prefix=self.prefix) as tf:
+ if self.compress:
+ zc = ZstdCompressor(level=9, threads=multiprocessing.cpu_count())
+ writer = zc.stream_writer(tf)
+ else:
+ writer = tf
for i, line in enumerate(self.iterable):
- if i % 100000 == 0 and self.verbose:
+ if self.verbose and i % 100000 == 0:
print("@{}".format(i), file=sys.stderr)
try:
doc = json.loads(line)
@@ -358,16 +370,26 @@ class Cluster:
self.counter["key_ok"] += 1
# XXX: if the line itself contains tabs, we need to remove
# them here; maybe offer TSV and JSON output and extra flag
- print("{}\t{}\t{}".format(id, key, line.replace("\t", " ")), file=tf)
+                    # XXX: compression (via self.compress) matters here; with
+                    # 2B records we would fill up disk too quickly otherwise
+ data = bytes("{}\t{}\t{}\n".format(id, key, line.replace("\t", " ")),
+ encoding="utf-8")
+ writer.write(data)
+            if self.compress:
+                # end the zstd frame, so the file can be decompressed later
+                writer.flush(FLUSH_FRAME)
sf = self.sort(tf.name, opts='-k 2')
- with open(sf) as f:
- for doc in self.group_by(f, key=cut(f=1)):
- if len(doc["v"]) < self.min_cluster_size:
- continue
- self.counter["num_clusters"] += 1
- json.dump(doc, self.output)
- self.output.write("\n")
+        if self.compress:
+            f = zstdlines(sf)
+        else:
+            f = open(sf)
+
+        for doc in self.group_by(f, key=cut(f=1)):
+            if len(doc["v"]) < self.min_cluster_size:
+                continue
+            self.counter["num_clusters"] += 1
+            json.dump(doc, self.output)
+            self.output.write("\n")
+        if not self.compress:
+            f.close()
os.remove(sf)
os.remove(tf.name)
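
For reference, the compressed write path above in isolation: a minimal
sketch, assuming the python-zstandard package; the context manager ends the
zstd frame on exit, equivalent to the explicit flush(FLUSH_FRAME) above.

    import multiprocessing
    import tempfile

    from zstandard import ZstdCompressor

    compress = True
    with tempfile.NamedTemporaryFile(delete=False, mode="wb") as tf:
        if compress:
            zc = ZstdCompressor(level=9, threads=multiprocessing.cpu_count())
            with zc.stream_writer(tf, closefd=False) as writer:  # keep tf open
                writer.write(b"id\tkey\tdoc\n")
        else:
            tf.write(b"id\tkey\tdoc\n")
    print(tf.name)  # path to the (possibly compressed) temporary file
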
@@ -383,9 +405,17 @@ class Cluster:
env["TMPDIR"] = self.tmpdir
if fast:
env["LC_ALL"] = "C"
- subprocess.run(["sort"] + opts.split() + [filename], stdout=tf, env=env, check=True)
-
- return tf.name
+ if self.compress:
+            # use the env prepared above (TMPDIR, LC_ALL), not a fresh copy
+            render_env = " ".join(["{}={}".format(k, v) for k, v in env.items()])
+ output = shellout("zstdcat -T0 {input} | {env} sort {opts} | zstd -c9 > {output}",
+ input=filename,
+ env=render_env,
+ opts=opts)
+ else:
+ subprocess.run(["sort"] + opts.split() + [filename], stdout=tf, env=env, check=True)
+ output = tf.name
+
+ return output
def group_by(self,
seq: collections.abc.Iterable,
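
The compressed branch of sort() shells out to a zstdcat | sort | zstd
pipeline through a shellout template helper; a rough stdlib-only equivalent
(the function name and defaults here are made up for illustration):

    import os
    import subprocess
    import tempfile

    def compressed_sort(filename, opts="-k 2", tmpdir=None):
        # decompress, sort and recompress in a single shell pipeline
        output = tempfile.NamedTemporaryFile(delete=False, suffix=".zst").name
        env = os.environ.copy()
        if tmpdir:
            env["TMPDIR"] = tmpdir
        cmd = "zstdcat -T0 {} | sort {} | zstd -c9 > {}".format(filename, opts, output)
        subprocess.run(cmd, shell=True, check=True, env=env)
        return output
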
diff --git a/fuzzycat/utils.py b/fuzzycat/utils.py
index 84db5ec..55729a1 100644
--- a/fuzzycat/utils.py
+++ b/fuzzycat/utils.py
@@ -8,6 +8,7 @@ import string
import requests
from glom import PathAccessError, glom
+from zstandard import ZstdDecompressor
printable_no_punct = string.digits + string.ascii_letters + string.whitespace
@@ -178,3 +179,24 @@ def random_idents_from_query(query="*",
raise RuntimeError('too few documents')
idents = [doc["_source"]["ident"] for doc in payload["hits"]["hits"]]
return random.sample(idents, r)
+
+
+def zstdlines(filename):
+ """
+ Generator over lines from a zstd compressed file.
+ """
+    decomp = ZstdDecompressor()
+    with open(filename, "rb") as f:
+        with decomp.stream_reader(f) as reader:
+            prev_line = ""
+            while True:
+                chunk = reader.read(65536)
+                if not chunk:
+                    break
+                # decode and prepend any partial line carried over from the
+                # previous chunk, so lines spanning chunk boundaries are not
+                # lost (assumes chunks do not split multi-byte UTF-8 sequences)
+                string_data = prev_line + chunk.decode("utf-8")
+                lines = string_data.split("\n")
+                for line in lines[:-1]:
+                    yield line
+                prev_line = lines[-1]
+            if prev_line:
+                yield prev_line
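
Usage sketch for the new helper, reading one of the fixtures added below:

    from fuzzycat.utils import zstdlines

    for line in zstdlines("tests/data/zstd/lines.txt.zst"):
        print(line)  # "1" through "9", newlines already stripped
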
diff --git a/tests/data/zstd/empty.txt b/tests/data/zstd/empty.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/data/zstd/empty.txt
diff --git a/tests/data/zstd/empty.txt.zst b/tests/data/zstd/empty.txt.zst
new file mode 100644
index 0000000..e58c09d
--- /dev/null
+++ b/tests/data/zstd/empty.txt.zst
Binary files differ
diff --git a/tests/data/zstd/lines.txt b/tests/data/zstd/lines.txt
new file mode 100644
index 0000000..0719398
--- /dev/null
+++ b/tests/data/zstd/lines.txt
@@ -0,0 +1,9 @@
+1
+2
+3
+4
+5
+6
+7
+8
+9
diff --git a/tests/data/zstd/lines.txt.zst b/tests/data/zstd/lines.txt.zst
new file mode 100644
index 0000000..bc9be49
--- /dev/null
+++ b/tests/data/zstd/lines.txt.zst
Binary files differ
diff --git a/tests/data/zstd/single.txt b/tests/data/zstd/single.txt
new file mode 100644
index 0000000..4b37d57
--- /dev/null
+++ b/tests/data/zstd/single.txt
@@ -0,0 +1 @@
+zzzz
diff --git a/tests/data/zstd/single.txt.zst b/tests/data/zstd/single.txt.zst
new file mode 100644
index 0000000..47e377f
--- /dev/null
+++ b/tests/data/zstd/single.txt.zst
Binary files differ
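
One way to (re)generate these binary fixtures from the plain-text files,
assuming the python-zstandard package (the committed .zst files may equally
have been produced with the zstd CLI):

    import os

    from zstandard import ZstdCompressor

    base = "tests/data/zstd"
    for name in ("empty.txt", "lines.txt", "single.txt"):
        with open(os.path.join(base, name), "rb") as src:
            data = ZstdCompressor().compress(src.read())
        with open(os.path.join(base, name + ".zst"), "wb") as dst:
            dst.write(data)
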
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index 3ad32a7..793798b 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -109,20 +109,19 @@ def test_release_key_title_nysiis():
def test_cluster():
sio = io.StringIO()
- cluster = Cluster([
- json.dumps(line) for line in [
+ lines = [
+ json.dumps(doc) for doc in [
{
"title": "hello world",
- "ident": 1
+ "ident": 1,
},
{
"title": "hello world!",
- "ident": 2
+ "ident": 2,
},
]
- ],
- release_key_title_normalized,
- output=sio)
+ ]
+ cluster = Cluster(lines, release_key_title_normalized, output=sio)
stats = cluster.run()
assert stats == {
"key_fail": 0,
diff --git a/tests/test_utils.py b/tests/test_utils.py
index fa930fe..29b125b 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,7 +1,9 @@
+import os
+
import pytest
from fuzzycat.utils import (author_similarity_score, cut, jaccard, nwise, slugify_string,
- token_n_grams, tokenize_string, parse_page_string, dict_key_exists)
+ token_n_grams, tokenize_string, parse_page_string, dict_key_exists,
+ zstdlines)
def test_slugify_string():
@@ -84,3 +86,15 @@ def test_page_page_string():
assert parse_page_string("123-125") == (123, 125, 3)
assert parse_page_string("123-124a") == (123, 124, 2)
assert parse_page_string("1-1000") == (1, 1000, 1000)
+
+
+def test_zstdlines():
+ test_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data/zstd")
+ examples = (
+ (os.path.join(test_dir, "lines.txt.zst"), os.path.join(test_dir, "lines.txt")),
+ (os.path.join(test_dir, "empty.txt.zst"), os.path.join(test_dir, "empty.txt")),
+ (os.path.join(test_dir, "single.txt.zst"), os.path.join(test_dir, "single.txt")),
+ )
+ for zfn, fn in examples:
+ with open(fn) as f:
+ assert [s.strip() for s in f.readlines()] == list(zstdlines(zfn))