summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/harvest_common.py
blob: 6041a36f60d203ea04c92e5c704544f6da1274a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

import sys
import json
import time
import datetime

# strptime() pattern for ISO calendar dates (YYYY-MM-DD). HarvestState.update()
# uses it to parse the 'completed-date' field of serialized state messages.
DATE_FMT = "%Y-%m-%d"

class HarvestState:
    """
    Tracks which days (dates) a harvester still has to process.

    First version of this works with full days (dates).

    General concept is to have harvesters serialize state when they make
    progress and push to kafka. On startup, harvesters are given a task
    (extent of work), and consume the full history to see what work remains
    to be done.

    The simplest flow is:
    - harvester is told to collect last N days of updates
    - creates a to_process set
    - for each update, removes the date from to_process (if it exists) and
      records it in completed

    Attributes:
        to_process: set of dates that still need to be harvested
        completed: set of dates already harvested successfully

    NOTE: this thing is sorta over-engineered... but might grow in the future
    NOTE: should this class manage the state topic as well? Hrm.
    """

    def __init__(self, start_date=None, end_date=None, catchup_days=7):
        """
        Creates empty state and, unless all three arguments are falsy,
        enqueues the initial period via enqueue_period().
        """
        self.to_process = set()
        self.completed = set()

        if catchup_days or start_date or end_date:
            self.enqueue_period(start_date, end_date, catchup_days)

    @staticmethod
    def _utc_today():
        """Returns the current calendar date in UTC."""
        # now(timezone.utc) yields the same date as the deprecated utcnow()
        return datetime.datetime.now(datetime.timezone.utc).date()

    def enqueue_period(self, start_date=None, end_date=None, catchup_days=7):
        """
        This function adds a time period to the "TODO" list, unless the dates
        have already been processed.

        By default the period is "<catchup_days> ago until yesterday"

        Args:
            start_date: first date to enqueue (inclusive); defaults to
                catchup_days before today (UTC)
            end_date: last date to enqueue (inclusive); defaults to
                yesterday (UTC)
            catchup_days: only used when start_date is None
        """
        today_utc = self._utc_today()
        one_day = datetime.timedelta(days=1)
        if start_date is None:
            # bootstrap to N days ago
            start_date = today_utc - datetime.timedelta(days=catchup_days)
        if end_date is None:
            # bootstrap to yesterday (don't want to start on today until it's over)
            end_date = today_utc - one_day

        current = start_date
        while current <= end_date:
            if current not in self.completed:
                self.to_process.add(current)
            current += one_day

    def next(self, continuous=False):
        """
        Gets next timespan (date) to be processed, or returns None if completed.

        The earliest pending date is returned; it is NOT removed from the
        pending set (that only happens in complete()).

        If 'continuous' arg is True, will try to enqueue recent possibly valid
        timespans; the idea is to call next() repeatedly, and it will return a
        new timespan when it becomes "available".
        """
        if continuous:
            # enqueue yesterday (skipped by enqueue_period if already completed)
            yesterday = self._utc_today() - datetime.timedelta(days=1)
            self.enqueue_period(start_date=yesterday)
        if not self.to_process:
            return None
        return min(self.to_process)

    def update(self, state_json):
        """
        Merges a state JSON object into the current state.

        This is expected to be used to "catch-up" on previously serialized
        state stored on disk or in Kafka.

        Only the 'completed-date' key is consulted; messages without it are
        ignored.

        Args:
            state_json: JSON text (str) as produced by complete()
        """
        state = json.loads(state_json)
        if 'completed-date' in state:
            date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date()
            self.complete(date)

    def complete(self, date, kafka_topic=None):
        """
        Records that a date has been processed successfully.

        Updates internal state and returns a JSON representation to be
        serialized. Will publish to a kafka topic if passed as an argument.

        kafka_topic should have type pykafka.Topic (not str)

        Returns:
            UTF-8 encoded bytes of a JSON object with keys
            'in-progress-dates' (remaining dates, sorted ascending) and
            'completed-date' (the date just completed).
        """
        # discard() is a no-op when the date was never enqueued
        self.to_process.discard(date)
        self.completed.add(date)
        state_json = json.dumps({
            # sorted so that identical state always serializes identically
            'in-progress-dates': [str(d) for d in sorted(self.to_process)],
            'completed-date': str(date),
        }).encode('utf-8')
        if kafka_topic:
            with kafka_topic.get_sync_producer() as producer:
                producer.produce(state_json)
        return state_json

    def initialize_from_kafka(self, kafka_topic):
        """
        Replays every state message from a kafka topic through update().

        kafka_topic should have type pykafka.Topic (not str). Does nothing
        if kafka_topic is falsy.
        """
        if not kafka_topic:
            return

        print("Fetching state from kafka topic: {}".format(kafka_topic.name))
        consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=1000)
        c = 0
        try:
            while True:
                # a falsy result is treated as "caught up" (presumably the
                # consumer timeout elapsed with no new message)
                msg = consumer.consume(block=True)
                if not msg:
                    break
                self.update(msg.value.decode('utf-8'))
                c += 1
        finally:
            # release the consumer's connections/worker threads even if a
            # state message fails to parse
            consumer.stop()
        print("... got {} state update messages, done".format(c))