diff options
-rwxr-xr-x | python/fatcat_import.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/importers/common.py | 34 | ||||
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 14 | ||||
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 33 | ||||
-rw-r--r-- | python/fatcat_tools/workers/worker_common.py | 6 | ||||
-rwxr-xr-x | python/fatcat_worker.py | 2 |
8 files changed, 69 insertions, 26 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 7a0cd2ae..2239f179 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -13,7 +13,7 @@ def run_crossref(args): bezerk_mode=args.bezerk_mode) if args.kafka_mode: KafkaJsonPusher(fci, args.kafka_hosts, args.kafka_env, "api-crossref", - "fatcat-import", edit_batch_size=args.batch_size).run() + "fatcat-import", consume_batch_size=args.batch_size).run() else: JsonLinePusher(fci, args.json_file).run() diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index 133a3f01..3362df35 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -7,7 +7,7 @@ import time import itertools import datetime import requests -from confluent_kafka import Producer +from confluent_kafka import Producer, KafkaException from fatcat_tools.workers import most_recent_message from .harvest_common import HarvestState, requests_retry_session diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index a0c3c2cf..3e3bea03 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -8,7 +8,7 @@ import itertools import datetime import requests import sickle -from confluent_kafka import Producer +from confluent_kafka import Producer, KafkaException from fatcat_tools.workers import most_recent_message from .harvest_common import HarvestState diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 32bb210a..42fe38aa 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -703,14 +703,23 @@ class KafkaJsonPusher(RecordPusher): topic_suffix, group, ) + self.poll_interval = kwargs.get('poll_interval', 5.0) + self.consume_batch_size = kwargs.get('consume_batch_size', 100) def run(self): count = 0 while True: + # TODO: this is batch-oriented, because underlying importer is + # often batch-oriented, but this doesn't confirm that entire batch + # has been pushed to fatcat before commiting offset. Eg, consider + # case where there there is one update and thousands of creates; + # update would be lingering in importer, and if importer crashed + # never created. Not great. batch = self.consumer.consume( - num_messages=self.edit_batch_size, - timeout=3.0) - print("... got {} kafka messages".format(len(batch))) + num_messages=self.consume_batch_size, + timeout=self.poll_interval) + print("... got {} kafka messages ({}sec poll interval)".format( + len(batch), self.poll_interval)) if not batch: # TODO: could have some larger timeout here and # self.importer.finish() if it's been more than, eg, a couple @@ -727,10 +736,11 @@ class KafkaJsonPusher(RecordPusher): count += 1 if count % 500 == 0: print("Import counts: {}".format(self.importer.counts)) - # locally store the last processed message; will be auto-commited - # from this "stored" value - assert msg - self.consumer.store_offsets(msg) + for msg in batch: + # locally store offsets of processed messages; will be + # auto-commited by librdkafka from this "stored" value + self.consumer.store_offsets(message=msg) + # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or # commit the current batch if it has been lingering counts = self.importer.finish() @@ -750,7 +760,6 @@ def make_kafka_consumer(hosts, env, topic_suffix, group): raise KafkaException(err) for p in partitions: # check for partition-specific commit errors - print(p) if p.error: print("Kafka consumer commit error: {}".format(p.error)) print("Bailing out...") @@ -764,12 +773,17 @@ def make_kafka_consumer(hosts, env, topic_suffix, group): #auto_commit_interval_ms=30000, # 30 seconds conf = { 'bootstrap.servers': hosts, - 'group.id': group.encode('utf-8'), + 'group.id': group, 'on_commit': fail_fast, 'delivery.report.only.error': True, + # messages don't have offset marked as stored until pushed to + # elastic, but we do auto-commit stored offsets to broker 'enable.auto.offset.store': False, + 'enable.auto.commit': True, + # user code timeout; if no poll after this long, assume user code + # hung and rebalance (default: 5min) + 'max.poll.interval.ms': 120000, 'default.topic.config': { - 'request.required.acks': -1, 'auto.offset.reset': 'latest', }, } diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index 4a54c649..c134bde2 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -13,7 +13,6 @@ class ChangelogWorker(FatcatWorker): """ def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None): - # TODO: should be offset=0 super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api) @@ -118,7 +117,15 @@ class EntityUpdatesWorker(FatcatWorker): consumer_conf = self.kafka_config.copy() consumer_conf.update({ 'group.id': self.consumer_group, + 'on_commit': fail_fast, + 'delivery.report.only.error': True, + # messages don't have offset marked as stored until pushed to + # elastic, but we do auto-commit stored offsets to broker + 'enable.auto.commit': True, 'enable.auto.offset.store': False, + # user code timeout; if no poll after this long, assume user code + # hung and rebalance (default: 5min) + 'max.poll.interval.ms': 180000, 'default.topic.config': { 'auto.offset.reset': 'latest', }, @@ -142,8 +149,7 @@ class EntityUpdatesWorker(FatcatWorker): while True: msg = consumer.poll(self.poll_interval) if not msg: - print("nothing new from kafka (interval:{})".format(self.poll_interval)) - consumer.commit() + print("nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval)) continue if msg.error(): raise KafkaException(msg.error()) @@ -202,5 +208,5 @@ class EntityUpdatesWorker(FatcatWorker): on_delivery=fail_fast, ) # TODO: actually update works - consumer.store_offsets(msg) + consumer.store_offsets(message=msg) diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 547e270c..acb705c2 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -22,7 +22,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): batch_size=200): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic) - self.consumer_group = "elasticsearch-updates" + self.consumer_group = "elasticsearch-updates3" self.batch_size = batch_size self.poll_interval = poll_interval self.elasticsearch_backend = elasticsearch_backend @@ -34,6 +34,22 @@ class ElasticsearchReleaseWorker(FatcatWorker): def run(self): ac = ApiClient() + def fail_fast(err, partitions): + if err is not None: + print("Kafka consumer commit error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + for p in partitions: + # check for partition-specific commit errors + if p.error: + print("Kafka consumer commit error: {}".format(p.error)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + #print("Kafka consumer commit successful") + pass + def on_rebalance(consumer, partitions): for p in partitions: if p.error: @@ -44,7 +60,15 @@ class ElasticsearchReleaseWorker(FatcatWorker): consumer_conf = self.kafka_config.copy() consumer_conf.update({ 'group.id': self.consumer_group, + 'on_commit': fail_fast, + 'delivery.report.only.error': True, + # messages don't have offset marked as stored until pushed to + # elastic, but we do auto-commit stored offsets to broker + 'enable.auto.commit': True, 'enable.auto.offset.store': False, + # user code timeout; if no poll after this long, assume user code + # hung and rebalance (default: 5min) + 'max.poll.interval.ms': 60000, 'default.topic.config': { 'auto.offset.reset': 'latest', }, @@ -92,7 +116,10 @@ class ElasticsearchReleaseWorker(FatcatWorker): print(desc) print(resp.content) raise Exception(desc) - consumer.store_offsets(batch[-1]) + for msg in batch: + # offsets are *committed* (to brokers) automatically, but need + # to be marked as processed here + consumer.store_offsets(message=msg) @@ -109,7 +136,7 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): elasticsearch_index=elasticsearch_index, batch_size=batch_size) # previous group got corrupted (by pykafka library?) - self.consumer_group = "elasticsearch-updates2" + self.consumer_group = "elasticsearch-updates3" self.entity_type = ContainerEntity self.elasticsearch_document_name = "container" self.transform_func = container_to_elasticsearch diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py index fb8cfc19..ef79f528 100644 --- a/python/fatcat_tools/workers/worker_common.py +++ b/python/fatcat_tools/workers/worker_common.py @@ -66,11 +66,7 @@ class FatcatWorker: self.api = api self.kafka_config = { 'bootstrap.servers': kafka_hosts, - 'delivery.report.only.error': True, - 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes - 'default.topic.config': { - 'request.required.acks': 'all', - }, + 'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes } self.produce_topic = produce_topic self.consume_topic = consume_topic diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 9ac084c6..628312be 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -65,7 +65,7 @@ def main(): sub_changelog.set_defaults(func=run_changelog) sub_changelog.add_argument('--poll-interval', help="how long to wait between polling (seconds)", - default=10.0, type=float) + default=5.0, type=float) sub_entity_updates = subparsers.add_parser('entity-updates') sub_entity_updates.set_defaults(func=run_entity_updates) |