Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r--  python/fatcat_tools/importers/common.py  |  34
1 file changed, 24 insertions(+), 10 deletions(-)
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 32bb210a..42fe38aa 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -703,14 +703,23 @@ class KafkaJsonPusher(RecordPusher):
topic_suffix,
group,
)
+ self.poll_interval = kwargs.get('poll_interval', 5.0)
+ self.consume_batch_size = kwargs.get('consume_batch_size', 100)
def run(self):
count = 0
while True:
+ # TODO: this is batch-oriented, because the underlying importer is
+ # often batch-oriented, but this doesn't confirm that the entire batch
+ # has been pushed to fatcat before committing the offset. Eg, consider
+ # the case where there is one update and thousands of creates; the
+ # update would be lingering in the importer and, if the importer
+ # crashed, never created. Not great.
batch = self.consumer.consume(
- num_messages=self.edit_batch_size,
- timeout=3.0)
- print("... got {} kafka messages".format(len(batch)))
+ num_messages=self.consume_batch_size,
+ timeout=self.poll_interval)
+ print("... got {} kafka messages ({}sec poll interval)".format(
+ len(batch), self.poll_interval))
if not batch:
# TODO: could have some larger timeout here and
# self.importer.finish() if it's been more than, eg, a couple
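
The consume() call above is confluent-kafka's batch-fetch API: it returns up
to num_messages messages, waiting at most timeout seconds. A minimal sketch of
the loop this hunk sets up (the broker address, group, topic, and process()
handler are hypothetical placeholders, not values from this diff):

    from confluent_kafka import Consumer, KafkaException

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',  # hypothetical broker
        'group.id': 'example-group',            # hypothetical group
        'enable.auto.offset.store': False,
    })
    consumer.subscribe(['example-topic'])       # hypothetical topic

    while True:
        # returns an empty list if the timeout expires with no messages
        batch = consumer.consume(num_messages=100, timeout=5.0)
        if not batch:
            continue
        for msg in batch:
            if msg.error():
                raise KafkaException(msg.error())
            process(msg.value())                # hypothetical handler
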
@@ -727,10 +736,11 @@ class KafkaJsonPusher(RecordPusher):
count += 1
if count % 500 == 0:
print("Import counts: {}".format(self.importer.counts))
- # locally store the last processed message; will be auto-commited
- # from this "stored" value
- assert msg
- self.consumer.store_offsets(msg)
+ for msg in batch:
+     # locally store offsets of processed messages; will be
+     # auto-committed by librdkafka from this "stored" value
+     self.consumer.store_offsets(message=msg)
+
# TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
# commit the current batch if it has been lingering
counts = self.importer.finish()
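
The loop added above gives at-least-once behavior: the whole batch is
processed first, and only then is each message's offset marked as "stored".
With enable.auto.offset.store set to False (see the config hunk below),
librdkafka's auto-commit thread only ever commits offsets that were explicitly
stored, so a crash mid-batch re-delivers the batch rather than silently
dropping it. A sketch of the pattern, again assuming a hypothetical process()
handler:

    # process the whole batch first ...
    for msg in batch:
        process(msg.value())
    # ... then mark each message as safe to commit; the background
    # auto-commit picks these stored offsets up
    for msg in batch:
        consumer.store_offsets(message=msg)
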
@@ -750,7 +760,6 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
raise KafkaException(err)
for p in partitions:
# check for partition-specific commit errors
- print(p)
if p.error:
print("Kafka consumer commit error: {}".format(p.error))
print("Bailing out...")
@@ -764,12 +773,17 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
#auto_commit_interval_ms=30000, # 30 seconds
conf = {
'bootstrap.servers': hosts,
- 'group.id': group.encode('utf-8'),
+ 'group.id': group,
'on_commit': fail_fast,
'delivery.report.only.error': True,
+ # messages don't have offset marked as stored until pushed to
+ # fatcat, but we do auto-commit stored offsets to broker
'enable.auto.offset.store': False,
+ 'enable.auto.commit': True,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ 'max.poll.interval.ms': 120000,
'default.topic.config': {
- 'request.required.acks': -1,
'auto.offset.reset': 'latest',
},
}
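
The rest of make_kafka_consumer falls outside this hunk, but with
confluent-kafka the conf dict above would be used roughly like this (the
topic naming scheme shown is a guess for illustration, not taken from this
diff):

    from confluent_kafka import Consumer

    consumer = Consumer(conf)
    topic_name = "fatcat-{}.{}".format(env, topic_suffix)  # hypothetical naming
    consumer.subscribe([topic_name])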