From 9fd2afc91d43ff4f0d81d66f19ea4351c2168411 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 10 Dec 2019 08:50:09 -0800
Subject: pipenv: add elasticsearch and elasticsearch-dsl libraries

These are low-level and high-level (respectively) client wrappers for
elasticsearch
---
 python/Pipfile      |  2 ++
 python/Pipfile.lock | 18 +++++++++++++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/python/Pipfile b/python/Pipfile
index 1c15cab2..ae261053 100644
--- a/python/Pipfile
+++ b/python/Pipfile
@@ -45,6 +45,8 @@ bs4 = "*"
 python-magic = "*"
 pylatexenc = "*"
 pygal = "*"
+elasticsearch-dsl = ">=6.0.0,<7.0.0"
+elasticsearch = ">=6.0.0,<7.0.0"
 
 [requires]
 # Python 3.5 is the bundled (system) version of python for Ubuntu 16.04
diff --git a/python/Pipfile.lock b/python/Pipfile.lock
index 5a8ef462..8ced4d88 100644
--- a/python/Pipfile.lock
+++ b/python/Pipfile.lock
@@ -1,7 +1,7 @@
 {
     "_meta": {
         "hash": {
-            "sha256": "e2e05ace1d00d2859f8942ac21bbb7fcbde9dc2f28df74f4173b3dfd7c7d3932"
+            "sha256": "7ef50f1c42fdcd59e1016ca4581bf6ab1fe60cd3133417532eddb107dd402c8d"
         },
         "pipfile-spec": 6,
         "requires": {
@@ -139,6 +139,22 @@
             "index": "pypi",
             "version": "==1.1.0"
         },
+        "elasticsearch": {
+            "hashes": [
+                "sha256:1f0f633e3b500d5042424f75a505badf8c4b9962c1b4734cdfb3087fb67920be",
+                "sha256:fb5ab15ee283f104b5a7a5695c7e879cb2927e4eb5aed9c530811590b41259ad"
+            ],
+            "index": "pypi",
+            "version": "==6.4.0"
+        },
+        "elasticsearch-dsl": {
+            "hashes": [
+                "sha256:26416f4dd46ceca43d62ef74970d9de4bdd6f4b0f163316f0b432c9e61a08bec",
+                "sha256:f60aea7fd756ac1fbe7ce114bbf4949aefbf495dfe8896640e787c67344f12f6"
+            ],
+            "index": "pypi",
+            "version": "==6.4.0"
+        },
         "fatcat-openapi-client": {
             "path": "./../python_openapi_client"
         },
-- 
cgit v1.2.3
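
For context on how these two layers relate, a minimal sketch against the pinned
6.x APIs (not part of the patch set; the index name and endpoint are those used
later in this series): the elasticsearch package speaks raw JSON to the REST
API, while elasticsearch-dsl builds the same queries programmatically on top
of it.

    from elasticsearch import Elasticsearch
    from elasticsearch_dsl import Search

    # low-level client: hand-written JSON request bodies
    client = Elasticsearch("https://search.fatcat.wiki")
    resp = client.search(index="fatcat_release", body={"query": {"match_all": {}}})

    # high-level DSL: composable query builder over the same client
    hits = Search(using=client, index="fatcat_release") \
        .filter("term", is_oa=True) \
        .execute()
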
From c0af75e31983852b15e430260dd67f220f31981a Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 10 Dec 2019 10:20:21 -0800
Subject: add another ingest request source to whitelist

---
 python/fatcat_tools/importers/ingest.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index 7dad13ce..deb4ef51 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -28,13 +28,16 @@ class IngestFileResultImporter(EntityImporter):
             print("Requiring GROBID status == 200")
         else:
             print("NOT checking GROBID success")
-        self.ingest_request_source_whitelist = ['fatcat-changelog']
+        self.ingest_request_source_whitelist = [
+            'fatcat-changelog',
+            'fatcat-ingest-container',
+        ]
         if kwargs.get('skip_source_whitelist', False):
             self.ingest_request_source_whitelist = []
 
     def want(self, row):
         """
-        Logic here probably needs work:
+        Logic here probably needs work (TODO):
 
         - Direct ingests via DOI from fatcat-changelog should probably go
           through regardless of GROBID status
-- 
cgit v1.2.3

From a7736d91665f6a98090cd448d02f1542aec6c180 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 10 Dec 2019 10:21:21 -0800
Subject: factor out some basic kafka helpers

---
 python/fatcat_tools/__init__.py |  1 +
 python/fatcat_tools/kafka.py    | 22 ++++++++++++++++++++++
 2 files changed, 23 insertions(+)
 create mode 100644 python/fatcat_tools/kafka.py

diff --git a/python/fatcat_tools/__init__.py b/python/fatcat_tools/__init__.py
index f2798f0b..78c5c90b 100644
--- a/python/fatcat_tools/__init__.py
+++ b/python/fatcat_tools/__init__.py
@@ -2,3 +2,4 @@
 from .api_auth import authenticated_api, public_api
 from .fcid import fcid2uuid, uuid2fcid
 from .transforms import *
+from .kafka import simple_kafka_producer, kafka_fail_fast
diff --git a/python/fatcat_tools/kafka.py b/python/fatcat_tools/kafka.py
new file mode 100644
index 00000000..53b62a37
--- /dev/null
+++ b/python/fatcat_tools/kafka.py
@@ -0,0 +1,22 @@
+
+from confluent_kafka import Consumer, Producer, KafkaException
+
+
+def kafka_fail_fast(err, msg):
+    if err is not None:
+        print("Kafka producer delivery error: {}".format(err))
+        print("Bailing out...")
+        # TODO: should it be sys.exit(-1)?
+        raise KafkaException(err)
+
+def simple_kafka_producer(kafka_hosts):
+
+    kafka_config = {
+        'bootstrap.servers': kafka_hosts,
+        'message.max.bytes': 20000000,  # ~20 MBytes; broker-side max is ~50 MBytes
+        'delivery.report.only.error': True,
+        'default.topic.config': {
+            'request.required.acks': -1,
+        },
+    }
+    return Producer(kafka_config)
-- 
cgit v1.2.3
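
A usage sketch for these helpers (not part of the patch set; the topic string
mirrors the one constructed in the next commit, and the payload is a
stand-in): the producer is created once, each produce() registers
kafka_fail_fast as its delivery callback, and flush() blocks until delivery
reports arrive, at which point kafka_fail_fast raises KafkaException on any
failure.

    import json
    from fatcat_tools import simple_kafka_producer, kafka_fail_fast

    producer = simple_kafka_producer("localhost:9092")
    producer.produce(
        "sandcrawler-dev.ingest-file-requests",
        json.dumps({"example": "stand-in payload"}).encode('utf-8'),
        on_delivery=kafka_fail_fast,
    )
    # delivery callbacks fire from poll()/flush(); this surfaces any error
    # before the process exits
    producer.flush()
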
From 7838a3c15a82281eec435ef16aad63e97015bdfc Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 10 Dec 2019 10:29:39 -0800
Subject: add ingest-container command (new CLI tool)

The intent of this tool is to make it easy to enqueue ingest requests
into kafka, to be processed by a worker pool and eventually inserted
into fatcat (for ingest hits that pass various checks).

As a specific example use-case, we have pretty good coverage of eLife
(a prominent OA publisher), but have missed some publications in the
past, and have a large gap for the year 2019:

    https://fatcat.wiki/container/en4qj5ijrbf5djxx7p5zzpjyoq/coverage

This tool would make it trivial to enqueue all the missing releases to
be crawled. Future variants on this tool could query for, eg, long-tail
OA works.
---
 python/fatcat_ingest.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 136 insertions(+)
 create mode 100755 python/fatcat_ingest.py

diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
new file mode 100755
index 00000000..36bc530b
--- /dev/null
+++ b/python/fatcat_ingest.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python3
+
+"""
+Intended to be a command line interface to "Save Paper Now" and ingest
+request/response.
+"""
+
+import sys
+import json
+import argparse
+from collections import Counter
+
+from fatcat_tools import public_api, simple_kafka_producer, kafka_fail_fast
+from fatcat_tools.transforms import release_ingest_request
+import elasticsearch
+from elasticsearch_dsl import Search
+
+
+def run_ingest_container(args):
+    """
+    This command queries elasticsearch for releases from a given container
+    (eg, journal), and prepares ingest requests for them.
+
+    By default it filters to releases which don't have any fulltext files
+    archived in IA, and dumps the ingest requests as JSON.
+    """
+
+    # ensure API connection works
+    args.api.get_changelog()
+
+    kafka_producer = None
+    ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
+    if args.enqueue_kafka:
+        print("Will send ingest requests to kafka topic: {}".format(ingest_file_request_topic), file=sys.stderr)
+        kafka_producer = simple_kafka_producer(args.kafka_hosts)
+
+    client = elasticsearch.Elasticsearch(args.elasticsearch_endpoint)
+
+    s = Search(using=client, index="fatcat_release") \
+        .filter("term", in_ia=False) \
+        .filter("term", is_oa=True)
+
+    # filter/query by container
+    if args.container_id:
+        s = s.filter("term", container_id=args.container_id)
+    elif args.issnl:
+        s = s.filter("term", issnl=args.issnl)
+    elif args.publisher:
+        s = s.query("match", publisher=args.publisher)
+    elif args.name:
+        s = s.query("match", container_name=args.name)
+    else:
+        print("You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr)
+        sys.exit(-1)
+
+    counts = Counter({'ingest_request': 0, 'elasticsearch_release': 0, 'estimate': 0})
+    counts['estimate'] = s.count()
+    print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
+
+    # TODO: handling the scroll DELETE with the exception pass below is messy
+    # because it is usually accompanied with a generator cleanup that doesn't
+    # work (?)
+    results = s.scan()
+    try:
+        for esr in results:
+            counts['elasticsearch_release'] += 1
+            release = args.api.get_release(esr.ident)
+            ingest_request = release_ingest_request(
+                release,
+                oa_only=False,
+                ingest_request_source="fatcat-ingest-container",
+            )
+            if not ingest_request:
+                continue
+            if kafka_producer != None:
+                kafka_producer.produce(
+                    ingest_file_request_topic,
+                    json.dumps(ingest_request).encode('utf-8'),
+                    #key=None,
+                    on_delivery=kafka_fail_fast,
+                )
+                counts['kafka'] += 1
+            # also printing to stdout when in kafka mode; could skip?
+            print(json.dumps(ingest_request))
+            counts['ingest_request'] += 1
+    except elasticsearch.exceptions.AuthorizationException:
+        print("Ignoring Auth exception, usually due to DELETE on scan scroll", file=sys.stderr)
+    finally:
+        if kafka_producer != None:
+            kafka_producer.flush()
+    print(counts, file=sys.stderr)
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--debug',
+        action='store_true',
+        help="enable debugging interface")
+    parser.add_argument('--host-url',
+        default="http://localhost:9411/v0",
+        help="connect to this host/port")
+    parser.add_argument('--enqueue-kafka',
+        action='store_true',
+        help="send ingest requests directly to sandcrawler kafka topic for processing")
+    parser.add_argument('--kafka-hosts',
+        default="localhost:9092",
+        help="list of Kafka brokers (host/port) to use")
+    parser.add_argument('--elasticsearch-endpoint',
+        default="https://search.fatcat.wiki",
+        help="elasticsearch API. internal endpoint preferred, but public is default")
+    parser.add_argument('--env',
+        default="dev",
+        help="Kafka topic namespace to use (eg, prod, qa, dev)")
+    subparsers = parser.add_subparsers()
+
+    sub_ingest_container = subparsers.add_parser('ingest-container',
+        help="Create ingest requests for releases from a specific container")
+    sub_ingest_container.set_defaults(func=run_ingest_container)
+    sub_ingest_container.add_argument('--container-id',
+        help="fatcat container entity ident")
+    sub_ingest_container.add_argument('--issnl',
+        help="ISSN-L of container entity")
+    sub_ingest_container.add_argument('--publisher',
+        help="publisher name")
+    sub_ingest_container.add_argument('--name',
+        help="container name")
+
+    args = parser.parse_args()
+    if not args.__dict__.get("func"):
+        print("tell me what to do!")
+        sys.exit(-1)
+
+    args.api = public_api(args.host_url)
+    args.func(args)
+
+if __name__ == '__main__':
+    main()
-- 
cgit v1.2.3
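
A usage sketch (not from the patch set): back-filling the eLife container
described in the commit message would look roughly like the following, with
the container ident taken from the coverage URL above. Without
--enqueue-kafka the tool only dumps the ingest request JSON to stdout.

    ./fatcat_ingest.py --env prod --enqueue-kafka ingest-container --container-id en4qj5ijrbf5djxx7p5zzpjyoq
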
From 4c716f9e39046fde3e98a3686a5f086f3d53315a Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 11 Dec 2019 16:54:57 -0800
Subject: simplify ES scroll deletion using params()

This gets rid of some messy error handling code by properly configuring
the elasticsearch client to just not clean up scroll iterators when
accessing the public (prod or qa) search interfaces.

Leaving the scroll state around isn't ideal, so we still delete it when
possible (eg, when connecting directly to elasticsearch).

Thanks to Martin for pointing out this solution in review.
---
 python/fatcat_ingest.py | 58 ++++++++++++++++++++++++-------------------------
 1 file changed, 29 insertions(+), 29 deletions(-)

diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 36bc530b..05b7e848 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -57,37 +57,37 @@ def run_ingest_container(args):
     counts['estimate'] = s.count()
     print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
 
-    # TODO: handling the scroll DELETE with the exception pass below is messy
-    # because it is usually accompanied with a generator cleanup that doesn't
-    # work (?)
+    # don't try to clean up scroll if we are connected to a public server
+    # (behind nginx proxy that disallows DELETE)
+    if args.elasticsearch_endpoint in (
+            'https://search.fatcat.wiki',
+            'https://search.qa.fatcat.wiki'):
+        s = s.params(clear_scroll=False)
+
     results = s.scan()
-    try:
-        for esr in results:
-            counts['elasticsearch_release'] += 1
-            release = args.api.get_release(esr.ident)
-            ingest_request = release_ingest_request(
-                release,
-                oa_only=False,
-                ingest_request_source="fatcat-ingest-container",
-            )
-            if not ingest_request:
-                continue
-            if kafka_producer != None:
-                kafka_producer.produce(
-                    ingest_file_request_topic,
-                    json.dumps(ingest_request).encode('utf-8'),
-                    #key=None,
-                    on_delivery=kafka_fail_fast,
-                )
-                counts['kafka'] += 1
-            # also printing to stdout when in kafka mode; could skip?
-            print(json.dumps(ingest_request))
-            counts['ingest_request'] += 1
-    except elasticsearch.exceptions.AuthorizationException:
-        print("Ignoring Auth exception, usually due to DELETE on scan scroll", file=sys.stderr)
-    finally:
+    for esr in results:
+        counts['elasticsearch_release'] += 1
+        release = args.api.get_release(esr.ident)
+        ingest_request = release_ingest_request(
+            release,
+            oa_only=False,
+            ingest_request_source="fatcat-ingest-container",
+        )
+        if not ingest_request:
+            continue
         if kafka_producer != None:
-            kafka_producer.flush()
+            kafka_producer.produce(
+                ingest_file_request_topic,
+                json.dumps(ingest_request).encode('utf-8'),
+                #key=None,
+                on_delivery=kafka_fail_fast,
+            )
+            counts['kafka'] += 1
+        # also printing to stdout when in kafka mode; could skip?
+        print(json.dumps(ingest_request))
+        counts['ingest_request'] += 1
+    if kafka_producer != None:
+        kafka_producer.flush()
     print(counts, file=sys.stderr)
 
 def main():
-- 
cgit v1.2.3
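
For reference, a sketch of the mechanism (not part of the patch):
Search.params() in elasticsearch-dsl passes extra parameters through to the
underlying client calls, and scan() is built on the elasticsearch.helpers.scan
helper, whose clear_scroll flag controls the trailing DELETE of the scroll
context. The roughly equivalent low-level call would be:

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import scan

    client = Elasticsearch("https://search.fatcat.wiki")
    # clear_scroll=False skips the final DELETE of the scroll context, which
    # the nginx proxy in front of the public endpoints disallows
    for hit in scan(
            client,
            index="fatcat_release",
            query={"query": {"term": {"in_ia": False}}},
            clear_scroll=False):
        pass  # process each hit
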
From 44a3f1286a81ad5565a925b766124f05165c492f Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 11 Dec 2019 16:57:18 -0800
Subject: improve argparse usage

--fatcat-api-url is clearer than --host-url

remove unimplemented --debug (copy/paste from webface argparse)

use a formatter which will display 'default' parameters with --help

Thanks to Martin for pointing out the latter, which I've always wanted!
---
 python/fatcat_ingest.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 05b7e848..46f46536 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -91,11 +91,9 @@ def run_ingest_container(args):
     print(counts, file=sys.stderr)
 
 def main():
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--debug',
-        action='store_true',
-        help="enable debugging interface")
-    parser.add_argument('--host-url',
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('--fatcat-api-url',
         default="http://localhost:9411/v0",
         help="connect to this host/port")
     parser.add_argument('--enqueue-kafka',
@@ -129,7 +127,7 @@ def main():
         print("tell me what to do!")
         sys.exit(-1)
 
-    args.api = public_api(args.host_url)
+    args.api = public_api(args.fatcat_api_url)
     args.func(args)
 
 if __name__ == '__main__':
-- 
cgit v1.2.3

From 125c93748920c7a213a5bb572e19b923a3587f8b Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Thu, 12 Dec 2019 10:30:39 -0800
Subject: container_issnl, not issnl, for ES release query

Caught by Martin in review; thanks!
---
 python/fatcat_ingest.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 46f46536..9f0bf22e 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -44,7 +44,7 @@ def run_ingest_container(args):
     if args.container_id:
         s = s.filter("term", container_id=args.container_id)
     elif args.issnl:
-        s = s.filter("term", issnl=args.issnl)
+        s = s.filter("term", container_issnl=args.issnl)
     elif args.publisher:
         s = s.query("match", publisher=args.publisher)
     elif args.name:
-- 
cgit v1.2.3
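
A footnote on the formatter change above (plain stdlib behavior, sketched here
rather than taken from the patch): argparse.ArgumentDefaultsHelpFormatter
appends each argument's default value to its help text, so --help output stays
accurate without hand-maintained strings.

    import argparse

    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--env', default="dev",
        help="Kafka topic namespace to use (eg, prod, qa, dev)")
    # the help line for --env now ends with "(default: dev)"
    parser.print_help()
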