From 2fcc59388a4eb53a7e2370275366272459874e99 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 21 Jan 2020 10:48:16 -0800 Subject: refactor fatcat_import kafka group names My current understanding is that consumer group names should be one-to-one with topic names. I previously thought offsets were stored on a {topic, group} key, but they seem to be mixed and having too many workers in the same group is bad. In particular, we don't want cross-talk or load between QA and prod. All these topics are caught up in prod, so deploying this change and restarting workers should be safe. This commit does not update the elasticsearch or entity updates workers. --- python/fatcat_import.py | 67 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 54 insertions(+), 13 deletions(-) (limited to 'python') diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 656b9a05..ad4de0e2 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -18,8 +18,14 @@ def run_crossref(args): edit_batch_size=args.batch_size, bezerk_mode=args.bezerk_mode) if args.kafka_mode: - KafkaJsonPusher(fci, args.kafka_hosts, args.kafka_env, "api-crossref", - "fatcat-import", consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + fci, + args.kafka_hosts, + args.kafka_env, + "api-crossref", + "fatcat-{}-import-crossref".format(args.kafka_env), + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(fci, args.json_file).run() @@ -34,7 +40,13 @@ def run_arxiv(args): edit_batch_size=args.batch_size) if args.kafka_mode: raise NotImplementedError - #KafkaBs4XmlPusher(ari, args.kafka_hosts, args.kafka_env, "api-arxiv", "fatcat-import").run() + #KafkaBs4XmlPusher( + # ari, + # args.kafka_hosts, + # args.kafka_env, + # "api-arxiv", + # "fatcat-{}-import-arxiv".format(args.kafka_env), + #).run() else: Bs4XmlFilePusher(ari, args.xml_file, "record").run() @@ -46,9 +58,20 @@ def run_pubmed(args): lookup_refs=(not args.no_lookup_refs)) if args.kafka_mode: raise 
NotImplementedError - #KafkaBs4XmlPusher(pi, args.kafka_hosts, args.kafka_env, "api-pubmed", "fatcat-import").run() + #KafkaBs4XmlPusher( + # pi, + # args.kafka_hosts, + # args.kafka_env, + # "api-pubmed", + # "fatcat-{}-import-pubmed".format(args.kafka_env), + #).run() else: - Bs4XmlLargeFilePusher(pi, args.xml_file, "PubmedArticle", record_list_tag="PubmedArticleSet").run() + Bs4XmlLargeFilePusher( + pi, + args.xml_file, + "PubmedArticle", + record_list_tag="PubmedArticleSet", + ).run() def run_jstor(args): ji = JstorImporter(args.api, @@ -107,9 +130,15 @@ def run_ingest_file(args): require_grobid=(not args.no_require_grobid), edit_batch_size=args.batch_size) if args.kafka_mode: - KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results", - "fatcat-ingest-file-result", kafka_namespace="sandcrawler", - consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + ifri, + args.kafka_hosts, + args.kafka_env, + "ingest-file-results", + "fatcat-{}-ingest-file-result".format(args.kafka_env), + kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(ifri, args.json_file).run() @@ -118,9 +147,15 @@ def run_savepapernow_file(args): editgroup_description=args.editgroup_description_override, edit_batch_size=args.batch_size) if args.kafka_mode: - KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results", - "savepapernow-file-result", kafka_namespace="sandcrawler", - consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + ifri, + args.kafka_hosts, + args.kafka_env, + "ingest-file-results", + "fatcat-{}-savepapernow-file-result".format(args.kafka_env), + kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(ifri, args.json_file).run() @@ -183,8 +218,14 @@ def run_datacite(args): extid_map_file=args.extid_map_file, insert_log_file=args.insert_log_file) if args.kafka_mode: - KafkaJsonPusher(dci, args.kafka_hosts, args.kafka_env, "api-datacite", 
- "fatcat-import", consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + dci, + args.kafka_hosts, + args.kafka_env, + "api-datacite", + "fatcat-{}-import-datacite".format(args.kafka_env), + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(dci, args.json_file).run() -- cgit v1.2.3