diff options
author | Bryan Newbold <bnewbold@archive.org> | 2019-11-13 16:42:05 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2019-11-13 16:42:07 -0800 |
commit | e456e6a17eb9655afc4dc8146f50a7dba4fd8601 (patch) | |
tree | 18040877fb6bf72fca6e64438a69d529c0eac2b7 /python | |
parent | 0897e3a5714ff8549e8ab68f44f0c49ce3f0405d (diff) | |
download | sandcrawler-e456e6a17eb9655afc4dc8146f50a7dba4fd8601.tar.gz sandcrawler-e456e6a17eb9655afc4dc8146f50a7dba4fd8601.zip |
refactor consume_topic name out of make_kafka_consumer()
Best to do this in wrapping code for full flexibility.
Diffstat (limited to 'python')
-rw-r--r-- | python/sandcrawler/workers.py | 10 |
1 files changed, 5 insertions, 5 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 3b46cb7..c1203e1 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -310,15 +310,15 @@ class ZipfilePusher(RecordPusher): class KafkaJsonPusher(RecordPusher): - def __init__(self, worker, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): + def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs): self.counts = Counter() self.worker = worker self.consumer = make_kafka_consumer( kafka_hosts, - kafka_env, - topic_suffix, + consume_topic, group, ) + self.push_batches = kwargs.get('push_batches', False) self.poll_interval = kwargs.get('poll_interval', 5.0) self.batch_size = kwargs.get('batch_size', 100) if self.batch_size in (0, 1): @@ -375,8 +375,8 @@ class KafkaJsonPusher(RecordPusher): return self.counts -def make_kafka_consumer(hosts, env, topic_suffix, group): - topic_name = "fatcat-{}.{}".format(env, topic_suffix) +def make_kafka_consumer(hosts, consume_topic, group): + topic_name = consume_topic def fail_fast(err, partitions): if err is not None: |