summaryrefslogtreecommitdiffstats
path: root/python/fatcat_ingest.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2021-11-02 18:14:09 -0700
committerBryan Newbold <bnewbold@robocracy.org>2021-11-02 18:14:09 -0700
commit6464631dbe5c4afeb76f2f3c9d63b89f917c9a3b (patch)
tree633303839cafc7d901cf8565e034542606a5bb27 /python/fatcat_ingest.py
parentcdfd6b85b386b7bbf9d5a5179ef26970b6e5a4e7 (diff)
downloadfatcat-6464631dbe5c4afeb76f2f3c9d63b89f917c9a3b.tar.gz
fatcat-6464631dbe5c4afeb76f2f3c9d63b89f917c9a3b.zip
fmt (black): *.py
Diffstat (limited to 'python/fatcat_ingest.py')
-rwxr-xr-xpython/fatcat_ingest.py210
1 files changed, 115 insertions, 95 deletions
diff --git a/python/fatcat_ingest.py b/python/fatcat_ingest.py
index 165e42f3..21597fae 100755
--- a/python/fatcat_ingest.py
+++ b/python/fatcat_ingest.py
@@ -42,51 +42,56 @@ def _run_search_dump(args, search):
else:
ingest_file_request_topic = "sandcrawler-{}.ingest-file-requests-daily".format(args.env)
if args.enqueue_kafka:
- print("Will send ingest requests to kafka topic: {}".format(ingest_file_request_topic), file=sys.stderr)
+ print(
+ "Will send ingest requests to kafka topic: {}".format(ingest_file_request_topic),
+ file=sys.stderr,
+ )
kafka_producer = simple_kafka_producer(args.kafka_hosts)
if args.limit is not None:
- search = search[:args.limit]
+ search = search[: args.limit]
if args.before_year:
- search = search \
- .filter("exists", field="release_year") \
- .filter("range", release_date=dict(lt=args.before_year))
+ search = search.filter("exists", field="release_year").filter(
+ "range", release_date=dict(lt=args.before_year)
+ )
if args.after_year:
- search = search \
- .filter("exists", field="release_year") \
- .filter("range", release_date=dict(gte=args.after_year))
+ search = search.filter("exists", field="release_year").filter(
+ "range", release_date=dict(gte=args.after_year)
+ )
if not args.allow_non_oa:
search = search.filter("term", is_oa=True)
if args.release_types:
- release_types = args.release_types.split(',')
- search = search \
- .filter("terms", release_type=release_types)
+ release_types = args.release_types.split(",")
+ search = search.filter("terms", release_type=release_types)
else:
- search = search \
- .filter("bool", must_not=[
- Q("terms", release_type=["stub", "component"])
- ])
+ search = search.filter(
+ "bool", must_not=[Q("terms", release_type=["stub", "component"])]
+ )
- counts = Counter({'ingest_request': 0, 'elasticsearch_release': 0, 'estimate': 0})
+ counts = Counter({"ingest_request": 0, "elasticsearch_release": 0, "estimate": 0})
search = search.params()
- counts['estimate'] = search.count()
- print("Expecting {} release objects in search queries".format(counts['estimate']), file=sys.stderr)
+ counts["estimate"] = search.count()
+ print(
+ "Expecting {} release objects in search queries".format(counts["estimate"]),
+ file=sys.stderr,
+ )
# don't try to clean up scroll if we are connected to public server (behind
# nginx proxy that disallows DELETE)
if args.elasticsearch_endpoint in (
- 'https://search.fatcat.wiki',
- 'https://search.qa.fatcat.wiki'):
+ "https://search.fatcat.wiki",
+ "https://search.qa.fatcat.wiki",
+ ):
search = search.params(clear_scroll=False)
results = search.scan()
for esr in results:
- if args.limit and counts['ingest_request'] >= args.limit:
+ if args.limit and counts["ingest_request"] >= args.limit:
break
- counts['elasticsearch_release'] += 1
+ counts["elasticsearch_release"] += 1
release = args.api.get_release(esr.ident)
ingest_request = release_ingest_request(
release,
@@ -96,18 +101,18 @@ def _run_search_dump(args, search):
if not ingest_request:
continue
if args.force_recrawl:
- ingest_request['force_recrawl'] = True
- counts['ingest_request'] += 1
+ ingest_request["force_recrawl"] = True
+ counts["ingest_request"] += 1
if args.dry_run:
continue
if kafka_producer is not None:
kafka_producer.produce(
ingest_file_request_topic,
- json.dumps(ingest_request).encode('utf-8'),
- #key=None,
+ json.dumps(ingest_request).encode("utf-8"),
+ # key=None,
on_delivery=kafka_fail_fast,
)
- counts['kafka'] += 1
+ counts["kafka"] += 1
else:
print(json.dumps(ingest_request))
if kafka_producer is not None:
@@ -138,7 +143,9 @@ def run_ingest_container(args):
elif args.name:
search = search.query("match", container_name=args.name)
else:
- print("You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr)
+ print(
+ "You must supply at least one query/filter parameter! Eg, ISSN-L", file=sys.stderr
+ )
sys.exit(-1)
return _run_search_dump(args, search)
@@ -150,8 +157,9 @@ def run_ingest_query(args):
way as searches in the fatcat web interface.
"""
- search = _init_search(args) \
- .filter("term", in_ia=False) \
+ search = (
+ _init_search(args)
+ .filter("term", in_ia=False)
.query(
"query_string",
query=args.query,
@@ -160,6 +168,7 @@ def run_ingest_query(args):
lenient=True,
fields=["title^5", "contrib_names^2", "container_title"],
)
+ )
return _run_search_dump(args, search)
@@ -169,86 +178,96 @@ def run_ingest_extid(args):
Selects release entities where the external identifier (extid) exists
"""
- search = _init_search(args) \
- .filter("term", in_ia=False) \
- .filter("exists", field=args.extid)
+ search = _init_search(args).filter("term", in_ia=False).filter("exists", field=args.extid)
return _run_search_dump(args, search)
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
- parser.add_argument('--fatcat-api-url',
- default="http://localhost:9411/v0",
- help="connect to this host/port")
- parser.add_argument('--enqueue-kafka',
- action='store_true',
- help="send ingest requests directly to sandcrawler kafka topic for processing")
- parser.add_argument('--kafka-hosts',
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser.add_argument(
+ "--fatcat-api-url", default="http://localhost:9411/v0", help="connect to this host/port"
+ )
+ parser.add_argument(
+ "--enqueue-kafka",
+ action="store_true",
+ help="send ingest requests directly to sandcrawler kafka topic for processing",
+ )
+ parser.add_argument(
+ "--kafka-hosts",
default="localhost:9092",
- help="list of Kafka brokers (host/port) to use")
- parser.add_argument('--kafka-request-topic',
- help="exact Kafka ingest request topic to use")
- parser.add_argument('--elasticsearch-endpoint',
+ help="list of Kafka brokers (host/port) to use",
+ )
+ parser.add_argument("--kafka-request-topic", help="exact Kafka ingest request topic to use")
+ parser.add_argument(
+ "--elasticsearch-endpoint",
default="https://search.fatcat.wiki",
- help="elasticsearch API. internal endpoint preferred, but public is default")
- parser.add_argument('--elasticsearch-index',
- default="fatcat_release",
- help="elasticsearch index to query")
- parser.add_argument('--env',
- default="dev",
- help="Kafka topic namespace to use (eg, prod, qa, dev)")
- parser.add_argument('--limit',
- default=None,
- type=int,
- help="Max number of search hits to return")
- parser.add_argument('--dry-run',
- action='store_true',
- help="runs through creating all ingest requests, but doesn't actually output or enqueue")
- parser.add_argument('--before-year',
+ help="elasticsearch API. internal endpoint preferred, but public is default",
+ )
+ parser.add_argument(
+ "--elasticsearch-index", default="fatcat_release", help="elasticsearch index to query"
+ )
+ parser.add_argument(
+ "--env", default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)"
+ )
+ parser.add_argument(
+ "--limit", default=None, type=int, help="Max number of search hits to return"
+ )
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="runs through creating all ingest requests, but doesn't actually output or enqueue",
+ )
+ parser.add_argument(
+ "--before-year",
type=str,
- help="filters results to only with release_year before this (not inclusive)")
- parser.add_argument('--after-year',
+ help="filters results to only with release_year before this (not inclusive)",
+ )
+ parser.add_argument(
+ "--after-year",
type=str,
- help="filters results to only with release_year after this (inclusive)")
- parser.add_argument('--release-types',
+ help="filters results to only with release_year after this (inclusive)",
+ )
+ parser.add_argument(
+ "--release-types",
type=str,
- help="filters results to specified release-types, separated by commas. By default, 'stub' is filtered out.")
- parser.add_argument('--allow-non-oa',
- action='store_true',
- help="By default, we limit to OA releases. This removes that filter")
- parser.add_argument('--force-recrawl',
- action='store_true',
- help="Tell ingest worker to skip GWB history lookup and do SPNv2 crawl")
- parser.add_argument('--ingest-type',
- default="pdf",
- help="What medium to ingest (pdf, xml, html)")
+ help="filters results to specified release-types, separated by commas. By default, 'stub' is filtered out.",
+ )
+ parser.add_argument(
+ "--allow-non-oa",
+ action="store_true",
+ help="By default, we limit to OA releases. This removes that filter",
+ )
+ parser.add_argument(
+ "--force-recrawl",
+ action="store_true",
+ help="Tell ingest worker to skip GWB history lookup and do SPNv2 crawl",
+ )
+ parser.add_argument(
+ "--ingest-type", default="pdf", help="What medium to ingest (pdf, xml, html)"
+ )
subparsers = parser.add_subparsers()
- sub_container = subparsers.add_parser('container',
- help="Create ingest requests for releases from a specific container")
+ sub_container = subparsers.add_parser(
+ "container", help="Create ingest requests for releases from a specific container"
+ )
sub_container.set_defaults(func=run_ingest_container)
- sub_container.add_argument('--container-id',
- help="fatcat container entity ident")
- sub_container.add_argument('--issnl',
- help="ISSN-L of container entity")
- sub_container.add_argument('--publisher',
- help="publisher name")
- sub_container.add_argument('--name',
- help="container name")
-
- sub_query = subparsers.add_parser('query',
- help="Create ingest requests for releases from a specific query")
+ sub_container.add_argument("--container-id", help="fatcat container entity ident")
+ sub_container.add_argument("--issnl", help="ISSN-L of container entity")
+ sub_container.add_argument("--publisher", help="publisher name")
+ sub_container.add_argument("--name", help="container name")
+
+ sub_query = subparsers.add_parser(
+ "query", help="Create ingest requests for releases from a specific query"
+ )
sub_query.set_defaults(func=run_ingest_query)
- sub_query.add_argument('query',
- help="search query (same DSL as web interface search)")
+ sub_query.add_argument("query", help="search query (same DSL as web interface search)")
- sub_extid = subparsers.add_parser('extid',
- help="Create ingest requests for releases that have given extid defined")
+ sub_extid = subparsers.add_parser(
+ "extid", help="Create ingest requests for releases that have given extid defined"
+ )
sub_extid.set_defaults(func=run_ingest_extid)
- sub_extid.add_argument('extid',
- help="extid short name (as included in ES release schema)")
+ sub_extid.add_argument("extid", help="extid short name (as included in ES release schema)")
args = parser.parse_args()
if not args.__dict__.get("func"):
@@ -258,5 +277,6 @@ def main():
args.api = public_api(args.fatcat_api_url)
args.func(args)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
main()