diff options
Diffstat (limited to 'python/fatcat_tools/harvest/harvest_common.py')
-rw-r--r-- | python/fatcat_tools/harvest/harvest_common.py | 40 |
1 files changed, 29 insertions, 11 deletions
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py index da06275f..90f499da 100644 --- a/python/fatcat_tools/harvest/harvest_common.py +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -8,6 +8,8 @@ from requests.adapters import HTTPAdapter # unclear why pylint chokes on this import. Recent 'requests' and 'urllib3' are # in Pipenv.lock, and there are no errors in QA from requests.packages.urllib3.util.retry import Retry # pylint: disable=import-error +from confluent_kafka import Producer, Consumer, TopicPartition, KafkaException, \ + OFFSET_BEGINNING # Used for parsing ISO date format (YYYY-MM-DD) @@ -104,14 +106,14 @@ class HarvestState: date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date() self.complete(date) - def complete(self, date, kafka_topic=None): + def complete(self, date, kafka_topic=None, kafka_config=None): """ Records that a date has been processed successfully. Updates internal state and returns a JSON representation to be serialized. Will publish to a kafka topic if passed as an argument. - kafka_topic should have type pykafka.Topic (not str) + kafka_topic should be a string. A producer will be created and destroyed. """ try: self.to_process.remove(date) @@ -123,25 +125,41 @@ class HarvestState: 'completed-date': str(date), }).encode('utf-8') if kafka_topic: - with kafka_topic.get_sync_producer() as producer: - producer.produce(state_json) + assert(kafka_config) + def fail_fast(err, msg): + if err: + raise KafkaException(err) + print("Commiting status to Kafka: {}".format(kafka_topic)) + producer = Producer(kafka_config) + producer.produce(kafka_topic, state_json, on_delivery=fail_fast) + producer.flush() return state_json - def initialize_from_kafka(self, kafka_topic): + def initialize_from_kafka(self, kafka_topic, kafka_config): """ - kafka_topic should have type pykafka.Topic (not str) + kafka_topic should have type str """ if not kafka_topic: return - print("Fetching state from kafka topic: {}".format(kafka_topic.name)) - consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=1000) + print("Fetching state from kafka topic: {}".format(kafka_topic)) + conf = kafka_config.copy() + conf.update({ + 'auto.offset.reset': 'earliest', + 'session.timeout.ms': 10000, + 'group.id': kafka_topic + "-init", + }) + consumer = Consumer(conf) + consumer.assign([TopicPartition(kafka_topic, 0, OFFSET_BEGINNING)]) c = 0 while True: - msg = consumer.consume(block=True) + msg = consumer.poll(timeout=1.0) if not msg: break - #sys.stdout.write('.') - self.update(msg.value.decode('utf-8')) + if msg.error(): + raise KafkaException(msg.error()) + sys.stdout.write('.') # XXX: + self.update(msg.value().decode('utf-8')) c += 1 + consumer.close() print("... got {} state update messages, done".format(c)) |