aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fuzzycat/cluster.py13
1 files changed, 6 insertions, 7 deletions
diff --git a/fuzzycat/cluster.py b/fuzzycat/cluster.py
index 3228247..a53f5a1 100644
--- a/fuzzycat/cluster.py
+++ b/fuzzycat/cluster.py
@@ -76,7 +76,7 @@ from typing import IO, Any, Callable, Dict, Generator, List, Optional, Tuple
import fuzzy
import regex
-from zstandard import ZstdCompressor
+import zstandard
from fuzzycat.utils import cut, shellout, slugify_string, zstdlines
@@ -345,7 +345,7 @@ class Cluster:
"""
with tempfile.NamedTemporaryFile(delete=False, mode="wb", prefix=self.prefix) as tf:
if self.compress:
- zc = ZstdCompressor(level=9, threads=multiprocessing.cpu_count())
+ zc = zstandard.ZstdCompressor(level=9, threads=multiprocessing.cpu_count())
writer = zc.stream_writer(tf)
else:
writer = tf
@@ -371,11 +371,11 @@ class Cluster:
# them here; maybe offer TSV and JSON output and extra flag
# XXX: this needs to be compressed (e.g. with 2B records, we
# fill up disk too quickly)
- data = bytes("{}\t{}\t{}\n".format(id, key, line.replace("\t", " ")),
+ data = bytes("{}\t{}\t{}\n".format(id, key, line.replace("\t", " ").strip()),
encoding="utf-8")
writer.write(data)
if self.compress:
- writer.flush()
+ writer.flush(zstandard.FLUSH_FRAME)
sf = self.sort(tf.name, opts='-k 2')
if self.compress:
@@ -405,10 +405,9 @@ class Cluster:
if fast:
env["LC_ALL"] = "C"
if self.compress:
- render_env = " ".join(["{}={}".format(k, v) for k, v in os.environ.copy().items()])
- output = shellout("zstdcat -T0 {input} | {env} sort {opts} | zstd -c9 > {output}",
+ output = shellout("zstdcat -T0 {input} | LC_ALL=C TMPDIR={tmpdir} sort {opts} | zstd -T0 -c9 > {output}",
input=filename,
- env=render_env,
+ tmpdir=self.tmpdir,
opts=opts)
else:
subprocess.run(["sort"] + opts.split() + [filename], stdout=tf, env=env, check=True)