aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/Pipfile2
-rw-r--r--python/Pipfile.lock18
-rwxr-xr-xpython/fatcat_ingest.py134
-rw-r--r--python/fatcat_tools/__init__.py1
-rw-r--r--python/fatcat_tools/importers/ingest.py7
-rw-r--r--python/fatcat_tools/kafka.py22
6 files changed, 181 insertions, 3 deletions
diff --git a/python/Pipfile b/python/Pipfile
index 1c15cab2..ae261053 100644
--- a/python/Pipfile
+++ b/python/Pipfile
@@ -45,6 +45,8 @@ bs4 = "*"
python-magic = "*"
pylatexenc = "*"
pygal = "*"
+elasticsearch-dsl = ">=6.0.0,<7.0.0"
+elasticsearch = ">=6.0.0,<7.0.0"
[requires]
# Python 3.5 is the bundled (system) version of python for Ubuntu 16.04
diff --git a/python/Pipfile.lock b/python/Pipfile.lock
index 5a8ef462..8ced4d88 100644
--- a/python/Pipfile.lock
+++ b/python/Pipfile.lock
@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
- "sha256": "e2e05ace1d00d2859f8942ac21bbb7fcbde9dc2f28df74f4173b3dfd7c7d3932"
+ "sha256": "7ef50f1c42fdcd59e1016ca4581bf6ab1fe60cd3133417532eddb107dd402c8d"
},
"pipfile-spec": 6,
"requires": {
@@ -139,6 +139,22 @@
"index": "pypi",
"version": "==1.1.0"
},
+ "elasticsearch": {
+ "hashes": [
+ "sha256:1f0f633e3b500d5042424f75a505badf8c4b9962c1b4734cdfb3087fb67920be",
+ "sha256:fb5ab15ee283f104b5a7a5695c7e879cb2927e4eb5aed9c530811590b41259ad"
+ ],
+ "index": "pypi",
+ "version": "==6.4.0"
+ },
+ "elasticsearch-dsl": {
+ "hashes": [
+ "sha256:26416f4dd46ceca43d62ef74970d9de4bdd6f4b0f163316f0b432c9e61a08bec",
+ "sha256:f60aea7fd756ac1fbe7ce114bbf4949aefbf495dfe8896640e787c67344f12f6"
+ ],
+ "index": "pypi",
+ "version": "==6.4.0"
+ },
"fatcat-openapi-client": {
"path": "./../python_openapi_client"
},
diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
new file mode 100755
index 00000000..9f0bf22e
--- /dev/null
+++ b/python/fatcat_ingest.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python3
+
+"""
+Intended to be a command line interface to "Save Paper Now" and ingest
+request/response.
+"""
+
+import sys
+import json
+import argparse
+from collections import Counter
+
+from fatcat_tools import public_api, simple_kafka_producer, kafka_fail_fast
+from fatcat_tools.transforms import release_ingest_request
+import elasticsearch
+from elasticsearch_dsl import Search
+
+
+def run_ingest_container(args):
+ """
+ This command queries elasticsearch for releases from a given container (eg,
+ journal), and prepares ingest requests for them.
+
+ By default it filters to releases which don't have any fulltext files
+ archived in IA, and dumps the ingest requests as JSON.
+ """
+
+ # ensure API connection works
+ args.api.get_changelog()
+
+ kafka_producer = None
+ ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
+ if args.enqueue_kafka:
+ print("Will send ingest requests to kafka topic: {}".format(ingest_file_request_topic), file=sys.stderr)
+ kafka_producer = simple_kafka_producer(args.kafka_hosts)
+
+ client = elasticsearch.Elasticsearch(args.elasticsearch_endpoint)
+
+ s = Search(using=client, index="fatcat_release") \
+ .filter("term", in_ia=False) \
+ .filter("term", is_oa=True)
+
+ # filter/query by container
+ if args.container_id:
+ s = s.filter("term", container_id=args.container_id)
+ elif args.issnl:
+ s = s.filter("term", container_issnl=args.issnl)
+ elif args.publisher:
+ s = s.query("match", publisher=args.publisher)
+ elif args.name:
+ s = s.query("match", container_name=args.name)
+ else:
+ print("You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr)
+ sys.exit(-1)
+
+ counts = Counter({'ingest_request': 0, 'elasticsearch_release': 0, 'estimate': 0})
+ counts['estimate'] = s.count()
+ print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
+
+ # don't try to clean up scroll if we are connected to public server (behind
+ # nginx proxy that disallows DELETE)
+ if args.elasticsearch_endpoint in (
+ 'https://search.fatcat.wiki',
+ 'https://search.qa.fatcat.wiki'):
+ s = s.params(clear_scroll=False)
+
+ results = s.scan()
+ for esr in results:
+ counts['elasticsearch_release'] += 1
+ release = args.api.get_release(esr.ident)
+ ingest_request = release_ingest_request(
+ release,
+ oa_only=False,
+ ingest_request_source="fatcat-ingest-container",
+ )
+ if not ingest_request:
+ continue
+ if kafka_producer != None:
+ kafka_producer.produce(
+ ingest_file_request_topic,
+ json.dumps(ingest_request).encode('utf-8'),
+ #key=None,
+ on_delivery=kafka_fail_fast,
+ )
+ counts['kafka'] += 1
+ # also printing to stdout when in kafka mode; could skip?
+ print(json.dumps(ingest_request))
+ counts['ingest_request'] += 1
+ if kafka_producer != None:
+ kafka_producer.flush()
+ print(counts, file=sys.stderr)
+
+def main():
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser.add_argument('--fatcat-api-url',
+ default="http://localhost:9411/v0",
+ help="connect to this host/port")
+ parser.add_argument('--enqueue-kafka',
+ action='store_true',
+ help="send ingest requests directly to sandcrawler kafka topic for processing")
+ parser.add_argument('--kafka-hosts',
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
+ parser.add_argument('--elasticsearch-endpoint',
+ default="https://search.fatcat.wiki",
+ help="elasticsearch API. internal endpoint prefered, but public is default")
+ parser.add_argument('--env',
+ default="dev",
+ help="Kafka topic namespace to use (eg, prod, qa, dev)")
+ subparsers = parser.add_subparsers()
+
+ sub_ingest_container = subparsers.add_parser('ingest-container',
+ help="Create ingest requests for releases from a specific container")
+ sub_ingest_container.set_defaults(func=run_ingest_container)
+ sub_ingest_container.add_argument('--container-id',
+ help="fatcat container entity ident")
+ sub_ingest_container.add_argument('--issnl',
+ help="ISSN-L of container entity")
+ sub_ingest_container.add_argument('--publisher',
+ help="publisher name")
+ sub_ingest_container.add_argument('--name',
+ help="container name")
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ print("tell me what to do!")
+ sys.exit(-1)
+
+ args.api = public_api(args.fatcat_api_url)
+ args.func(args)
+
+if __name__ == '__main__':
+ main()
diff --git a/python/fatcat_tools/__init__.py b/python/fatcat_tools/__init__.py
index f2798f0b..78c5c90b 100644
--- a/python/fatcat_tools/__init__.py
+++ b/python/fatcat_tools/__init__.py
@@ -2,3 +2,4 @@
from .api_auth import authenticated_api, public_api
from .fcid import fcid2uuid, uuid2fcid
from .transforms import *
+from .kafka import simple_kafka_producer, kafka_fail_fast
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index 7dad13ce..deb4ef51 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -28,13 +28,16 @@ class IngestFileResultImporter(EntityImporter):
print("Requiring GROBID status == 200")
else:
print("NOT checking GROBID success")
- self.ingest_request_source_whitelist = ['fatcat-changelog']
+ self.ingest_request_source_whitelist = [
+ 'fatcat-changelog',
+ 'fatcat-ingest-container',
+ ]
if kwargs.get('skip_source_whitelist', False):
self.ingest_request_source_whitelist = []
def want(self, row):
"""
- Logic here probably needs work:
+ Logic here probably needs work (TODO):
- Direct ingests via DOI from fatcat-changelog should probably go
through regardless of GROBID status
diff --git a/python/fatcat_tools/kafka.py b/python/fatcat_tools/kafka.py
new file mode 100644
index 00000000..53b62a37
--- /dev/null
+++ b/python/fatcat_tools/kafka.py
@@ -0,0 +1,22 @@
+
+from confluent_kafka import Consumer, Producer, KafkaException
+
+
+def kafka_fail_fast(err, msg):
+ if err is not None:
+ print("Kafka producer delivery error: {}".format(err))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+
+def simple_kafka_producer(kafka_hosts):
+
+ kafka_config = {
+ 'bootstrap.servers': kafka_hosts,
+ 'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1,
+ },
+ }
+ return Producer(kafka_config)