Diffstat (limited to 'python/fatcat_ingest.py')
-rwxr-xr-x | python/fatcat_ingest.py | 210
1 file changed, 115 insertions, 95 deletions
diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 165e42f3..21597fae 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -42,51 +42,56 @@ def _run_search_dump(args, search):
     else:
         ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests-daily".format(args.env)
     if args.enqueue_kafka:
-        print("Will send ingest requests to kafka topic: {}".format(ingest_file_request_topic), file=sys.stderr)
+        print(
+            "Will send ingest requests to kafka topic: {}".format(ingest_file_request_topic),
+            file=sys.stderr,
+        )
         kafka_producer = simple_kafka_producer(args.kafka_hosts)
 
     if args.limit is not None:
-        search = search[:args.limit]
+        search = search[: args.limit]
 
     if args.before_year:
-        search = search \
-            .filter("exists", field="release_year") \
-            .filter("range", release_date=dict(lt=args.before_year))
+        search = search.filter("exists", field="release_year").filter(
+            "range", release_date=dict(lt=args.before_year)
+        )
     if args.after_year:
-        search = search \
-            .filter("exists", field="release_year") \
-            .filter("range", release_date=dict(gte=args.after_year))
+        search = search.filter("exists", field="release_year").filter(
+            "range", release_date=dict(gte=args.after_year)
+        )
 
     if not args.allow_non_oa:
         search = search.filter("term", is_oa=True)
 
     if args.release_types:
-        release_types = args.release_types.split(',')
-        search = search \
-            .filter("terms", release_type=release_types)
+        release_types = args.release_types.split(",")
+        search = search.filter("terms", release_type=release_types)
     else:
-        search = search \
-            .filter("bool", must_not=[
-                Q("terms", release_type=["stub", "component"])
-            ])
+        search = search.filter(
+            "bool", must_not=[Q("terms", release_type=["stub", "component"])]
+        )
 
-    counts = Counter({'ingest_request': 0, 'elasticsearch_release': 0, 'estimate': 0})
+    counts = Counter({"ingest_request": 0, "elasticsearch_release": 0, "estimate": 0})
     search = search.params()
-    counts['estimate'] = search.count()
-    print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
+    counts["estimate"] = search.count()
+    print(
+        "Expecting {} release objects in search queries".format(counts["estimate"]),
+        file=sys.stderr,
+    )
 
     # don't try to clean up scroll if we are connected to public server (behind
     # nginx proxy that disallows DELETE)
     if args.elasticsearch_endpoint in (
-            'https://search.fatcat.wiki',
-            'https://search.qa.fatcat.wiki'):
+        "https://search.fatcat.wiki",
+        "https://search.qa.fatcat.wiki",
+    ):
         search = search.params(clear_scroll=False)
 
     results = search.scan()
     for esr in results:
-        if args.limit and counts['ingest_request'] >= args.limit:
+        if args.limit and counts["ingest_request"] >= args.limit:
             break
-        counts['elasticsearch_release'] += 1
+        counts["elasticsearch_release"] += 1
         release = args.api.get_release(esr.ident)
         ingest_request = release_ingest_request(
             release,
@@ -96,18 +101,18 @@ def _run_search_dump(args, search):
         if not ingest_request:
             continue
         if args.force_recrawl:
-            ingest_request['force_recrawl'] = True
-        counts['ingest_request'] += 1
+            ingest_request["force_recrawl"] = True
+        counts["ingest_request"] += 1
         if args.dry_run:
             continue
         if kafka_producer is not None:
             kafka_producer.produce(
                 ingest_file_request_topic,
-                json.dumps(ingest_request).encode('utf-8'),
-                #key=None,
+                json.dumps(ingest_request).encode("utf-8"),
+                # key=None,
                 on_delivery=kafka_fail_fast,
             )
-            counts['kafka'] += 1
+            counts["kafka"] += 1
         else:
             print(json.dumps(ingest_request))
     if kafka_producer is not None:
@@ -138,7 +143,9 @@ def run_ingest_container(args):
     elif args.name:
         search = search.query("match", container_name=args.name)
     else:
-        print("You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr)
+        print(
+            "You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr
+        )
        sys.exit(-1)
 
     return _run_search_dump(args, search)
@@ -150,8 +157,9 @@ def run_ingest_query(args):
 
     way as searches in the fatcat web interface.
     """
-    search = _init_search(args) \
-        .filter("term", in_ia=False) \
+    search = (
+        _init_search(args)
+        .filter("term", in_ia=False)
         .query(
             "query_string",
             query=args.query,
@@ -160,6 +168,7 @@
             lenient=True,
             fields=["title^5", "contrib_names^2", "container_title"],
         )
+    )
 
     return _run_search_dump(args, search)
 
@@ -169,86 +178,96 @@ def run_ingest_extid(args):
     Selects release entities where the external identifier (extid) exists
     """
-    search = _init_search(args) \
-        .filter("term", in_ia=False) \
-        .filter("exists", field=args.extid)
+    search = _init_search(args).filter("term", in_ia=False).filter("exists", field=args.extid)
 
     return _run_search_dump(args, search)
 
 
 def main():
-    parser = argparse.ArgumentParser(
-        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument('--fatcat-api-url',
-        default="http://localhost:9411/v0",
-        help="connect to this host/port")
-    parser.add_argument('--enqueue-kafka',
-        action='store_true',
-        help="send ingest requests directly to sandcrawler kafka topic for processing")
-    parser.add_argument('--kafka-hosts',
+    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument(
+        "--fatcat-api-url", default="http://localhost:9411/v0", help="connect to this host/port"
+    )
+    parser.add_argument(
+        "--enqueue-kafka",
+        action="store_true",
+        help="send ingest requests directly to sandcrawler kafka topic for processing",
+    )
+    parser.add_argument(
+        "--kafka-hosts",
         default="localhost:9092",
-        help="list of Kafka brokers (host/port) to use")
-    parser.add_argument('--kafka-request-topic',
-        help="exact Kafka ingest request topic to use")
-    parser.add_argument('--elasticsearch-endpoint',
+        help="list of Kafka brokers (host/port) to use",
+    )
+    parser.add_argument("--kafka-request-topic", help="exact Kafka ingest request topic to use")
+    parser.add_argument(
+        "--elasticsearch-endpoint",
         default="https://search.fatcat.wiki",
-        help="elasticsearch API. internal endpoint preferred, but public is default")
-    parser.add_argument('--elasticsearch-index',
-        default="fatcat_release",
-        help="elasticsearch index to query")
-    parser.add_argument('--env',
-        default="dev",
-        help="Kafka topic namespace to use (eg, prod, qa, dev)")
-    parser.add_argument('--limit',
-        default=None,
-        type=int,
-        help="Max number of search hits to return")
-    parser.add_argument('--dry-run',
-        action='store_true',
-        help="runs through creating all ingest requests, but doesn't actually output or enqueue")
-    parser.add_argument('--before-year',
+        help="elasticsearch API. internal endpoint preferred, but public is default",
+    )
+    parser.add_argument(
+        "--elasticsearch-index", default="fatcat_release", help="elasticsearch index to query"
+    )
+    parser.add_argument(
+        "--env", default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)"
+    )
+    parser.add_argument(
+        "--limit", default=None, type=int, help="Max number of search hits to return"
+    )
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="runs through creating all ingest requests, but doesn't actually output or enqueue",
+    )
+    parser.add_argument(
+        "--before-year",
         type=str,
-        help="filters results to only with release_year before this (not inclusive)")
-    parser.add_argument('--after-year',
+        help="filters results to only with release_year before this (not inclusive)",
+    )
+    parser.add_argument(
+        "--after-year",
         type=str,
-        help="filters results to only with release_year after this (inclusive)")
-    parser.add_argument('--release-types',
+        help="filters results to only with release_year after this (inclusive)",
+    )
+    parser.add_argument(
+        "--release-types",
         type=str,
-        help="filters results to specified release-types, separated by commas. By default, 'stub' is filtered out.")
-    parser.add_argument('--allow-non-oa',
-        action='store_true',
-        help="By default, we limit to OA releases. This removes that filter")
-    parser.add_argument('--force-recrawl',
-        action='store_true',
-        help="Tell ingest worker to skip GWB history lookup and do SPNv2 crawl")
-    parser.add_argument('--ingest-type',
-        default="pdf",
-        help="What medium to ingest (pdf, xml, html)")
+        help="filters results to specified release-types, separated by commas. By default, 'stub' is filtered out.",
+    )
+    parser.add_argument(
+        "--allow-non-oa",
+        action="store_true",
+        help="By default, we limit to OA releases. This removes that filter",
+    )
+    parser.add_argument(
+        "--force-recrawl",
+        action="store_true",
+        help="Tell ingest worker to skip GWB history lookup and do SPNv2 crawl",
+    )
+    parser.add_argument(
+        "--ingest-type", default="pdf", help="What medium to ingest (pdf, xml, html)"
+    )
 
     subparsers = parser.add_subparsers()
 
-    sub_container = subparsers.add_parser('container',
-        help="Create ingest requests for releases from a specific container")
+    sub_container = subparsers.add_parser(
+        "container", help="Create ingest requests for releases from a specific container"
+    )
     sub_container.set_defaults(func=run_ingest_container)
-    sub_container.add_argument('--container-id',
-        help="fatcat container entity ident")
-    sub_container.add_argument('--issnl',
-        help="ISSN-L of container entity")
-    sub_container.add_argument('--publisher',
-        help="publisher name")
-    sub_container.add_argument('--name',
-        help="container name")
-
-    sub_query = subparsers.add_parser('query',
-        help="Create ingest requests for releases from a specific query")
+    sub_container.add_argument("--container-id", help="fatcat container entity ident")
+    sub_container.add_argument("--issnl", help="ISSN-L of container entity")
+    sub_container.add_argument("--publisher", help="publisher name")
+    sub_container.add_argument("--name", help="container name")
+
+    sub_query = subparsers.add_parser(
+        "query", help="Create ingest requests for releases from a specific query"
+    )
     sub_query.set_defaults(func=run_ingest_query)
-    sub_query.add_argument('query',
-        help="search query (same DSL as web interface search)")
+    sub_query.add_argument("query", help="search query (same DSL as web interface search)")
 
-    sub_extid = subparsers.add_parser('extid',
-        help="Create ingest requests for releases that have given extid defined")
+    sub_extid = subparsers.add_parser(
+        "extid", help="Create ingest requests for releases that have given extid defined"
+    )
     sub_extid.set_defaults(func=run_ingest_extid)
-    sub_extid.add_argument('extid',
-        help="extid short name (as included in ES release schema)")
+    sub_extid.add_argument("extid", help="extid short name (as included in ES release schema)")
 
     args = parser.parse_args()
     if not args.__dict__.get("func"):
@@ -258,5 +277,6 @@ def main():
     args.api = public_api(args.fatcat_api_url)
     args.func(args)
 
-if __name__ == '__main__':
+
+if __name__ == "__main__":
     main()