diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2019-09-19 17:11:47 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-09-20 11:21:11 -0700 |
commit | 80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769 (patch) | |
tree | 61e14bd3a21bd3dbf70a8a54c488cf09c7c2f11f /python/fatcat_tools/workers/changelog.py | |
parent | 6183e95e9739a6fbf0d8cd77603d075e87804abb (diff) | |
download | fatcat-80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769.tar.gz fatcat-80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769.zip |
small fixes to confluent-kafka importers/workers
- decrease default changelog pipeline to 5.0sec
- fix missing KafkaException harvester imports
- more confluent-kafka tweaks
- updates to kafka consumer configs
- bump elastic updates consumergroup (again)
Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 4a54c649..c134bde2 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -13,7 +13,6 @@ class ChangelogWorker(FatcatWorker): """ def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None): - # TODO: should be offset=0 super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api) @@ -118,7 +117,15 @@ class EntityUpdatesWorker(FatcatWorker): consumer_conf = self.kafka_config.copy() consumer_conf.update({ 'group.id': self.consumer_group, + 'on_commit': fail_fast, + 'delivery.report.only.error': True, + # messages don't have offset marked as stored until pushed to + # elastic, but we do auto-commit stored offsets to broker + 'enable.auto.commit': True, 'enable.auto.offset.store': False, + # user code timeout; if no poll after this long, assume user code + # hung and rebalance (default: 5min) + 'max.poll.interval.ms': 180000, 'default.topic.config': { 'auto.offset.reset': 'latest', }, @@ -142,8 +149,7 @@ class EntityUpdatesWorker(FatcatWorker): while True: msg = consumer.poll(self.poll_interval) if not msg: - print("nothing new from kafka (interval:{})".format(self.poll_interval)) - consumer.commit() + print("nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval)) continue if msg.error(): raise KafkaException(msg.error()) @@ -202,5 +208,5 @@ class EntityUpdatesWorker(FatcatWorker): on_delivery=fail_fast, ) # TODO: actually update works - consumer.store_offsets(msg) + consumer.store_offsets(message=msg) |