From 7724b8e7dd39c2acc5ed404afc32720396dea888 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 4 Feb 2020 15:01:05 -0800 Subject: ingest: add 'extid' and 'query' modes; filters; refactor This is a large refactor of the ingest script. It adds a number of filtering options (for all modes), and new modes for free-form queries or limiting to specific external identifiers. --- python/fatcat_ingest.py | 185 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 147 insertions(+), 38 deletions(-) (limited to 'python/fatcat_ingest.py') diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py index 6ce36974..23043794 100755 --- a/python/fatcat_ingest.py +++ b/python/fatcat_ingest.py @@ -11,7 +11,7 @@ import argparse from collections import Counter import raven import elasticsearch -from elasticsearch_dsl import Search +from elasticsearch_dsl import Search, Q from fatcat_tools import public_api, simple_kafka_producer, kafka_fail_fast from fatcat_tools.transforms import release_ingest_request @@ -21,45 +21,54 @@ from fatcat_tools.transforms import release_ingest_request sentry_client = raven.Client() -def run_ingest_container(args): - """ - This command queries elasticsearch for releases from a given container (eg, - journal), and prepares ingest requests for them. - - By default it filters to releases which don't have any fulltext files - archived in IA, and dumps the ingest requests as JSON. - """ +def _init_search(args): # ensure API connection works args.api.get_changelog() + client = elasticsearch.Elasticsearch(args.elasticsearch_endpoint) + search = Search(using=client, index="fatcat_release") + return search + + +def _run_search_dump(args, search): + + if args.dry_run: + print("=== THIS IS A DRY RUN ===") + kafka_producer = None ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) if args.enqueue_kafka: print("Will send ingest requests to kafka topic: {}".format(ingest_file_request_topic), file=sys.stderr) kafka_producer = simple_kafka_producer(args.kafka_hosts) - client = elasticsearch.Elasticsearch(args.elasticsearch_endpoint) - - s = Search(using=client, index="fatcat_release") \ - .filter("term", in_ia=False) \ - .filter("term", is_oa=True) - - # filter/query by container - if args.container_id: - s = s.filter("term", container_id=args.container_id) - elif args.issnl: - s = s.filter("term", container_issnl=args.issnl) - elif args.publisher: - s = s.query("match", publisher=args.publisher) - elif args.name: - s = s.query("match", container_name=args.name) + if args.limit != None: + search = search[:args.limit] + + if args.before_year: + search = search \ + .filter("exists", field="release_year") \ + .filter("range", release_date=dict(lt=args.before_year)) + if args.after_year: + search = search \ + .filter("exists", field="release_year") \ + .filter("range", release_date=dict(gte=args.after_year)) + + if not args.allow_non_oa: + search = search.filter("term", is_oa=True) + + if args.release_types: + release_types = args.release_types.split(',') + search = search \ + .filter("terms", release_type=release_types) else: - print("You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr) - sys.exit(-1) + search = search \ + .filter("bool", must_not=[ + Q("terms", release_type=["stub", "component"]) + ]) counts = Counter({'ingest_request': 0, 'elasticsearch_release': 0, 'estimate': 0}) - counts['estimate'] = s.count() + counts['estimate'] = search.count() print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr) # don't try to clean up scroll if we are connected to public server (behind @@ -67,10 +76,12 @@ def run_ingest_container(args): if args.elasticsearch_endpoint in ( 'https://search.fatcat.wiki', 'https://search.qa.fatcat.wiki'): - s = s.params(clear_scroll=False) + search = search.params(clear_scroll=False) - results = s.scan() + results = search.scan() for esr in results: + if args.limit and counts['ingest_request'] >= args.limit: + break counts['elasticsearch_release'] += 1 release = args.api.get_release(esr.ident) ingest_request = release_ingest_request( @@ -79,6 +90,9 @@ def run_ingest_container(args): ) if not ingest_request: continue + counts['ingest_request'] += 1 + if args.dry_run: + continue if kafka_producer != None: kafka_producer.produce( ingest_file_request_topic, @@ -87,12 +101,73 @@ def run_ingest_container(args): on_delivery=kafka_fail_fast, ) counts['kafka'] += 1 - # also printing to stdout when in kafka mode; could skip? - print(json.dumps(ingest_request)) - counts['ingest_request'] += 1 + else: + print(json.dumps(ingest_request)) if kafka_producer != None: kafka_producer.flush() print(counts, file=sys.stderr) + if args.dry_run: + print("=== THIS WAS A DRY RUN ===") + + +def run_ingest_container(args): + """ + This command queries elasticsearch for releases from a given container (eg, + journal), and prepares ingest requests for them. + + By default it filters to releases which don't have any fulltext files + archived in IA, and dumps the ingest requests as JSON. + """ + + search = _init_search(args).filter("term", in_ia=False) + + # filter/query by container + if args.container_id: + search = search.filter("term", container_id=args.container_id) + elif args.issnl: + search = search.filter("term", container_issnl=args.issnl) + elif args.publisher: + search = search.query("match", publisher=args.publisher) + elif args.name: + search = search.query("match", container_name=args.name) + else: + print("You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr) + sys.exit(-1) + + return _run_search_dump(args, search) + + +def run_ingest_query(args): + """ + Accepts a free-form Lucene query language string. Intended to work the same + way as searches in the fatcat web interface. + """ + + search = _init_search(args) \ + .filter("term", in_ia=False) \ + .query( + "query_string", + query=args.query, + default_operator="AND", + analyze_wildcard=True, + lenient=True, + fields=["title^5", "contrib_names^2", "container_title"], + ) + + return _run_search_dump(args, search) + + +def run_ingest_extid(args): + """ + Selects release entities where the external identifier (extid) exists + """ + + search = _init_search(args) \ + .filter("term", in_ia=False) \ + .filter("exists", field=args.extid) + + return _run_search_dump(args, search) + def main(): parser = argparse.ArgumentParser( @@ -112,20 +187,54 @@ def main(): parser.add_argument('--env', default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)") + parser.add_argument('--limit', + default=None, + type=int, + help="Max number of search hits to return") + parser.add_argument('--dry-run', + action='store_true', + help="runs through creating all ingest requests, but doesn't actually output or enqueue") + parser.add_argument('--before-year', + type=str, + help="filters results to only with release_year before this (not inclusive)") + parser.add_argument('--after-year', + type=str, + help="filters results to only with release_year after this (inclusive)") + parser.add_argument('--release-types', + type=str, + help="filters results to specified release-types, separated by commas. By default, 'stub' is filtered out.") + parser.add_argument('--allow-non-oa', + action='store_true', + help="By default, we limit to OA releases. This removes that filter") subparsers = parser.add_subparsers() - sub_ingest_container = subparsers.add_parser('ingest-container', + sub_container = subparsers.add_parser('container', help="Create ingest requests for releases from a specific container") - sub_ingest_container.set_defaults(func=run_ingest_container) - sub_ingest_container.add_argument('--container-id', + sub_container.set_defaults(func=run_ingest_container) + sub_container.add_argument('--container-id', help="fatcat container entity ident") - sub_ingest_container.add_argument('--issnl', + sub_container.add_argument('--issnl', help="ISSN-L of container entity") - sub_ingest_container.add_argument('--publisher', + sub_container.add_argument('--publisher', help="publisher name") - sub_ingest_container.add_argument('--name', + sub_container.add_argument('--name', help="container name") + sub_query = subparsers.add_parser('query', + help="Create ingest requests for releases from a specific query") + sub_query.set_defaults(func=run_ingest_query) + sub_query.add_argument('query', + help="search query (same DSL as web interface search)") + sub_query.add_argument('--allow-non-oa', + action='store_true', + help="by default, we limit to OA releases. This removes that filter") + + sub_extid = subparsers.add_parser('extid', + help="Create ingest requests for releases that have given extid defined") + sub_extid.set_defaults(func=run_ingest_extid) + sub_extid.add_argument('extid', + help="extid short name (as included in ES release schema)") + args = parser.parse_args() if not args.__dict__.get("func"): print("tell me what to do!") -- cgit v1.2.3