author    Bryan Newbold <bnewbold@archive.org>  2019-11-13 16:42:05 -0800
committer Bryan Newbold <bnewbold@archive.org>  2019-11-13 16:42:07 -0800
commit    e456e6a17eb9655afc4dc8146f50a7dba4fd8601 (patch)
tree      18040877fb6bf72fca6e64438a69d529c0eac2b7
parent    0897e3a5714ff8549e8ab68f44f0c49ce3f0405d (diff)
refactor consume_topic name out of make_kafka_consumer()
Best to do this in wrapping code for full flexibility.
-rw-r--r--   python/sandcrawler/workers.py   10
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 3b46cb7..c1203e1 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -310,15 +310,15 @@ class ZipfilePusher(RecordPusher):
 
 class KafkaJsonPusher(RecordPusher):
 
-    def __init__(self, worker, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
+    def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
         self.counts = Counter()
         self.worker = worker
         self.consumer = make_kafka_consumer(
             kafka_hosts,
-            kafka_env,
-            topic_suffix,
+            consume_topic,
             group,
         )
+        self.push_batches = kwargs.get('push_batches', False)
         self.poll_interval = kwargs.get('poll_interval', 5.0)
         self.batch_size = kwargs.get('batch_size', 100)
         if self.batch_size in (0, 1):
@@ -375,8 +375,8 @@ class KafkaJsonPusher(RecordPusher):
         return self.counts
 
 
-def make_kafka_consumer(hosts, env, topic_suffix, group):
-    topic_name = "fatcat-{}.{}".format(env, topic_suffix)
+def make_kafka_consumer(hosts, consume_topic, group):
+    topic_name = consume_topic
 
     def fail_fast(err, partitions):
         if err is not None:
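
A minimal sketch of how wrapping code might assemble the topic name itself after this change. The make_kafka_consumer() signature is taken from the diff above; the broker list, environment name, topic suffix, and consumer group values are hypothetical, and the format string only mirrors the old helper's behaviour by convention.

# Hypothetical calling code: the full topic name is now built here rather
# than inside make_kafka_consumer(), so any naming scheme can be used.
from sandcrawler.workers import make_kafka_consumer

kafka_hosts = "localhost:9092"      # hypothetical broker list
kafka_env = "prod"                  # hypothetical environment name
topic_suffix = "grobid-output"      # hypothetical topic suffix

# Reproduces the old "fatcat-{env}.{suffix}" naming done by the helper.
consume_topic = "fatcat-{}.{}".format(kafka_env, topic_suffix)

consumer = make_kafka_consumer(kafka_hosts, consume_topic, "my-consumer-group")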