aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--python/sandcrawler/workers.py10
1 files changed, 5 insertions, 5 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 3b46cb7..c1203e1 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -310,15 +310,15 @@ class ZipfilePusher(RecordPusher):
class KafkaJsonPusher(RecordPusher):
- def __init__(self, worker, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
+ def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
self.counts = Counter()
self.worker = worker
self.consumer = make_kafka_consumer(
kafka_hosts,
- kafka_env,
- topic_suffix,
+ consume_topic,
group,
)
+ self.push_batches = kwargs.get('push_batches', False)
self.poll_interval = kwargs.get('poll_interval', 5.0)
self.batch_size = kwargs.get('batch_size', 100)
if self.batch_size in (0, 1):
@@ -375,8 +375,8 @@ class KafkaJsonPusher(RecordPusher):
return self.counts
-def make_kafka_consumer(hosts, env, topic_suffix, group):
- topic_name = "fatcat-{}.{}".format(env, topic_suffix)
+def make_kafka_consumer(hosts, consume_topic, group):
+ topic_name = consume_topic
def fail_fast(err, partitions):
if err is not None: