From 90b5cb354d7d73c920288394aa9fd8d58e752157 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 19 Sep 2019 20:00:24 -0700 Subject: review/fix all confluent-kafka produce code --- python/fatcat_tools/harvest/doi_registrars.py | 13 ++++++---- python/fatcat_tools/harvest/harvest_common.py | 37 ++++++++++++++++++++++++--- python/fatcat_tools/harvest/oaipmh.py | 13 ++++++---- python/fatcat_tools/importers/common.py | 1 - python/fatcat_tools/workers/changelog.py | 16 +++++++++--- python/fatcat_tools/workers/elasticsearch.py | 22 ++++++++++------ 6 files changed, 75 insertions(+), 27 deletions(-) (limited to 'python/fatcat_tools') diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index 3362df35..7e791745 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -56,11 +56,7 @@ class HarvestCrossrefWorker: self.is_update_filter = is_update_filter self.kafka_config = { 'bootstrap.servers': kafka_hosts, - 'delivery.report.only.error': True, 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes - 'default.topic.config': { - 'request.required.acks': 'all', - }, } self.state = HarvestState(start_date, end_date) @@ -97,7 +93,14 @@ class HarvestCrossrefWorker: # TODO: should it be sys.exit(-1)? raise KafkaException(err) - producer = Producer(self.kafka_config) + producer_conf = self.kafka_config.copy() + producer_conf.update({ + 'delivery.report.only.error': True, + 'default.topic.config': { + 'request.required.acks': -1, # all brokers must confirm + }, + }) + producer = Producer(producer_conf) date_str = date.isoformat() params = self.params(date_str) diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py index aa7a69f5..78830a1c 100644 --- a/python/fatcat_tools/harvest/harvest_common.py +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -130,30 +130,56 @@ class HarvestState: if err: raise KafkaException(err) print("Commiting status to Kafka: {}".format(kafka_topic)) - producer = Producer(kafka_config) - producer.produce(kafka_topic, state_json, on_delivery=fail_fast) + producer_conf = kafka_config.copy() + producer_conf.update({ + 'delivery.report.only.error': True, + 'default.topic.config': { + 'request.required.acks': -1, # all brokers must confirm + }, + }) + producer = Producer(producer_conf) + producer.produce( + kafka_topic, + state_json, + on_delivery=fail_fast) producer.flush() return state_json def initialize_from_kafka(self, kafka_topic, kafka_config): """ kafka_topic should have type str + + TODO: this method does not fail if client can't connect to host. """ if not kafka_topic: return print("Fetching state from kafka topic: {}".format(kafka_topic)) + def fail_fast(err, msg): + if err: + raise KafkaException(err) conf = kafka_config.copy() conf.update({ + 'group.id': 'dummy_init_group', # should never be commited + 'enable.auto.commit': False, 'auto.offset.reset': 'earliest', 'session.timeout.ms': 10000, - 'group.id': kafka_topic + "-init", }) consumer = Consumer(conf) + + # this watermark fetch is mostly to ensure we are connected to broker and + # fail fast if not, but we also confirm that we read to end below. + hwm = consumer.get_watermark_offsets( + TopicPartition(kafka_topic, 0), + timeout=5.0, + cached=False) + if not hwm: + raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic)) + consumer.assign([TopicPartition(kafka_topic, 0, 0)]) c = 0 while True: - msg = consumer.poll(timeout=1.0) + msg = consumer.poll(timeout=2.0) if not msg: break if msg.error(): @@ -162,4 +188,7 @@ class HarvestState: self.update(msg.value().decode('utf-8')) c += 1 consumer.close() + + # verify that we got at least to HWM + assert c >= hwm[1] print("... got {} state update messages, done".format(c)) diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index 3e3bea03..f908ba83 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -39,10 +39,7 @@ class HarvestOaiPmhWorker: self.state_topic = state_topic self.kafka_config = { 'bootstrap.servers': kafka_hosts, - 'delivery.report.only.error': True, 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes - 'default.topic.config': - {'request.required.acks': 'all'}, } self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks @@ -62,7 +59,14 @@ class HarvestOaiPmhWorker: # TODO: should it be sys.exit(-1)? raise KafkaException(err) - producer = Producer(self.kafka_config) + producer_conf = self.kafka_config.copy() + producer_conf.update({ + 'delivery.report.only.error': True, + 'default.topic.config': { + 'request.required.acks': -1, # all brokers must confirm + }, + }) + producer = Producer(producer_conf) api = sickle.Sickle(self.endpoint_url) date_str = date.isoformat() @@ -88,7 +92,6 @@ class HarvestOaiPmhWorker: item.raw.encode('utf-8'), key=item.header.identifier.encode('utf-8'), on_delivery=fail_fast) - producer.poll(0) producer.flush() def run(self, continuous=False): diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 42fe38aa..a25c3196 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -775,7 +775,6 @@ def make_kafka_consumer(hosts, env, topic_suffix, group): 'bootstrap.servers': hosts, 'group.id': group, 'on_commit': fail_fast, - 'delivery.report.only.error': True, # messages don't have offset marked as stored until pushed to # elastic, but we do auto-commit stored offsets to broker 'enable.auto.offset.store': False, diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index c134bde2..8b1ba5e9 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -40,7 +40,14 @@ class ChangelogWorker(FatcatWorker): # TODO: should it be sys.exit(-1)? raise KafkaException(err) - producer = Producer(self.kafka_config) + producer_conf = self.kafka_config.copy() + producer_conf.update({ + 'delivery.report.only.error': True, + 'default.topic.config': { + 'request.required.acks': -1, # all brokers must confirm + }, + }) + producer = Producer(producer_conf) while True: latest = int(self.api.get_changelog(limit=1)[0].index) @@ -58,7 +65,7 @@ class ChangelogWorker(FatcatWorker): #NOTE timestamp could be timestamp=cle.timestamp (?) ) self.offset = i - producer.poll(0) + producer.flush() print("Sleeping {} seconds...".format(self.poll_interval)) time.sleep(self.poll_interval) @@ -118,7 +125,6 @@ class EntityUpdatesWorker(FatcatWorker): consumer_conf.update({ 'group.id': self.consumer_group, 'on_commit': fail_fast, - 'delivery.report.only.error': True, # messages don't have offset marked as stored until pushed to # elastic, but we do auto-commit stored offsets to broker 'enable.auto.commit': True, @@ -134,8 +140,9 @@ class EntityUpdatesWorker(FatcatWorker): producer_conf = self.kafka_config.copy() producer_conf.update({ + 'delivery.report.only.error': True, 'default.topic.config': { - 'request.required.acks': -1, + 'request.required.acks': -1, # all brokers must confirm }, }) producer = Producer(producer_conf) @@ -207,6 +214,7 @@ class EntityUpdatesWorker(FatcatWorker): key=ident.encode('utf-8'), on_delivery=fail_fast, ) + producer.flush() # TODO: actually update works consumer.store_offsets(message=msg) diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index acb705c2..2ba241eb 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -2,7 +2,7 @@ import json import time import requests -from confluent_kafka import Consumer, Producer, KafkaException +from confluent_kafka import Consumer, KafkaException from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient from fatcat_tools import * @@ -61,7 +61,6 @@ class ElasticsearchReleaseWorker(FatcatWorker): consumer_conf.update({ 'group.id': self.consumer_group, 'on_commit': fail_fast, - 'delivery.report.only.error': True, # messages don't have offset marked as stored until pushed to # elastic, but we do auto-commit stored offsets to broker 'enable.auto.commit': True, @@ -97,16 +96,24 @@ class ElasticsearchReleaseWorker(FatcatWorker): bulk_actions = [] for msg in batch: json_str = msg.value().decode('utf-8') - entity = entity_from_json(json_str, ReleaseEntity, api_client=ac) - print("Upserting: release/{}".format(entity.ident)) + # HACK: work around a bug where container entities got published to + # release_v03 topic + if self.elasticsearch_document_name == "release": + entity_dict = json.loads(json_str) + if entity_dict.get('name') and not entity_dict.get('title'): + continue + entity = entity_from_json(json_str, self.entity_type, api_client=ac) + # TODO: handle deletions from index bulk_actions.append(json.dumps({ "index": { "_id": entity.ident, }, })) bulk_actions.append(json.dumps( - release_to_elasticsearch(entity))) - elasticsearch_endpoint = "{}/{}/release/_bulk".format( + self.transform_func(entity))) + print("Upserting, eg, {} (of {} releases in elasticsearch)".format(entity.ident, len(batch))) + elasticsearch_endpoint = "{}/{}/{}/_bulk".format( self.elasticsearch_backend, - self.elasticsearch_index) + self.elasticsearch_index, + self.elasticsearch_document_name) resp = requests.post(elasticsearch_endpoint, headers={"Content-Type": "application/x-ndjson"}, data="\n".join(bulk_actions) + "\n") @@ -141,4 +148,3 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): self.elasticsearch_document_name = "container" self.transform_func = container_to_elasticsearch - -- cgit v1.2.3