summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2020-01-21 10:48:16 -0800
committerBryan Newbold <bnewbold@robocracy.org>2020-01-21 10:52:43 -0800
commit2fcc59388a4eb53a7e2370275366272459874e99 (patch)
tree7d092d73dae564f39f68ca4cfb91b9c245ae7eca
parentf7b6b8b3cda32b258ea21518a42bea41bd87532d (diff)
downloadfatcat-2fcc59388a4eb53a7e2370275366272459874e99.tar.gz
fatcat-2fcc59388a4eb53a7e2370275366272459874e99.zip
refactor fatcat_import kafka group names
My current understanding is that consumer group names should be one-to-one with topic names. I previously though offsets were stored on a {topic, group} key, but they seem to be mixed and having too many workers in the same group is bad. In particular, we don't want cross-talk or load between QA and prod. All these topics are caught up in prod, so deploying this change and restarting workers should be safe. This commit does not update the elasticsearch or entity updates workers.
-rwxr-xr-xpython/fatcat_import.py67
1 files changed, 54 insertions, 13 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index 656b9a05..ad4de0e2 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -18,8 +18,14 @@ def run_crossref(args):
edit_batch_size=args.batch_size,
bezerk_mode=args.bezerk_mode)
if args.kafka_mode:
- KafkaJsonPusher(fci, args.kafka_hosts, args.kafka_env, "api-crossref",
- "fatcat-import", consume_batch_size=args.batch_size).run()
+ KafkaJsonPusher(
+ fci,
+ args.kafka_hosts,
+ args.kafka_env,
+ "api-crossref",
+ "fatcat-{}-import-crossref".format(args.kafka_env),
+ consume_batch_size=args.batch_size,
+ ).run()
else:
JsonLinePusher(fci, args.json_file).run()
@@ -34,7 +40,13 @@ def run_arxiv(args):
edit_batch_size=args.batch_size)
if args.kafka_mode:
raise NotImplementedError
- #KafkaBs4XmlPusher(ari, args.kafka_hosts, args.kafka_env, "api-arxiv", "fatcat-import").run()
+ #KafkaBs4XmlPusher(
+ # ari,
+ # args.kafka_hosts,
+ # args.kafka_env,
+ # "api-arxiv",
+ # "fatcat-{}-import-arxiv".format(args.kafka_env),
+ #).run()
else:
Bs4XmlFilePusher(ari, args.xml_file, "record").run()
@@ -46,9 +58,20 @@ def run_pubmed(args):
lookup_refs=(not args.no_lookup_refs))
if args.kafka_mode:
raise NotImplementedError
- #KafkaBs4XmlPusher(pi, args.kafka_hosts, args.kafka_env, "api-pubmed", "fatcat-import").run()
+ #KafkaBs4XmlPusher(
+ # pi,
+ # args.kafka_hosts,
+ # args.kafka_env,
+ # "api-pubmed",
+ # "fatcat-{}import-arxiv".format(args.kafka_env),
+ #).run()
else:
- Bs4XmlLargeFilePusher(pi, args.xml_file, "PubmedArticle", record_list_tag="PubmedArticleSet").run()
+ Bs4XmlLargeFilePusher(
+ pi,
+ args.xml_file,
+ "PubmedArticle",
+ record_list_tag="PubmedArticleSet",
+ ).run()
def run_jstor(args):
ji = JstorImporter(args.api,
@@ -107,9 +130,15 @@ def run_ingest_file(args):
require_grobid=(not args.no_require_grobid),
edit_batch_size=args.batch_size)
if args.kafka_mode:
- KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results",
- "fatcat-ingest-file-result", kafka_namespace="sandcrawler",
- consume_batch_size=args.batch_size).run()
+ KafkaJsonPusher(
+ ifri,
+ args.kafka_hosts,
+ args.kafka_env,
+ "ingest-file-results",
+ "fatcat-{}-ingest-file-result".format(args.kafka_env),
+ kafka_namespace="sandcrawler",
+ consume_batch_size=args.batch_size,
+ ).run()
else:
JsonLinePusher(ifri, args.json_file).run()
@@ -118,9 +147,15 @@ def run_savepapernow_file(args):
editgroup_description=args.editgroup_description_override,
edit_batch_size=args.batch_size)
if args.kafka_mode:
- KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results",
- "savepapernow-file-result", kafka_namespace="sandcrawler",
- consume_batch_size=args.batch_size).run()
+ KafkaJsonPusher(
+ ifri,
+ args.kafka_hosts,
+ args.kafka_env,
+ "ingest-file-results",
+ "fatcat-{}-savepapernow-file-result".format(args.kafka_env),
+ kafka_namespace="sandcrawler",
+ consume_batch_size=args.batch_size,
+ ).run()
else:
JsonLinePusher(ifri, args.json_file).run()
@@ -183,8 +218,14 @@ def run_datacite(args):
extid_map_file=args.extid_map_file,
insert_log_file=args.insert_log_file)
if args.kafka_mode:
- KafkaJsonPusher(dci, args.kafka_hosts, args.kafka_env, "api-datacite",
- "fatcat-import", consume_batch_size=args.batch_size).run()
+ KafkaJsonPusher(
+ dci,
+ args.kafka_hosts,
+ args.kafka_env,
+ "api-datacite",
+ "fatcat-{}-import-datacite".format(args.kafka_env),
+ consume_batch_size=args.batch_size,
+ ).run()
else:
JsonLinePusher(dci, args.json_file).run()