summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/harvest_common.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/harvest_common.py')
-rw-r--r--python/fatcat_tools/harvest/harvest_common.py40
1 files changed, 29 insertions, 11 deletions
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index da06275f..90f499da 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -8,6 +8,8 @@ from requests.adapters import HTTPAdapter
# unclear why pylint chokes on this import. Recent 'requests' and 'urllib3' are
# in Pipenv.lock, and there are no errors in QA
from requests.packages.urllib3.util.retry import Retry # pylint: disable=import-error
+from confluent_kafka import Producer, Consumer, TopicPartition, KafkaException, \
+ OFFSET_BEGINNING
# Used for parsing ISO date format (YYYY-MM-DD)
@@ -104,14 +106,14 @@ class HarvestState:
date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date()
self.complete(date)
- def complete(self, date, kafka_topic=None):
+ def complete(self, date, kafka_topic=None, kafka_config=None):
"""
Records that a date has been processed successfully.
Updates internal state and returns a JSON representation to be
serialized. Will publish to a kafka topic if passed as an argument.
- kafka_topic should have type pykafka.Topic (not str)
+ kafka_topic should be a string. A producer will be created and destroyed.
"""
try:
self.to_process.remove(date)
@@ -123,25 +125,41 @@ class HarvestState:
'completed-date': str(date),
}).encode('utf-8')
if kafka_topic:
- with kafka_topic.get_sync_producer() as producer:
- producer.produce(state_json)
+ assert(kafka_config)
+ def fail_fast(err, msg):
+ if err:
+ raise KafkaException(err)
+ print("Commiting status to Kafka: {}".format(kafka_topic))
+ producer = Producer(kafka_config)
+ producer.produce(kafka_topic, state_json, on_delivery=fail_fast)
+ producer.flush()
return state_json
- def initialize_from_kafka(self, kafka_topic):
+ def initialize_from_kafka(self, kafka_topic, kafka_config):
"""
- kafka_topic should have type pykafka.Topic (not str)
+ kafka_topic should have type str
"""
if not kafka_topic:
return
- print("Fetching state from kafka topic: {}".format(kafka_topic.name))
- consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=1000)
+ print("Fetching state from kafka topic: {}".format(kafka_topic))
+ conf = kafka_config.copy()
+ conf.update({
+ 'auto.offset.reset': 'earliest',
+ 'session.timeout.ms': 10000,
+ 'group.id': kafka_topic + "-init",
+ })
+ consumer = Consumer(conf)
+ consumer.assign([TopicPartition(kafka_topic, 0, OFFSET_BEGINNING)])
c = 0
while True:
- msg = consumer.consume(block=True)
+ msg = consumer.poll(timeout=1.0)
if not msg:
break
- #sys.stdout.write('.')
- self.update(msg.value.decode('utf-8'))
+ if msg.error():
+ raise KafkaException(msg.error())
+ sys.stdout.write('.') # XXX:
+ self.update(msg.value().decode('utf-8'))
c += 1
+ consumer.close()
print("... got {} state update messages, done".format(c))