diff options
-rwxr-xr-x | python/fatcat_import.py | 67 |
1 files changed, 54 insertions, 13 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 656b9a05..ad4de0e2 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -18,8 +18,14 @@ def run_crossref(args): edit_batch_size=args.batch_size, bezerk_mode=args.bezerk_mode) if args.kafka_mode: - KafkaJsonPusher(fci, args.kafka_hosts, args.kafka_env, "api-crossref", - "fatcat-import", consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + fci, + args.kafka_hosts, + args.kafka_env, + "api-crossref", + "fatcat-{}-import-crossref".format(args.kafka_env), + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(fci, args.json_file).run() @@ -34,7 +40,13 @@ def run_arxiv(args): edit_batch_size=args.batch_size) if args.kafka_mode: raise NotImplementedError - #KafkaBs4XmlPusher(ari, args.kafka_hosts, args.kafka_env, "api-arxiv", "fatcat-import").run() + #KafkaBs4XmlPusher( + # ari, + # args.kafka_hosts, + # args.kafka_env, + # "api-arxiv", + # "fatcat-{}-import-arxiv".format(args.kafka_env), + #).run() else: Bs4XmlFilePusher(ari, args.xml_file, "record").run() @@ -46,9 +58,20 @@ def run_pubmed(args): lookup_refs=(not args.no_lookup_refs)) if args.kafka_mode: raise NotImplementedError - #KafkaBs4XmlPusher(pi, args.kafka_hosts, args.kafka_env, "api-pubmed", "fatcat-import").run() + #KafkaBs4XmlPusher( + # pi, + # args.kafka_hosts, + # args.kafka_env, + # "api-pubmed", + # "fatcat-{}import-arxiv".format(args.kafka_env), + #).run() else: - Bs4XmlLargeFilePusher(pi, args.xml_file, "PubmedArticle", record_list_tag="PubmedArticleSet").run() + Bs4XmlLargeFilePusher( + pi, + args.xml_file, + "PubmedArticle", + record_list_tag="PubmedArticleSet", + ).run() def run_jstor(args): ji = JstorImporter(args.api, @@ -107,9 +130,15 @@ def run_ingest_file(args): require_grobid=(not args.no_require_grobid), edit_batch_size=args.batch_size) if args.kafka_mode: - KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results", - "fatcat-ingest-file-result", kafka_namespace="sandcrawler", - consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + ifri, + args.kafka_hosts, + args.kafka_env, + "ingest-file-results", + "fatcat-{}-ingest-file-result".format(args.kafka_env), + kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(ifri, args.json_file).run() @@ -118,9 +147,15 @@ def run_savepapernow_file(args): editgroup_description=args.editgroup_description_override, edit_batch_size=args.batch_size) if args.kafka_mode: - KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results", - "savepapernow-file-result", kafka_namespace="sandcrawler", - consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + ifri, + args.kafka_hosts, + args.kafka_env, + "ingest-file-results", + "fatcat-{}-savepapernow-file-result".format(args.kafka_env), + kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(ifri, args.json_file).run() @@ -183,8 +218,14 @@ def run_datacite(args): extid_map_file=args.extid_map_file, insert_log_file=args.insert_log_file) if args.kafka_mode: - KafkaJsonPusher(dci, args.kafka_hosts, args.kafka_env, "api-datacite", - "fatcat-import", consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + dci, + args.kafka_hosts, + args.kafka_env, + "api-datacite", + "fatcat-{}-import-datacite".format(args.kafka_env), + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(dci, args.json_file).run() |