aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/harvest_common.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/harvest_common.py')
-rw-r--r--python/fatcat_tools/harvest/harvest_common.py83
1 files changed, 46 insertions, 37 deletions
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index 45c2b8ea..fda0dc62 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -1,4 +1,3 @@
-
import datetime
import json
import sys
@@ -14,8 +13,10 @@ from requests.packages.urllib3.util.retry import Retry # pylint: disable=import
# Used for parsing ISO date format (YYYY-MM-DD)
DATE_FMT = "%Y-%m-%d"
-def requests_retry_session(retries=10, backoff_factor=3,
- status_forcelist=(500, 502, 504), session=None):
+
+def requests_retry_session(
+ retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None
+):
"""
From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
"""
@@ -28,10 +29,11 @@ def requests_retry_session(retries=10, backoff_factor=3,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
- session.mount('http://', adapter)
- session.mount('https://', adapter)
+ session.mount("http://", adapter)
+ session.mount("https://", adapter)
return session
+
class HarvestState:
"""
First version of this works with full days (dates)
@@ -57,8 +59,9 @@ class HarvestState:
self.enqueue_period(start_date, end_date, catchup_days)
def __str__(self):
- return '<HarvestState to_process={}, completed={}>'.format(
- len(self.to_process), len(self.completed))
+ return "<HarvestState to_process={}, completed={}>".format(
+ len(self.to_process), len(self.completed)
+ )
def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):
"""
@@ -92,7 +95,9 @@ class HarvestState:
"""
if continuous:
# enqueue yesterday
- self.enqueue_period(start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1))
+ self.enqueue_period(
+ start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1)
+ )
if not self.to_process:
return None
return sorted(list(self.to_process))[0]
@@ -105,8 +110,8 @@ class HarvestState:
state stored on disk or in Kafka.
"""
state = json.loads(state_json)
- if 'completed-date' in state:
- date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date()
+ if "completed-date" in state:
+ date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date()
self.complete(date)
def complete(self, date, kafka_topic=None, kafka_config=None):
@@ -123,12 +128,14 @@ class HarvestState:
except KeyError:
pass
self.completed.add(date)
- state_json = json.dumps({
- 'in-progress-dates': [str(d) for d in self.to_process],
- 'completed-date': str(date),
- }).encode('utf-8')
+ state_json = json.dumps(
+ {
+ "in-progress-dates": [str(d) for d in self.to_process],
+ "completed-date": str(date),
+ }
+ ).encode("utf-8")
if kafka_topic:
- assert(kafka_config)
+ assert kafka_config
def fail_fast(err, msg):
if err:
@@ -136,17 +143,16 @@ class HarvestState:
print("Committing status to Kafka: {}".format(kafka_topic), file=sys.stderr)
producer_conf = kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
+ producer_conf.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ )
producer = Producer(producer_conf)
- producer.produce(
- kafka_topic,
- state_json,
- on_delivery=fail_fast)
+ producer.produce(kafka_topic, state_json, on_delivery=fail_fast)
producer.flush()
return state_json
@@ -166,22 +172,25 @@ class HarvestState:
raise KafkaException(err)
conf = kafka_config.copy()
- conf.update({
- 'group.id': 'dummy_init_group', # should never be committed
- 'enable.auto.commit': False,
- 'auto.offset.reset': 'earliest',
- 'session.timeout.ms': 10000,
- })
+ conf.update(
+ {
+ "group.id": "dummy_init_group", # should never be committed
+ "enable.auto.commit": False,
+ "auto.offset.reset": "earliest",
+ "session.timeout.ms": 10000,
+ }
+ )
consumer = Consumer(conf)
# this watermark fetch is mostly to ensure we are connected to broker and
# fail fast if not, but we also confirm that we read to end below.
hwm = consumer.get_watermark_offsets(
- TopicPartition(kafka_topic, 0),
- timeout=5.0,
- cached=False)
+ TopicPartition(kafka_topic, 0), timeout=5.0, cached=False
+ )
if not hwm:
- raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic))
+ raise Exception(
+ "Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic)
+ )
consumer.assign([TopicPartition(kafka_topic, 0, 0)])
c = 0
@@ -191,8 +200,8 @@ class HarvestState:
break
if msg.error():
raise KafkaException(msg.error())
- #sys.stdout.write('.')
- self.update(msg.value().decode('utf-8'))
+ # sys.stdout.write('.')
+ self.update(msg.value().decode("utf-8"))
c += 1
consumer.close()