diff options
author | bnewbold <bnewbold@archive.org> | 2019-12-12 18:52:46 +0000 |
---|---|---|
committer | bnewbold <bnewbold@archive.org> | 2019-12-12 18:52:46 +0000 |
commit | 374ed6ccac6191461616ac3df85daf3a3a9ab2ed (patch) | |
tree | c3cd2e912eaa97ada104303a9fb5c857e60a3a33 | |
parent | 7831f78cc9ccef7331c9176dbecb34f8afc9b54f (diff) | |
parent | 125c93748920c7a213a5bb572e19b923a3587f8b (diff) | |
download | fatcat-374ed6ccac6191461616ac3df85daf3a3a9ab2ed.tar.gz fatcat-374ed6ccac6191461616ac3df85daf3a3a9ab2ed.zip |
Merge branch 'bnewbold-ingest-oa-container' into 'master'
container-ingest tool
See merge request webgroup/fatcat!8
-rw-r--r-- | python/Pipfile | 2 | ||||
-rw-r--r-- | python/Pipfile.lock | 18 | ||||
-rwxr-xr-x | python/fatcat_ingest.py | 134 | ||||
-rw-r--r-- | python/fatcat_tools/__init__.py | 1 | ||||
-rw-r--r-- | python/fatcat_tools/importers/ingest.py | 7 | ||||
-rw-r--r-- | python/fatcat_tools/kafka.py | 22 |
6 files changed, 181 insertions, 3 deletions
diff --git a/python/Pipfile b/python/Pipfile index 1c15cab2..ae261053 100644 --- a/python/Pipfile +++ b/python/Pipfile @@ -45,6 +45,8 @@ bs4 = "*" python-magic = "*" pylatexenc = "*" pygal = "*" +elasticsearch-dsl = ">=6.0.0,<7.0.0" +elasticsearch = ">=6.0.0,<7.0.0" [requires] # Python 3.5 is the bundled (system) version of python for Ubuntu 16.04 diff --git a/python/Pipfile.lock b/python/Pipfile.lock index 5a8ef462..8ced4d88 100644 --- a/python/Pipfile.lock +++ b/python/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "e2e05ace1d00d2859f8942ac21bbb7fcbde9dc2f28df74f4173b3dfd7c7d3932" + "sha256": "7ef50f1c42fdcd59e1016ca4581bf6ab1fe60cd3133417532eddb107dd402c8d" }, "pipfile-spec": 6, "requires": { @@ -139,6 +139,22 @@ "index": "pypi", "version": "==1.1.0" }, + "elasticsearch": { + "hashes": [ + "sha256:1f0f633e3b500d5042424f75a505badf8c4b9962c1b4734cdfb3087fb67920be", + "sha256:fb5ab15ee283f104b5a7a5695c7e879cb2927e4eb5aed9c530811590b41259ad" + ], + "index": "pypi", + "version": "==6.4.0" + }, + "elasticsearch-dsl": { + "hashes": [ + "sha256:26416f4dd46ceca43d62ef74970d9de4bdd6f4b0f163316f0b432c9e61a08bec", + "sha256:f60aea7fd756ac1fbe7ce114bbf4949aefbf495dfe8896640e787c67344f12f6" + ], + "index": "pypi", + "version": "==6.4.0" + }, "fatcat-openapi-client": { "path": "./../python_openapi_client" }, diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py new file mode 100755 index 00000000..9f0bf22e --- /dev/null +++ b/python/fatcat_ingest.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 + +""" +Intended to be a command line interface to "Save Paper Now" and ingest +request/response. +""" + +import sys +import json +import argparse +from collections import Counter + +from fatcat_tools import public_api, simple_kafka_producer, kafka_fail_fast +from fatcat_tools.transforms import release_ingest_request +import elasticsearch +from elasticsearch_dsl import Search + + +def run_ingest_container(args): + """ + This command queries elasticsearch for releases from a given container (eg, + journal), and prepares ingest requests for them. + + By default it filters to releases which don't have any fulltext files + archived in IA, and dumps the ingest requests as JSON. + """ + + # ensure API connection works + args.api.get_changelog() + + kafka_producer = None + ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) + if args.enqueue_kafka: + print("Will send ingest requests to kafka topic: {}".format(ingest_file_request_topic), file=sys.stderr) + kafka_producer = simple_kafka_producer(args.kafka_hosts) + + client = elasticsearch.Elasticsearch(args.elasticsearch_endpoint) + + s = Search(using=client, index="fatcat_release") \ + .filter("term", in_ia=False) \ + .filter("term", is_oa=True) + + # filter/query by container + if args.container_id: + s = s.filter("term", container_id=args.container_id) + elif args.issnl: + s = s.filter("term", container_issnl=args.issnl) + elif args.publisher: + s = s.query("match", publisher=args.publisher) + elif args.name: + s = s.query("match", container_name=args.name) + else: + print("You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr) + sys.exit(-1) + + counts = Counter({'ingest_request': 0, 'elasticsearch_release': 0, 'estimate': 0}) + counts['estimate'] = s.count() + print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr) + + # don't try to clean up scroll if we are connected to public server (behind + # nginx proxy that disallows DELETE) + if args.elasticsearch_endpoint in ( + 'https://search.fatcat.wiki', + 'https://search.qa.fatcat.wiki'): + s = s.params(clear_scroll=False) + + results = s.scan() + for esr in results: + counts['elasticsearch_release'] += 1 + release = args.api.get_release(esr.ident) + ingest_request = release_ingest_request( + release, + oa_only=False, + ingest_request_source="fatcat-ingest-container", + ) + if not ingest_request: + continue + if kafka_producer != None: + kafka_producer.produce( + ingest_file_request_topic, + json.dumps(ingest_request).encode('utf-8'), + #key=None, + on_delivery=kafka_fail_fast, + ) + counts['kafka'] += 1 + # also printing to stdout when in kafka mode; could skip? + print(json.dumps(ingest_request)) + counts['ingest_request'] += 1 + if kafka_producer != None: + kafka_producer.flush() + print(counts, file=sys.stderr) + +def main(): + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('--fatcat-api-url', + default="http://localhost:9411/v0", + help="connect to this host/port") + parser.add_argument('--enqueue-kafka', + action='store_true', + help="send ingest requests directly to sandcrawler kafka topic for processing") + parser.add_argument('--kafka-hosts', + default="localhost:9092", + help="list of Kafka brokers (host/port) to use") + parser.add_argument('--elasticsearch-endpoint', + default="https://search.fatcat.wiki", + help="elasticsearch API. internal endpoint prefered, but public is default") + parser.add_argument('--env', + default="dev", + help="Kafka topic namespace to use (eg, prod, qa, dev)") + subparsers = parser.add_subparsers() + + sub_ingest_container = subparsers.add_parser('ingest-container', + help="Create ingest requests for releases from a specific container") + sub_ingest_container.set_defaults(func=run_ingest_container) + sub_ingest_container.add_argument('--container-id', + help="fatcat container entity ident") + sub_ingest_container.add_argument('--issnl', + help="ISSN-L of container entity") + sub_ingest_container.add_argument('--publisher', + help="publisher name") + sub_ingest_container.add_argument('--name', + help="container name") + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + args.api = public_api(args.fatcat_api_url) + args.func(args) + +if __name__ == '__main__': + main() diff --git a/python/fatcat_tools/__init__.py b/python/fatcat_tools/__init__.py index f2798f0b..78c5c90b 100644 --- a/python/fatcat_tools/__init__.py +++ b/python/fatcat_tools/__init__.py @@ -2,3 +2,4 @@ from .api_auth import authenticated_api, public_api from .fcid import fcid2uuid, uuid2fcid from .transforms import * +from .kafka import simple_kafka_producer, kafka_fail_fast diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py index 7dad13ce..deb4ef51 100644 --- a/python/fatcat_tools/importers/ingest.py +++ b/python/fatcat_tools/importers/ingest.py @@ -28,13 +28,16 @@ class IngestFileResultImporter(EntityImporter): print("Requiring GROBID status == 200") else: print("NOT checking GROBID success") - self.ingest_request_source_whitelist = ['fatcat-changelog'] + self.ingest_request_source_whitelist = [ + 'fatcat-changelog', + 'fatcat-ingest-container', + ] if kwargs.get('skip_source_whitelist', False): self.ingest_request_source_whitelist = [] def want(self, row): """ - Logic here probably needs work: + Logic here probably needs work (TODO): - Direct ingests via DOI from fatcat-changelog should probably go through regardless of GROBID status diff --git a/python/fatcat_tools/kafka.py b/python/fatcat_tools/kafka.py new file mode 100644 index 00000000..53b62a37 --- /dev/null +++ b/python/fatcat_tools/kafka.py @@ -0,0 +1,22 @@ + +from confluent_kafka import Consumer, Producer, KafkaException + + +def kafka_fail_fast(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + +def simple_kafka_producer(kafka_hosts): + + kafka_config = { + 'bootstrap.servers': kafka_hosts, + 'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes + 'delivery.report.only.error': True, + 'default.topic.config': { + 'request.required.acks': -1, + }, + } + return Producer(kafka_config) |