aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_ingest.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_ingest.py')
-rwxr-xr-xpython/fatcat_ingest.py188
1 files changed, 147 insertions, 41 deletions
diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 6ce36974..6fda74c5 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -11,7 +11,7 @@ import argparse
from collections import Counter
import raven
import elasticsearch
-from elasticsearch_dsl import Search
+from elasticsearch_dsl import Search, Q
from fatcat_tools import public_api, simple_kafka_producer, kafka_fail_fast
from fatcat_tools.transforms import release_ingest_request
@@ -21,45 +21,54 @@ from fatcat_tools.transforms import release_ingest_request
sentry_client = raven.Client()
-def run_ingest_container(args):
- """
- This command queries elasticsearch for releases from a given container (eg,
- journal), and prepares ingest requests for them.
-
- By default it filters to releases which don't have any fulltext files
- archived in IA, and dumps the ingest requests as JSON.
- """
+def _init_search(args):
# ensure API connection works
args.api.get_changelog()
+ client = elasticsearch.Elasticsearch(args.elasticsearch_endpoint)
+ search = Search(using=client, index="fatcat_release")
+ return search
+
+
+def _run_search_dump(args, search):
+
+ if args.dry_run:
+ print("=== THIS IS A DRY RUN ===")
+
kafka_producer = None
ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
if args.enqueue_kafka:
print("Will send ingest requests to kafka topic: {}".format(ingest_file_request_topic), file=sys.stderr)
kafka_producer = simple_kafka_producer(args.kafka_hosts)
- client = elasticsearch.Elasticsearch(args.elasticsearch_endpoint)
-
- s = Search(using=client, index="fatcat_release") \
- .filter("term", in_ia=False) \
- .filter("term", is_oa=True)
-
- # filter/query by container
- if args.container_id:
- s = s.filter("term", container_id=args.container_id)
- elif args.issnl:
- s = s.filter("term", container_issnl=args.issnl)
- elif args.publisher:
- s = s.query("match", publisher=args.publisher)
- elif args.name:
- s = s.query("match", container_name=args.name)
+ if args.limit is not None:
+ search = search[:args.limit]
+
+ if args.before_year:
+ search = search \
+ .filter("exists", field="release_year") \
+ .filter("range", release_date=dict(lt=args.before_year))
+ if args.after_year:
+ search = search \
+ .filter("exists", field="release_year") \
+ .filter("range", release_date=dict(gte=args.after_year))
+
+ if not args.allow_non_oa:
+ search = search.filter("term", is_oa=True)
+
+ if args.release_types:
+ release_types = args.release_types.split(',')
+ search = search \
+ .filter("terms", release_type=release_types)
else:
- print("You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr)
- sys.exit(-1)
+ search = search \
+ .filter("bool", must_not=[
+ Q("terms", release_type=["stub", "component"])
+ ])
counts = Counter({'ingest_request': 0, 'elasticsearch_release': 0, 'estimate': 0})
- counts['estimate'] = s.count()
+ counts['estimate'] = search.count()
print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
# don't try to clean up scroll if we are connected to public server (behind
@@ -67,19 +76,24 @@ def run_ingest_container(args):
if args.elasticsearch_endpoint in (
'https://search.fatcat.wiki',
'https://search.qa.fatcat.wiki'):
- s = s.params(clear_scroll=False)
+ search = search.params(clear_scroll=False)
- results = s.scan()
+ results = search.scan()
for esr in results:
+ if args.limit and counts['ingest_request'] >= args.limit:
+ break
counts['elasticsearch_release'] += 1
release = args.api.get_release(esr.ident)
ingest_request = release_ingest_request(
release,
- ingest_request_source="fatcat-ingest-container",
+ ingest_request_source="fatcat-ingest",
)
if not ingest_request:
continue
- if kafka_producer != None:
+ counts['ingest_request'] += 1
+ if args.dry_run:
+ continue
+ if kafka_producer is not None:
kafka_producer.produce(
ingest_file_request_topic,
json.dumps(ingest_request).encode('utf-8'),
@@ -87,12 +101,73 @@ def run_ingest_container(args):
on_delivery=kafka_fail_fast,
)
counts['kafka'] += 1
- # also printing to stdout when in kafka mode; could skip?
- print(json.dumps(ingest_request))
- counts['ingest_request'] += 1
- if kafka_producer != None:
+ else:
+ print(json.dumps(ingest_request))
+ if kafka_producer is not None:
kafka_producer.flush()
print(counts, file=sys.stderr)
+ if args.dry_run:
+ print("=== THIS WAS A DRY RUN ===")
+
+
+def run_ingest_container(args):
+ """
+ This command queries elasticsearch for releases from a given container (eg,
+ journal), and prepares ingest requests for them.
+
+ By default it filters to releases which don't have any fulltext files
+ archived in IA, and dumps the ingest requests as JSON.
+ """
+
+ search = _init_search(args).filter("term", in_ia=False)
+
+ # filter/query by container
+ if args.container_id:
+ search = search.filter("term", container_id=args.container_id)
+ elif args.issnl:
+ search = search.filter("term", container_issnl=args.issnl)
+ elif args.publisher:
+ search = search.query("match", publisher=args.publisher)
+ elif args.name:
+ search = search.query("match", container_name=args.name)
+ else:
+ print("You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr)
+ sys.exit(-1)
+
+ return _run_search_dump(args, search)
+
+
+def run_ingest_query(args):
+ """
+ Accepts a free-form Lucene query language string. Intended to work the same
+ way as searches in the fatcat web interface.
+ """
+
+ search = _init_search(args) \
+ .filter("term", in_ia=False) \
+ .query(
+ "query_string",
+ query=args.query,
+ default_operator="AND",
+ analyze_wildcard=True,
+ lenient=True,
+ fields=["title^5", "contrib_names^2", "container_title"],
+ )
+
+ return _run_search_dump(args, search)
+
+
+def run_ingest_extid(args):
+ """
+ Selects release entities where the external identifier (extid) exists
+ """
+
+ search = _init_search(args) \
+ .filter("term", in_ia=False) \
+ .filter("exists", field=args.extid)
+
+ return _run_search_dump(args, search)
+
def main():
parser = argparse.ArgumentParser(
@@ -112,20 +187,51 @@ def main():
parser.add_argument('--env',
default="dev",
help="Kafka topic namespace to use (eg, prod, qa, dev)")
+ parser.add_argument('--limit',
+ default=None,
+ type=int,
+ help="Max number of search hits to return")
+ parser.add_argument('--dry-run',
+ action='store_true',
+ help="runs through creating all ingest requests, but doesn't actually output or enqueue")
+ parser.add_argument('--before-year',
+ type=str,
+ help="filters results to only with release_year before this (not inclusive)")
+ parser.add_argument('--after-year',
+ type=str,
+ help="filters results to only with release_year after this (inclusive)")
+ parser.add_argument('--release-types',
+ type=str,
+ help="filters results to specified release-types, separated by commas. By default, 'stub' is filtered out.")
+ parser.add_argument('--allow-non-oa',
+ action='store_true',
+ help="By default, we limit to OA releases. This removes that filter")
subparsers = parser.add_subparsers()
- sub_ingest_container = subparsers.add_parser('ingest-container',
+ sub_container = subparsers.add_parser('container',
help="Create ingest requests for releases from a specific container")
- sub_ingest_container.set_defaults(func=run_ingest_container)
- sub_ingest_container.add_argument('--container-id',
+ sub_container.set_defaults(func=run_ingest_container)
+ sub_container.add_argument('--container-id',
help="fatcat container entity ident")
- sub_ingest_container.add_argument('--issnl',
+ sub_container.add_argument('--issnl',
help="ISSN-L of container entity")
- sub_ingest_container.add_argument('--publisher',
+ sub_container.add_argument('--publisher',
help="publisher name")
- sub_ingest_container.add_argument('--name',
+ sub_container.add_argument('--name',
help="container name")
+ sub_query = subparsers.add_parser('query',
+ help="Create ingest requests for releases from a specific query")
+ sub_query.set_defaults(func=run_ingest_query)
+ sub_query.add_argument('query',
+ help="search query (same DSL as web interface search)")
+
+ sub_extid = subparsers.add_parser('extid',
+ help="Create ingest requests for releases that have given extid defined")
+ sub_extid.set_defaults(func=run_ingest_extid)
+ sub_extid.add_argument('extid',
+ help="extid short name (as included in ES release schema)")
+
args = parser.parse_args()
if not args.__dict__.get("func"):
print("tell me what to do!")