author     Bryan Newbold <bnewbold@robocracy.org>   2019-12-10 10:29:39 -0800
committer  Bryan Newbold <bnewbold@robocracy.org>   2019-12-10 10:29:41 -0800
commit     7838a3c15a82281eec435ef16aad63e97015bdfc (patch)
tree       c65df48c37b448421da4ef766108d581ab1b3428 /python
parent     a7736d91665f6a98090cd448d02f1542aec6c180 (diff)
download   fatcat-7838a3c15a82281eec435ef16aad63e97015bdfc.tar.gz
           fatcat-7838a3c15a82281eec435ef16aad63e97015bdfc.zip
add ingest-container command (new CLI tool)
The intent of this tool is to make it easy to enqueue ingest requests into
kafka, to be processed by a worker pool and eventually inserted into
fatcat (for ingest hits that pass various checks).
As a specific example use-case, we have pretty good coverage of eLife (a
prominent OA publisher), but have missed some publications in the past,
and have a large gap for the year 2019:
https://fatcat.wiki/container/en4qj5ijrbf5djxx7p5zzpjyoq/coverage
This tool would make it trivial to enqueue all the missing releases to
be crawled.
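For instance, a hypothetical invocation (sketch only: the container ident
here is taken from the coverage URL above, and the prod Kafka topic
namespace is assumed) could be:

```
./python/fatcat_ingest.py --env prod --enqueue-kafka ingest-container --container-id en4qj5ijrbf5djxx7p5zzpjyoq
```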
Future variants of this tool could query for, eg, long-tail OA works.
Diffstat (limited to 'python')
-rwxr-xr-x   python/fatcat_ingest.py   136
1 file changed, 136 insertions, 0 deletions
```diff
diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
new file mode 100755
index 00000000..36bc530b
--- /dev/null
+++ b/python/fatcat_ingest.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python3
+
+"""
+Intended to be a command line interface to "Save Paper Now" and ingest
+request/response.
+"""
+
+import sys
+import json
+import argparse
+from collections import Counter
+
+from fatcat_tools import public_api, simple_kafka_producer, kafka_fail_fast
+from fatcat_tools.transforms import release_ingest_request
+import elasticsearch
+from elasticsearch_dsl import Search
+
+
+def run_ingest_container(args):
+    """
+    This command queries elasticsearch for releases from a given container (eg,
+    journal), and prepares ingest requests for them.
+
+    By default it filters to releases which don't have any fulltext files
+    archived in IA, and dumps the ingest requests as JSON.
+    """
+
+    # ensure API connection works
+    args.api.get_changelog()
+
+    kafka_producer = None
+    ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
+    if args.enqueue_kafka:
+        print("Will send ingest requests to kafka topic: {}".format(ingest_file_request_topic), file=sys.stderr)
+        kafka_producer = simple_kafka_producer(args.kafka_hosts)
+
+    client = elasticsearch.Elasticsearch(args.elasticsearch_endpoint)
+
+    s = Search(using=client, index="fatcat_release") \
+        .filter("term", in_ia=False) \
+        .filter("term", is_oa=True)
+
+    # filter/query by container
+    if args.container_id:
+        s = s.filter("term", container_id=args.container_id)
+    elif args.issnl:
+        s = s.filter("term", issnl=args.issnl)
+    elif args.publisher:
+        s = s.query("match", publisher=args.publisher)
+    elif args.name:
+        s = s.query("match", container_name=args.name)
+    else:
+        print("You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr)
+        sys.exit(-1)
+
+    counts = Counter({'ingest_request': 0, 'elasticsearch_release': 0, 'estimate': 0})
+    counts['estimate'] = s.count()
+    print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
+
+    # TODO: handling the scroll DELETE with the exception pass below is messy
+    # because it is usually accompanied with a generator cleanup that doesn't
+    # work (?)
+    results = s.scan()
+    try:
+        for esr in results:
+            counts['elasticsearch_release'] += 1
+            release = args.api.get_release(esr.ident)
+            ingest_request = release_ingest_request(
+                release,
+                oa_only=False,
+                ingest_request_source="fatcat-ingest-container",
+            )
+            if not ingest_request:
+                continue
+            if kafka_producer != None:
+                kafka_producer.produce(
+                    ingest_file_request_topic,
+                    json.dumps(ingest_request).encode('utf-8'),
+                    #key=None,
+                    on_delivery=kafka_fail_fast,
+                )
+                counts['kafka'] += 1
+            # also printing to stdout when in kafka mode; could skip?
+            print(json.dumps(ingest_request))
+            counts['ingest_request'] += 1
+    except elasticsearch.exceptions.AuthorizationException:
+        print("Ignoring Auth exception, usually due to DELETE on scan scroll", file=sys.stderr)
+    finally:
+        if kafka_producer != None:
+            kafka_producer.flush()
+    print(counts, file=sys.stderr)
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--debug',
+        action='store_true',
+        help="enable debugging interface")
+    parser.add_argument('--host-url',
+        default="http://localhost:9411/v0",
+        help="connect to this host/port")
+    parser.add_argument('--enqueue-kafka',
+        action='store_true',
+        help="send ingest requests directly to sandcrawler kafka topic for processing")
+    parser.add_argument('--kafka-hosts',
+        default="localhost:9092",
+        help="list of Kafka brokers (host/port) to use")
+    parser.add_argument('--elasticsearch-endpoint',
+        default="https://search.fatcat.wiki",
+        help="elasticsearch API. internal endpoint preferred, but public is default")
+    parser.add_argument('--env',
+        default="dev",
+        help="Kafka topic namespace to use (eg, prod, qa, dev)")
+    subparsers = parser.add_subparsers()
+
+    sub_ingest_container = subparsers.add_parser('ingest-container',
+        help="Create ingest requests for releases from a specific container")
+    sub_ingest_container.set_defaults(func=run_ingest_container)
+    sub_ingest_container.add_argument('--container-id',
+        help="fatcat container entity ident")
+    sub_ingest_container.add_argument('--issnl',
+        help="ISSN-L of container entity")
+    sub_ingest_container.add_argument('--publisher',
+        help="publisher name")
+    sub_ingest_container.add_argument('--name',
+        help="container name")
+
+    args = parser.parse_args()
+    if not args.__dict__.get("func"):
+        print("tell me what to do!")
+        sys.exit(-1)
+
+    args.api = public_api(args.host_url)
+    args.func(args)
+
+if __name__ == '__main__':
+    main()
```
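The TODO in run_ingest_container notes that the scroll DELETE issued during
scan cleanup is what triggers the AuthorizationException against the
read-only public endpoint. A minimal sketch of one alternative, assuming
this version of elasticsearch_dsl forwards Search.params() kwargs through to
elasticsearch.helpers.scan() (which accepts a clear_scroll flag):

```python
# Sketch, not part of this commit: skip the client-side DELETE of the scroll
# context and let it expire server-side, so no AuthorizationException is
# raised. Assumes Search.params() kwargs reach elasticsearch.helpers.scan().
s = s.params(clear_scroll=False)
for esr in s.scan():
    ...  # process hits as in run_ingest_container, without the try/except
```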
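On the other side of the topic, the commit message describes a worker pool
consuming these requests; those workers live in sandcrawler and are not part
of this diff. Purely as an illustration, a minimal consumer using
confluent_kafka (which the producer helpers here appear to wrap) might look
like the following; the group.id and the processing step are hypothetical
placeholders:

```python
import json

from confluent_kafka import Consumer

# Illustrative only: not part of this commit. The topic name follows the
# "sandcrawler-{env}.ingest-file-requests" scheme from fatcat_ingest.py;
# the group.id is a made-up example.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-ingest-workers',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['sandcrawler-dev.ingest-file-requests'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue
        request = json.loads(msg.value().decode('utf-8'))
        # a real worker would attempt the crawl/ingest here; successful
        # hits would eventually be inserted into fatcat
        print(request)
finally:
    consumer.close()
```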