Diffstat (limited to 'python')
-rwxr-xr-x  python/fatcat_import.py                        |  2
-rw-r--r--  python/fatcat_tools/harvest/doi_registrars.py  |  2
-rw-r--r--  python/fatcat_tools/harvest/oaipmh.py          |  2
-rw-r--r--  python/fatcat_tools/importers/common.py        | 34
-rw-r--r--  python/fatcat_tools/workers/changelog.py       | 14
-rw-r--r--  python/fatcat_tools/workers/elasticsearch.py   | 33
-rw-r--r--  python/fatcat_tools/workers/worker_common.py   |  6
-rwxr-xr-x  python/fatcat_worker.py                        |  2
8 files changed, 69 insertions(+), 26 deletions(-)
diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index 7a0cd2ae..2239f179 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -13,7 +13,7 @@ def run_crossref(args):
bezerk_mode=args.bezerk_mode)
if args.kafka_mode:
KafkaJsonPusher(fci, args.kafka_hosts, args.kafka_env, "api-crossref",
- "fatcat-import", edit_batch_size=args.batch_size).run()
+ "fatcat-import", consume_batch_size=args.batch_size).run()
else:
JsonLinePusher(fci, args.json_file).run()
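The rename above separates the consumer-side batch size (how many messages each consumer.consume() call pulls) from the importer's own edit-group batching, which remains edit_batch_size. A minimal sketch of the updated call site, with illustrative broker/env values and fci standing in for the CrossrefImporter constructed earlier in run_crossref():

```python
from fatcat_tools.importers import CrossrefImporter, KafkaJsonPusher

# fci = CrossrefImporter(...)  # constructed as in run_crossref()
pusher = KafkaJsonPusher(
    fci,                     # importer that receives the decoded records
    "kafka-broker:9092",     # kafka_hosts (illustrative)
    "prod",                  # kafka_env
    "api-crossref",          # topic_suffix
    "fatcat-import",         # consumer group
    consume_batch_size=100,  # messages per consumer.consume() call
)
pusher.run()
```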
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index 133a3f01..3362df35 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -7,7 +7,7 @@ import time
import itertools
import datetime
import requests
-from confluent_kafka import Producer
+from confluent_kafka import Producer, KafkaException
from fatcat_tools.workers import most_recent_message
from .harvest_common import HarvestState, requests_retry_session
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index a0c3c2cf..3e3bea03 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -8,7 +8,7 @@ import itertools
import datetime
import requests
import sickle
-from confluent_kafka import Producer
+from confluent_kafka import Producer, KafkaException
from fatcat_tools.workers import most_recent_message
from .harvest_common import HarvestState
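Both harvesters now import KafkaException alongside Producer, which suggests their delivery callbacks raise on produce errors rather than logging and continuing. A sketch of that fail-fast pattern with confluent_kafka (the callback and topic names here are assumptions, not taken from this diff); exceptions raised in a delivery callback propagate out of the poll() or flush() call that triggered it:

```python
from confluent_kafka import Producer, KafkaException

def fail_fast(err, msg):
    # delivery report callback: any produce error aborts the harvest
    # run instead of silently dropping the record
    if err is not None:
        print("Kafka producer delivery error: {}".format(err))
        raise KafkaException(err)

producer = Producer({
    'bootstrap.servers': 'kafka-broker:9092',
    'delivery.report.only.error': True,
})
producer.produce('fatcat-prod.api-datacite', b'{"doi": "10.123/abc"}',
                 on_delivery=fail_fast)
producer.flush()
```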
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 32bb210a..42fe38aa 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -703,14 +703,23 @@ class KafkaJsonPusher(RecordPusher):
topic_suffix,
group,
)
+ self.poll_interval = kwargs.get('poll_interval', 5.0)
+ self.consume_batch_size = kwargs.get('consume_batch_size', 100)
def run(self):
count = 0
while True:
+ # TODO: this is batch-oriented, because the underlying importer is
+ # often batch-oriented, but this doesn't confirm that the entire batch
+ # has been pushed to fatcat before committing the offset. Eg, consider
+ # the case where there is one update and thousands of creates; the
+ # update would linger in the importer, and if the importer crashed it
+ # would never be created. Not great.
batch = self.consumer.consume(
- num_messages=self.edit_batch_size,
- timeout=3.0)
- print("... got {} kafka messages".format(len(batch)))
+ num_messages=self.consume_batch_size,
+ timeout=self.poll_interval)
+ print("... got {} kafka messages ({}sec poll interval)".format(
+ len(batch), self.poll_interval))
if not batch:
# TODO: could have some larger timeout here and
# self.importer.finish() if it's been more than, eg, a couple
@@ -727,10 +736,11 @@ class KafkaJsonPusher(RecordPusher):
count += 1
if count % 500 == 0:
print("Import counts: {}".format(self.importer.counts))
- # locally store the last processed message; will be auto-commited
- # from this "stored" value
- assert msg
- self.consumer.store_offsets(msg)
+ for msg in batch:
+ # locally store offsets of processed messages; will be
+ # auto-committed by librdkafka from this "stored" value
+ self.consumer.store_offsets(message=msg)
+
# TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
# commit the current batch if it has been lingering
counts = self.importer.finish()
@@ -750,7 +760,6 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
raise KafkaException(err)
for p in partitions:
# check for partition-specific commit errors
- print(p)
if p.error:
print("Kafka consumer commit error: {}".format(p.error))
print("Bailing out...")
@@ -764,12 +773,17 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
#auto_commit_interval_ms=30000, # 30 seconds
conf = {
'bootstrap.servers': hosts,
- 'group.id': group.encode('utf-8'),
+ 'group.id': group,
'on_commit': fail_fast,
'delivery.report.only.error': True,
+ # messages don't have their offset marked as stored until pushed to
+ # fatcat, but we do auto-commit stored offsets to broker
'enable.auto.offset.store': False,
+ 'enable.auto.commit': True,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ 'max.poll.interval.ms': 120000,
'default.topic.config': {
- 'request.required.acks': -1,
'auto.offset.reset': 'latest',
},
}
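The crux of the new consumer configuration is the split between storing and committing offsets: with 'enable.auto.offset.store' off and 'enable.auto.commit' on, librdkafka's background commit only ever covers offsets the application has explicitly marked via store_offsets(), which gives at-least-once delivery. A condensed sketch of the whole pattern (broker, group, topic, and the process() step are placeholders):

```python
from confluent_kafka import Consumer, KafkaException

conf = {
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'fatcat-import',
    # the application marks offsets as "stored" only after processing...
    'enable.auto.offset.store': False,
    # ...and librdkafka periodically commits whatever has been stored
    'enable.auto.commit': True,
    # rebalance if user code stops polling for two minutes
    'max.poll.interval.ms': 120000,
    'default.topic.config': {'auto.offset.reset': 'latest'},
}
consumer = Consumer(conf)
consumer.subscribe(['fatcat-prod.api-crossref'])

while True:
    batch = consumer.consume(num_messages=100, timeout=5.0)
    for msg in batch:
        if msg.error():
            raise KafkaException(msg.error())
        process(msg.value())                 # placeholder: push to importer
        consumer.store_offsets(message=msg)  # now eligible for auto-commit
```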
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 4a54c649..c134bde2 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -13,7 +13,6 @@ class ChangelogWorker(FatcatWorker):
"""
def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
- # TODO: should be offset=0
super().__init__(kafka_hosts=kafka_hosts,
produce_topic=produce_topic,
api=api)
@@ -118,7 +117,15 @@ class EntityUpdatesWorker(FatcatWorker):
consumer_conf = self.kafka_config.copy()
consumer_conf.update({
'group.id': self.consumer_group,
+ 'on_commit': fail_fast,
+ 'delivery.report.only.error': True,
+ # messages don't have their offset marked as stored until the update
+ # has been fully processed, but we do auto-commit stored offsets to broker
+ 'enable.auto.commit': True,
'enable.auto.offset.store': False,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ 'max.poll.interval.ms': 180000,
'default.topic.config': {
'auto.offset.reset': 'latest',
},
@@ -142,8 +149,7 @@ class EntityUpdatesWorker(FatcatWorker):
while True:
msg = consumer.poll(self.poll_interval)
if not msg:
- print("nothing new from kafka (interval:{})".format(self.poll_interval))
- consumer.commit()
+ print("nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval))
continue
if msg.error():
raise KafkaException(msg.error())
@@ -202,5 +208,5 @@ class EntityUpdatesWorker(FatcatWorker):
on_delivery=fail_fast,
)
# TODO: actually update works
- consumer.store_offsets(msg)
+ consumer.store_offsets(message=msg)
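The entity-updates worker applies the same store-then-auto-commit discipline to the single-message poll() style, which is why the old consumer.commit() on empty polls could be dropped. A minimal sketch of the loop, assuming a consumer configured as in the hunk above and a hypothetical handle_update() standing in for the worker's produce step:

```python
import json
from confluent_kafka import KafkaException

while True:
    msg = consumer.poll(5.0)  # poll_interval seconds
    if not msg:
        # nothing new; previously-stored offsets still auto-commit
        continue
    if msg.error():
        raise KafkaException(msg.error())
    handle_update(json.loads(msg.value().decode('utf-8')))
    # only mark the offset once the update has actually been handled
    consumer.store_offsets(message=msg)
```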
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 547e270c..acb705c2 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -22,7 +22,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
batch_size=200):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic)
- self.consumer_group = "elasticsearch-updates"
+ self.consumer_group = "elasticsearch-updates3"
self.batch_size = batch_size
self.poll_interval = poll_interval
self.elasticsearch_backend = elasticsearch_backend
@@ -34,6 +34,22 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def run(self):
ac = ApiClient()
+ def fail_fast(err, partitions):
+ if err is not None:
+ print("Kafka consumer commit error: {}".format(err))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ for p in partitions:
+ # check for partition-specific commit errors
+ if p.error:
+ print("Kafka consumer commit error: {}".format(p.error))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(p.error)
+ #print("Kafka consumer commit successful")
+ pass
+
def on_rebalance(consumer, partitions):
for p in partitions:
if p.error:
@@ -44,7 +60,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
consumer_conf = self.kafka_config.copy()
consumer_conf.update({
'group.id': self.consumer_group,
+ 'on_commit': fail_fast,
+ 'delivery.report.only.error': True,
+ # messages don't have offset marked as stored until pushed to
+ # elastic, but we do auto-commit stored offsets to broker
+ 'enable.auto.commit': True,
'enable.auto.offset.store': False,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ 'max.poll.interval.ms': 60000,
'default.topic.config': {
'auto.offset.reset': 'latest',
},
@@ -92,7 +116,10 @@ class ElasticsearchReleaseWorker(FatcatWorker):
print(desc)
print(resp.content)
raise Exception(desc)
- consumer.store_offsets(batch[-1])
+ for msg in batch:
+ # offsets are *committed* (to brokers) automatically, but need
+ # to be marked as processed here
+ consumer.store_offsets(message=msg)
@@ -109,7 +136,7 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
elasticsearch_index=elasticsearch_index,
batch_size=batch_size)
# previous group got corrupted (by pykafka library?)
- self.consumer_group = "elasticsearch-updates2"
+ self.consumer_group = "elasticsearch-updates3"
self.entity_type = ContainerEntity
self.elasticsearch_document_name = "container"
self.transform_func = container_to_elasticsearch
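The switch from store_offsets(batch[-1]) to storing every message is more than cosmetic: a single consume() batch can span several assigned partitions, and the last message's offset says nothing about progress on the other partitions, so per-message storing is the partition-safe form. A sketch of the bulk-index-then-store sequence (the index URL and the docs pairs are placeholders for the worker's transform output):

```python
import json
import requests

# docs: list of (elasticsearch _id, transformed document) pairs for one batch
bulk_body = "".join(
    json.dumps({"index": {"_id": ident}}) + "\n" + json.dumps(doc) + "\n"
    for ident, doc in docs
)
resp = requests.post("http://localhost:9200/fatcat_release/_bulk",
                     data=bulk_body,
                     headers={"Content-Type": "application/x-ndjson"})
resp.raise_for_status()

# offsets are stored only after elasticsearch accepted the whole batch;
# storing each message handles batches that span multiple partitions
for msg in batch:
    consumer.store_offsets(message=msg)
```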
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index fb8cfc19..ef79f528 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -66,11 +66,7 @@ class FatcatWorker:
self.api = api
self.kafka_config = {
'bootstrap.servers': kafka_hosts,
- 'delivery.report.only.error': True,
- 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
- 'default.topic.config': {
- 'request.required.acks': 'all',
- },
+ 'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
}
self.produce_topic = produce_topic
self.consume_topic = consume_topic
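With the producer-only keys dropped, kafka_config is now a true shared base, and each side layers its own settings on top, mirroring the consumer_conf = self.kafka_config.copy() pattern elsewhere in this commit. A producer-side sketch (which keys a given producer adds back is an assumption here):

```python
from confluent_kafka import Producer

base_conf = {
    'bootstrap.servers': 'kafka-broker:9092',
    'message.max.bytes': 20000000,  # ~20 MBytes; shared by both sides
}

producer_conf = base_conf.copy()
producer_conf.update({
    'delivery.report.only.error': True,
    'default.topic.config': {
        'request.required.acks': -1,  # wait for all in-sync replicas
    },
})
producer = Producer(producer_conf)
```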
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index 9ac084c6..628312be 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -65,7 +65,7 @@ def main():
sub_changelog.set_defaults(func=run_changelog)
sub_changelog.add_argument('--poll-interval',
help="how long to wait between polling (seconds)",
- default=10.0, type=float)
+ default=5.0, type=float)
sub_entity_updates = subparsers.add_parser('entity-updates')
sub_entity_updates.set_defaults(func=run_entity_updates)