diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 |
commit | 31d1a6a713d177990609767d508209ced19ca396 (patch) | |
tree | a628a57bdb373669394a6b520102b1b4b5ffe7da /python/fatcat_tools/harvest/harvest_common.py | |
parent | 9dc891b8098542bb089c8c47098b60a8beb76a53 (diff) | |
download | fatcat-31d1a6a713d177990609767d508209ced19ca396.tar.gz fatcat-31d1a6a713d177990609767d508209ced19ca396.zip |
fmt (black): fatcat_tools/
Diffstat (limited to 'python/fatcat_tools/harvest/harvest_common.py')
-rw-r--r-- | python/fatcat_tools/harvest/harvest_common.py | 83 |
1 files changed, 46 insertions, 37 deletions
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py index 45c2b8ea..fda0dc62 100644 --- a/python/fatcat_tools/harvest/harvest_common.py +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -1,4 +1,3 @@ - import datetime import json import sys @@ -14,8 +13,10 @@ from requests.packages.urllib3.util.retry import Retry # pylint: disable=import # Used for parsing ISO date format (YYYY-MM-DD) DATE_FMT = "%Y-%m-%d" -def requests_retry_session(retries=10, backoff_factor=3, - status_forcelist=(500, 502, 504), session=None): + +def requests_retry_session( + retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None +): """ From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests """ @@ -28,10 +29,11 @@ def requests_retry_session(retries=10, backoff_factor=3, status_forcelist=status_forcelist, ) adapter = HTTPAdapter(max_retries=retry) - session.mount('http://', adapter) - session.mount('https://', adapter) + session.mount("http://", adapter) + session.mount("https://", adapter) return session + class HarvestState: """ First version of this works with full days (dates) @@ -57,8 +59,9 @@ class HarvestState: self.enqueue_period(start_date, end_date, catchup_days) def __str__(self): - return '<HarvestState to_process={}, completed={}>'.format( - len(self.to_process), len(self.completed)) + return "<HarvestState to_process={}, completed={}>".format( + len(self.to_process), len(self.completed) + ) def enqueue_period(self, start_date=None, end_date=None, catchup_days=14): """ @@ -92,7 +95,9 @@ class HarvestState: """ if continuous: # enqueue yesterday - self.enqueue_period(start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1)) + self.enqueue_period( + start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1) + ) if not self.to_process: return None return sorted(list(self.to_process))[0] @@ -105,8 +110,8 @@ class HarvestState: state stored on disk or in Kafka. """ state = json.loads(state_json) - if 'completed-date' in state: - date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date() + if "completed-date" in state: + date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date() self.complete(date) def complete(self, date, kafka_topic=None, kafka_config=None): @@ -123,12 +128,14 @@ class HarvestState: except KeyError: pass self.completed.add(date) - state_json = json.dumps({ - 'in-progress-dates': [str(d) for d in self.to_process], - 'completed-date': str(date), - }).encode('utf-8') + state_json = json.dumps( + { + "in-progress-dates": [str(d) for d in self.to_process], + "completed-date": str(date), + } + ).encode("utf-8") if kafka_topic: - assert(kafka_config) + assert kafka_config def fail_fast(err, msg): if err: @@ -136,17 +143,16 @@ class HarvestState: print("Committing status to Kafka: {}".format(kafka_topic), file=sys.stderr) producer_conf = kafka_config.copy() - producer_conf.update({ - 'delivery.report.only.error': True, - 'default.topic.config': { - 'request.required.acks': -1, # all brokers must confirm - }, - }) + producer_conf.update( + { + "delivery.report.only.error": True, + "default.topic.config": { + "request.required.acks": -1, # all brokers must confirm + }, + } + ) producer = Producer(producer_conf) - producer.produce( - kafka_topic, - state_json, - on_delivery=fail_fast) + producer.produce(kafka_topic, state_json, on_delivery=fail_fast) producer.flush() return state_json @@ -166,22 +172,25 @@ class HarvestState: raise KafkaException(err) conf = kafka_config.copy() - conf.update({ - 'group.id': 'dummy_init_group', # should never be committed - 'enable.auto.commit': False, - 'auto.offset.reset': 'earliest', - 'session.timeout.ms': 10000, - }) + conf.update( + { + "group.id": "dummy_init_group", # should never be committed + "enable.auto.commit": False, + "auto.offset.reset": "earliest", + "session.timeout.ms": 10000, + } + ) consumer = Consumer(conf) # this watermark fetch is mostly to ensure we are connected to broker and # fail fast if not, but we also confirm that we read to end below. hwm = consumer.get_watermark_offsets( - TopicPartition(kafka_topic, 0), - timeout=5.0, - cached=False) + TopicPartition(kafka_topic, 0), timeout=5.0, cached=False + ) if not hwm: - raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic)) + raise Exception( + "Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic) + ) consumer.assign([TopicPartition(kafka_topic, 0, 0)]) c = 0 @@ -191,8 +200,8 @@ class HarvestState: break if msg.error(): raise KafkaException(msg.error()) - #sys.stdout.write('.') - self.update(msg.value().decode('utf-8')) + # sys.stdout.write('.') + self.update(msg.value().decode("utf-8")) c += 1 consumer.close() |