diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2020-01-21 10:48:16 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2020-01-21 10:52:43 -0800 |
commit | 2fcc59388a4eb53a7e2370275366272459874e99 (patch) | |
tree | 7d092d73dae564f39f68ca4cfb91b9c245ae7eca /python | |
parent | f7b6b8b3cda32b258ea21518a42bea41bd87532d (diff) | |
download | fatcat-2fcc59388a4eb53a7e2370275366272459874e99.tar.gz fatcat-2fcc59388a4eb53a7e2370275366272459874e99.zip |
refactor fatcat_import kafka group names
My current understanding is that consumer group names should be
one-to-one with topic names. I previously though offsets were stored on
a {topic, group} key, but they seem to be mixed and having too many
workers in the same group is bad. In particular, we don't want
cross-talk or load between QA and prod.
All these topics are caught up in prod, so deploying this change and
restarting workers should be safe.
This commit does not update the elasticsearch or entity updates workers.
Diffstat (limited to 'python')
-rwxr-xr-x | python/fatcat_import.py | 67 |
1 files changed, 54 insertions, 13 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 656b9a05..ad4de0e2 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -18,8 +18,14 @@ def run_crossref(args): edit_batch_size=args.batch_size, bezerk_mode=args.bezerk_mode) if args.kafka_mode: - KafkaJsonPusher(fci, args.kafka_hosts, args.kafka_env, "api-crossref", - "fatcat-import", consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + fci, + args.kafka_hosts, + args.kafka_env, + "api-crossref", + "fatcat-{}-import-crossref".format(args.kafka_env), + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(fci, args.json_file).run() @@ -34,7 +40,13 @@ def run_arxiv(args): edit_batch_size=args.batch_size) if args.kafka_mode: raise NotImplementedError - #KafkaBs4XmlPusher(ari, args.kafka_hosts, args.kafka_env, "api-arxiv", "fatcat-import").run() + #KafkaBs4XmlPusher( + # ari, + # args.kafka_hosts, + # args.kafka_env, + # "api-arxiv", + # "fatcat-{}-import-arxiv".format(args.kafka_env), + #).run() else: Bs4XmlFilePusher(ari, args.xml_file, "record").run() @@ -46,9 +58,20 @@ def run_pubmed(args): lookup_refs=(not args.no_lookup_refs)) if args.kafka_mode: raise NotImplementedError - #KafkaBs4XmlPusher(pi, args.kafka_hosts, args.kafka_env, "api-pubmed", "fatcat-import").run() + #KafkaBs4XmlPusher( + # pi, + # args.kafka_hosts, + # args.kafka_env, + # "api-pubmed", + # "fatcat-{}import-arxiv".format(args.kafka_env), + #).run() else: - Bs4XmlLargeFilePusher(pi, args.xml_file, "PubmedArticle", record_list_tag="PubmedArticleSet").run() + Bs4XmlLargeFilePusher( + pi, + args.xml_file, + "PubmedArticle", + record_list_tag="PubmedArticleSet", + ).run() def run_jstor(args): ji = JstorImporter(args.api, @@ -107,9 +130,15 @@ def run_ingest_file(args): require_grobid=(not args.no_require_grobid), edit_batch_size=args.batch_size) if args.kafka_mode: - KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results", - "fatcat-ingest-file-result", kafka_namespace="sandcrawler", - consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + ifri, + args.kafka_hosts, + args.kafka_env, + "ingest-file-results", + "fatcat-{}-ingest-file-result".format(args.kafka_env), + kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(ifri, args.json_file).run() @@ -118,9 +147,15 @@ def run_savepapernow_file(args): editgroup_description=args.editgroup_description_override, edit_batch_size=args.batch_size) if args.kafka_mode: - KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results", - "savepapernow-file-result", kafka_namespace="sandcrawler", - consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + ifri, + args.kafka_hosts, + args.kafka_env, + "ingest-file-results", + "fatcat-{}-savepapernow-file-result".format(args.kafka_env), + kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(ifri, args.json_file).run() @@ -183,8 +218,14 @@ def run_datacite(args): extid_map_file=args.extid_map_file, insert_log_file=args.insert_log_file) if args.kafka_mode: - KafkaJsonPusher(dci, args.kafka_hosts, args.kafka_env, "api-datacite", - "fatcat-import", consume_batch_size=args.batch_size).run() + KafkaJsonPusher( + dci, + args.kafka_hosts, + args.kafka_env, + "api-datacite", + "fatcat-{}-import-datacite".format(args.kafka_env), + consume_batch_size=args.batch_size, + ).run() else: JsonLinePusher(dci, args.json_file).run() |