diff options
-rw-r--r-- | fuzzycat/cluster.py | 13 |
1 files changed, 6 insertions, 7 deletions
diff --git a/fuzzycat/cluster.py b/fuzzycat/cluster.py index 3228247..a53f5a1 100644 --- a/fuzzycat/cluster.py +++ b/fuzzycat/cluster.py @@ -76,7 +76,7 @@ from typing import IO, Any, Callable, Dict, Generator, List, Optional, Tuple import fuzzy import regex -from zstandard import ZstdCompressor +import zstandard from fuzzycat.utils import cut, shellout, slugify_string, zstdlines @@ -345,7 +345,7 @@ class Cluster: """ with tempfile.NamedTemporaryFile(delete=False, mode="wb", prefix=self.prefix) as tf: if self.compress: - zc = ZstdCompressor(level=9, threads=multiprocessing.cpu_count()) + zc = zstandard.ZstdCompressor(level=9, threads=multiprocessing.cpu_count()) writer = zc.stream_writer(tf) else: writer = tf @@ -371,11 +371,11 @@ class Cluster: # them here; maybe offer TSV and JSON output and extra flag # XXX: this needs to be compressed (e.g. with 2B records, we # fill up disk too quickly) - data = bytes("{}\t{}\t{}\n".format(id, key, line.replace("\t", " ")), + data = bytes("{}\t{}\t{}\n".format(id, key, line.replace("\t", " ").strip()), encoding="utf-8") writer.write(data) if self.compress: - writer.flush() + writer.flush(zstandard.FLUSH_FRAME) sf = self.sort(tf.name, opts='-k 2') if self.compress: @@ -405,10 +405,9 @@ class Cluster: if fast: env["LC_ALL"] = "C" if self.compress: - render_env = " ".join(["{}={}".format(k, v) for k, v in os.environ.copy().items()]) - output = shellout("zstdcat -T0 {input} | {env} sort {opts} | zstd -c9 > {output}", + output = shellout("zstdcat -T0 {input} | LC_ALL=C TMPDIR={tmpdir} sort {opts} | zstd -T0 -c9 > {output}", input=filename, - env=render_env, + tmpdir=self.tmpdir, opts=opts) else: subprocess.run(["sort"] + opts.split() + [filename], stdout=tf, env=env, check=True) |