From 7838a3c15a82281eec435ef16aad63e97015bdfc Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 10 Dec 2019 10:29:39 -0800
Subject: add ingest-container command (new CLI tool)

The intent of this tool is to make it easy to enqueue ingest requests into
kafka, to be processed by a worker pool; ingest hits that pass various
checks eventually end up inserted into fatcat.

As a specific example use-case, we have pretty good coverage of eLife (a
prominent OA publisher), but have missed some publications in the past, and
have a large gap for the year 2019:

https://fatcat.wiki/container/en4qj5ijrbf5djxx7p5zzpjyoq/coverage

This tool would make it trivial to enqueue all the missing releases to be
crawled.

Future variants of this tool could query for, eg, long-tail OA works.
---
 python/fatcat_ingest.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 136 insertions(+)
 create mode 100755 python/fatcat_ingest.py

diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
new file mode 100755
index 00000000..36bc530b
--- /dev/null
+++ b/python/fatcat_ingest.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python3
+
+"""
+Intended to be a command line interface to "Save Paper Now" and ingest
+request/response.
+"""
+
+import sys
+import json
+import argparse
+from collections import Counter
+
+from fatcat_tools import public_api, simple_kafka_producer, kafka_fail_fast
+from fatcat_tools.transforms import release_ingest_request
+import elasticsearch
+from elasticsearch_dsl import Search
+
+
+def run_ingest_container(args):
+    """
+    This command queries elasticsearch for releases from a given container (eg,
+    journal), and prepares ingest requests for them.
+
+    By default it filters to releases which don't have any fulltext files
+    archived in IA, and dumps the ingest requests as JSON.
+    """
+
+    # ensure API connection works
+    args.api.get_changelog()
+
+    kafka_producer = None
+    ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
+    if args.enqueue_kafka:
+        print("Will send ingest requests to kafka topic: {}".format(ingest_file_request_topic), file=sys.stderr)
+        kafka_producer = simple_kafka_producer(args.kafka_hosts)
+
+    client = elasticsearch.Elasticsearch(args.elasticsearch_endpoint)
+
+    s = Search(using=client, index="fatcat_release") \
+        .filter("term", in_ia=False) \
+        .filter("term", is_oa=True)
+
+    # filter/query by container
+    if args.container_id:
+        s = s.filter("term", container_id=args.container_id)
+    elif args.issnl:
+        s = s.filter("term", issnl=args.issnl)
+    elif args.publisher:
+        s = s.query("match", publisher=args.publisher)
+    elif args.name:
+        s = s.query("match", container_name=args.name)
+    else:
+        print("You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr)
+        sys.exit(-1)
+
+    counts = Counter({'ingest_request': 0, 'elasticsearch_release': 0, 'estimate': 0})
+    counts['estimate'] = s.count()
+    print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
+
+    # TODO: handling the scroll DELETE with the exception pass below is messy
+    # because it is usually accompanied with a generator cleanup that doesn't
+    # work (?)
+    results = s.scan()
+    try:
+        for esr in results:
+            counts['elasticsearch_release'] += 1
+            release = args.api.get_release(esr.ident)
+            ingest_request = release_ingest_request(
+                release,
+                oa_only=False,
+                ingest_request_source="fatcat-ingest-container",
+            )
+            if not ingest_request:
+                continue
+            if kafka_producer != None:
+                kafka_producer.produce(
+                    ingest_file_request_topic,
+                    json.dumps(ingest_request).encode('utf-8'),
+                    #key=None,
+                    on_delivery=kafka_fail_fast,
+                )
+                counts['kafka'] += 1
+            # also printing to stdout when in kafka mode; could skip?
+            print(json.dumps(ingest_request))
+            counts['ingest_request'] += 1
+    except elasticsearch.exceptions.AuthorizationException:
+        print("Ignoring Auth exception, usually due to DELETE on scan scroll", file=sys.stderr)
+    finally:
+        if kafka_producer != None:
+            kafka_producer.flush()
+    print(counts, file=sys.stderr)
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--debug',
+        action='store_true',
+        help="enable debugging interface")
+    parser.add_argument('--host-url',
+        default="http://localhost:9411/v0",
+        help="connect to this host/port")
+    parser.add_argument('--enqueue-kafka',
+        action='store_true',
+        help="send ingest requests directly to sandcrawler kafka topic for processing")
+    parser.add_argument('--kafka-hosts',
+        default="localhost:9092",
+        help="list of Kafka brokers (host/port) to use")
+    parser.add_argument('--elasticsearch-endpoint',
+        default="https://search.fatcat.wiki",
+        help="elasticsearch API. internal endpoint preferred, but public is default")
+    parser.add_argument('--env',
+        default="dev",
+        help="Kafka topic namespace to use (eg, prod, qa, dev)")
+    subparsers = parser.add_subparsers()
+
+    sub_ingest_container = subparsers.add_parser('ingest-container',
+        help="Create ingest requests for releases from a specific container")
+    sub_ingest_container.set_defaults(func=run_ingest_container)
+    sub_ingest_container.add_argument('--container-id',
+        help="fatcat container entity ident")
+    sub_ingest_container.add_argument('--issnl',
+        help="ISSN-L of container entity")
+    sub_ingest_container.add_argument('--publisher',
+        help="publisher name")
+    sub_ingest_container.add_argument('--name',
+        help="container name")
+
+    args = parser.parse_args()
+    if not args.__dict__.get("func"):
+        print("tell me what to do!")
+        sys.exit(-1)
+
+    args.api = public_api(args.host_url)
+    args.func(args)
+
+if __name__ == '__main__':
+    main()
-- cgit v1.2.3

From 4c716f9e39046fde3e98a3686a5f086f3d53315a Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 11 Dec 2019 16:54:57 -0800
Subject: simplify ES scroll deletion using params()

This gets rid of some messy error handling code by properly configuring the
elasticsearch client to just not clean up scroll iterators when accessing
the public (prod or qa) search interfaces.

Leaving the scroll state around isn't ideal, so we still delete it if
possible (eg, when connecting directly to elasticsearch).

Thanks to Martin for pointing out this solution in review.
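A minimal standalone sketch of the technique, separate from the commit itself (it assumes elasticsearch-dsl's params() kwargs are forwarded through to elasticsearch.helpers.scan(), and reuses the endpoint URL and filter fields from the patches above):

    import elasticsearch
    from elasticsearch_dsl import Search

    client = elasticsearch.Elasticsearch("https://search.fatcat.wiki")
    s = Search(using=client, index="fatcat_release") \
        .filter("term", in_ia=False) \
        .filter("term", is_oa=True)

    # kwargs set via params() are passed along to elasticsearch.helpers.scan(),
    # so clear_scroll=False skips the final DELETE of the scroll context
    # (which a read-only proxy would reject with an authorization error)
    s = s.params(clear_scroll=False)

    for hit in s.scan():
        print(hit.ident)  # each hit is one release document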
---
 python/fatcat_ingest.py | 58 ++++++++++++++++++++++++-------------------------
 1 file changed, 29 insertions(+), 29 deletions(-)

diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 36bc530b..05b7e848 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -57,37 +57,37 @@ def run_ingest_container(args):
     counts['estimate'] = s.count()
     print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
 
-    # TODO: handling the scroll DELETE with the exception pass below is messy
-    # because it is usually accompanied with a generator cleanup that doesn't
-    # work (?)
+    # don't try to clean up scroll if we are connected to a public server
+    # (behind nginx proxy that disallows DELETE)
+    if args.elasticsearch_endpoint in (
+            'https://search.fatcat.wiki',
+            'https://search.qa.fatcat.wiki'):
+        s = s.params(clear_scroll=False)
+
     results = s.scan()
-    try:
-        for esr in results:
-            counts['elasticsearch_release'] += 1
-            release = args.api.get_release(esr.ident)
-            ingest_request = release_ingest_request(
-                release,
-                oa_only=False,
-                ingest_request_source="fatcat-ingest-container",
-            )
-            if not ingest_request:
-                continue
-            if kafka_producer != None:
-                kafka_producer.produce(
-                    ingest_file_request_topic,
-                    json.dumps(ingest_request).encode('utf-8'),
-                    #key=None,
-                    on_delivery=kafka_fail_fast,
-                )
-                counts['kafka'] += 1
-            # also printing to stdout when in kafka mode; could skip?
-            print(json.dumps(ingest_request))
-            counts['ingest_request'] += 1
-    except elasticsearch.exceptions.AuthorizationException:
-        print("Ignoring Auth exception, usually due to DELETE on scan scroll", file=sys.stderr)
-    finally:
+    for esr in results:
+        counts['elasticsearch_release'] += 1
+        release = args.api.get_release(esr.ident)
+        ingest_request = release_ingest_request(
+            release,
+            oa_only=False,
+            ingest_request_source="fatcat-ingest-container",
+        )
+        if not ingest_request:
+            continue
         if kafka_producer != None:
-            kafka_producer.flush()
+            kafka_producer.produce(
+                ingest_file_request_topic,
+                json.dumps(ingest_request).encode('utf-8'),
+                #key=None,
+                on_delivery=kafka_fail_fast,
+            )
+            counts['kafka'] += 1
+        # also printing to stdout when in kafka mode; could skip?
+        print(json.dumps(ingest_request))
+        counts['ingest_request'] += 1
+    if kafka_producer != None:
+        kafka_producer.flush()
     print(counts, file=sys.stderr)
 
 def main():
-- cgit v1.2.3

From 44a3f1286a81ad5565a925b766124f05165c492f Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 11 Dec 2019 16:57:18 -0800
Subject: improve argparse usage

--fatcat-api-url is clearer than --host-url

remove unimplemented --debug (copy/paste from webface argparse)

use a formatter which will display 'default' parameters with --help

Thanks to Martin for pointing out the latter, which I've always wanted!
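For context, a self-contained sketch of the formatter change (standard-library argparse only; the argument mirrors the one in the diff below):

    import argparse

    # ArgumentDefaultsHelpFormatter appends "(default: ...)" to the help
    # text of every argument that defines both a help string and a default
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--fatcat-api-url',
        default="http://localhost:9411/v0",
        help="connect to this host/port")
    parser.print_help()
    # ...
    #   --fatcat-api-url FATCAT_API_URL
    #                         connect to this host/port (default: http://localhost:9411/v0)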
---
 python/fatcat_ingest.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 05b7e848..46f46536 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -91,11 +91,9 @@ def run_ingest_container(args):
     print(counts, file=sys.stderr)
 
 def main():
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--debug',
-        action='store_true',
-        help="enable debugging interface")
-    parser.add_argument('--host-url',
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('--fatcat-api-url',
         default="http://localhost:9411/v0",
         help="connect to this host/port")
     parser.add_argument('--enqueue-kafka',
@@ -129,7 +127,7 @@ def main():
         print("tell me what to do!")
         sys.exit(-1)
 
-    args.api = public_api(args.host_url)
+    args.api = public_api(args.fatcat_api_url)
     args.func(args)
 
 if __name__ == '__main__':
-- cgit v1.2.3

From 125c93748920c7a213a5bb572e19b923a3587f8b Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Thu, 12 Dec 2019 10:30:39 -0800
Subject: container_issnl, not issnl, for ES release query

Caught by Martin in review; thanks!
---
 python/fatcat_ingest.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 46f46536..9f0bf22e 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -44,7 +44,7 @@ def run_ingest_container(args):
     if args.container_id:
         s = s.filter("term", container_id=args.container_id)
     elif args.issnl:
-        s = s.filter("term", issnl=args.issnl)
+        s = s.filter("term", container_issnl=args.issnl)
     elif args.publisher:
         s = s.query("match", publisher=args.publisher)
     elif args.name:
-- cgit v1.2.3
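As a footnote on that last fix, a sketch of why the field name matters (assuming the fatcat release search schema, where a release's journal ISSN-L is indexed as container_issnl; the ISSN-L shown is eLife's, the example container from the first commit message):

    from elasticsearch_dsl import Search

    # a term filter on a bare "issnl" field would silently match nothing on
    # the release index; the release-level field is "container_issnl"
    s = Search(index="fatcat_release").filter("term", container_issnl="2050-084X")
    print(s.to_dict())
    # {'query': {'bool': {'filter': [{'term': {'container_issnl': '2050-084X'}}]}}}

The bug would have produced empty result sets rather than an error, which is presumably why it was easy to miss outside of review.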