diff options
Diffstat (limited to 'python/fatcat_tools/harvest/harvest_common.py')
-rw-r--r-- | python/fatcat_tools/harvest/harvest_common.py | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py new file mode 100644 index 00000000..f0ef51aa --- /dev/null +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -0,0 +1,124 @@ + +import sys +import json +import time +import datetime + + +DATE_FMT = "%Y-%m-%d" + +class HarvestState: + """ + First version of this works with full days (dates) + + General concept is to have harvesters serialize state when they make + progress and push to kafka. On startup, harvesters are given a task (extend + of work), and consume the full history to see what work remains to be done. + + The simplest flow is: + - harvester is told to collect last N days of updates + - creates an to_process set + - for each update, pops date from in_progress (if exits) + + NOTE: this thing is sorta over-engineered... but might grow in the future + NOTE: should this class manage the state topic as well? Hrm. + """ + + def __init__(self, start_date=None, end_date=None, catchup_days=7): + self.to_process = set() + self.completed = set() + + if catchup_days or start_date or end_date: + self.enqueue_period(start_date, end_date, catchup_days) + + def enqueue_period(self, start_date=None, end_date=None, catchup_days=7): + """ + This function adds a time period to the "TODO" list, unless the dates + have already been processed. + + By default the period is "<catchup_days> ago until yesterday" + """ + + today_utc = datetime.datetime.utcnow().date() + if start_date is None: + # bootstrap to N days ago + start_date = today_utc - datetime.timedelta(days=catchup_days) + if end_date is None: + # bootstrap to yesterday (don't want to start on today until it's over) + end_date = today_utc - datetime.timedelta(days=1) + + current = start_date + while current <= end_date: + if not current in self.completed: + self.to_process.add(current) + current += datetime.timedelta(days=1) + + def next(self, continuous=False): + """ + Gets next timespan (date) to be processed, or returns None if completed. + + If 'continuous' arg is True, will try to enqueue recent possibly valid + timespans; the idea is to call next() repeatedly, and it will return a + new timespan when it becomes "available". + """ + if continuous: + # enqueue yesterday + self.enqueue_period(start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1)) + if not self.to_process: + return None + return sorted(list(self.to_process))[0] + + def update(self, state_json): + """ + Merges a state JSON object into the current state. + + This is expected to be used to "catch-up" on previously serialized + state stored on disk or in Kafka. + """ + state = json.loads(state_json) + if 'completed-date' in state: + date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date() + self.complete(date) + + def complete(self, date, kafka_topic=None): + """ + Records that a date has been processed successfully. + + Updates internal state and returns a JSON representation to be + serialized. Will publish to a kafka topic if passed as an argument. + + kafka_topic should have type pykafka.Topic (not str) + """ + try: + self.to_process.remove(date) + except KeyError: + pass + self.completed.add(date) + state_json = json.dumps({ + 'in-progress-dates': [str(d) for d in self.to_process], + 'completed-date': str(date), + }).encode('utf-8') + if kafka_topic: + with kafka_topic.get_sync_producer() as producer: + producer.produce(state_json) + return state_json + + def initialize_from_kafka(self, kafka_topic): + """ + kafka_topic should have type pykafka.Topic (not str) + """ + if not kafka_topic: + return + + print("Fetching state from kafka topic: {}".format(kafka_topic.name)) + consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=1000) + c = 0 + while True: + msg = consumer.consume(block=True) + if not msg: + break + #sys.stdout.write('.') + self.update(msg.value.decode('utf-8')) + c += 1 + print("... got {} state update messages, done".format(c)) + |