From bf0c3399d3a80b32301a3554971a962478614692 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 24 Sep 2018 18:07:15 -0700 Subject: script for partitioning dumps (needs test) --- extra/elasticsearch/README.md | 1 + extra/partition_dumps/README.md | 25 ++++++++++++++++++ extra/partition_dumps/partition_script.py | 42 +++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+) create mode 100644 extra/partition_dumps/README.md create mode 100755 extra/partition_dumps/partition_script.py (limited to 'extra') diff --git a/extra/elasticsearch/README.md b/extra/elasticsearch/README.md index 0d205903..8f4925aa 100644 --- a/extra/elasticsearch/README.md +++ b/extra/elasticsearch/README.md @@ -44,6 +44,7 @@ Bulk insert from a file on disk: Or, in a bulk production live-stream conversion: time zcat /srv/fatcat/snapshots/fatcat_release_dump_expanded.json.gz | ./transform_release.py | esbulk -verbose -size 20000 -id ident -w 8 -index fatcat-releases -type release + # 2018/09/24 21:42:26 53028167 docs in 1h0m56.853006293s at 14501.039 docs/s with 8 workers ## Full-Text Querying diff --git a/extra/partition_dumps/README.md b/extra/partition_dumps/README.md new file mode 100644 index 00000000..2e26a41b --- /dev/null +++ b/extra/partition_dumps/README.md @@ -0,0 +1,25 @@ + +This script is used to "partition" (split up) a complete JSON dump by some key. +For example, split release dump JSON lines into separate files, one per +journal/container. + +Example parititoning a sample by release type: + + cat release_dump_expanded_sample.json | jq .release_type -r > release_dump_expanded_sample.release_type + cat release_dump_expanded_sample.release_type | sort | uniq -c | sort -nr > release_dump_expanded_sample.release_type.counts + cat release_dump_expanded_sample.json | paste release_dump_expanded_sample.release_type - | sort > out + +More production-y example using ISSN-L: + + # will append otherwise + rm -rf ./partitioned + + # it's a pretty huge sort, will need 300+ GB scratch space? this might not scale. + zcat release_dump_expanded.json.gz | jq .container.issnl -r > release_dump_expanded.issnl + zcat release_dump_expanded.json.gz | paste release_dump_expanded.issnl - | sort | ./partition_script.py + + # for verification/stats + cat release_dump_expanded.issnl | sort | uniq -c | sort -nr > release_dump_expanded.issnl.counts + + # cleanup + rm release_dump_expanded.issnl diff --git a/extra/partition_dumps/partition_script.py b/extra/partition_dumps/partition_script.py new file mode 100755 index 00000000..edcc7e60 --- /dev/null +++ b/extra/partition_dumps/partition_script.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +""" +Reads key-prefixed JSON lines from stdin, and writes out to gzipped files under +./partitioned/. + +Skips empty keys and "null" (to handle a jq common-case). + +Eg, for tab-separated input: + + something {"a": 1} + something2 {"b": 2} + +Will write to ./partitioned/something.json.gz: + + {"a": 1} + +(and "b" object to ./partitioned/something2.json.gz) +""" + +import os, sys, gzip + +def run(): + last_prefix = None + f = None + os.makedirs('partitioned', exist_ok=True) + + for line in sys.stdin: + (prefix, obj) = line.strip().split('\t')[:2] + if not prefix or prefix == "null": + continue + if prefix != last_prefix: + if f: + f.close() + f = gzip.GzipFile('partitioned/{}.json.gz'.format(prefix), 'a') + f.write(obj.encode('utf-8')) + f.write(b"\n") + last_prefix = prefix + if f: + f.close() + +if __name__=="__main__": + run() -- cgit v1.2.3