aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/harvest_common.py
blob: fda0dc62037bd4fe4858887f4e31a9ae564e768a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import datetime
import json
import sys

import requests
from confluent_kafka import Consumer, KafkaException, Producer, TopicPartition
from requests.adapters import HTTPAdapter

# unclear why pylint chokes on this import. Recent 'requests' and 'urllib3' are
# in Pipenv.lock, and there are no errors in QA
from requests.packages.urllib3.util.retry import Retry  # pylint: disable=import-error

# Used for parsing ISO date format (YYYY-MM-DD)
DATE_FMT = "%Y-%m-%d"


def requests_retry_session(
    retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None
):
    """
    Returns a requests session that retries failed requests.

    A single HTTPAdapter carrying a urllib3 Retry policy is mounted for both
    the "http://" and "https://" prefixes. `retries` is passed to Retry as
    the total, read, and connect limits; `backoff_factor` and
    `status_forcelist` are passed through unchanged.

    If `session` is given (and truthy) it is configured in place and
    returned; otherwise a fresh requests.Session is created.

    From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
    """
    if not session:
        session = requests.Session()

    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
    )
    # the same adapter instance serves both schemes
    for prefix in ("http://", "https://"):
        session.mount(prefix, retrying_adapter)
    return session


class HarvestState:
    """
    Tracks which days (dates) a harvester still needs to process.

    First version of this works with full days (dates)

    General concept is to have harvesters serialize state when they make
    progress and push to kafka. On startup, harvesters are given a task (extent
    of work), and consume the full history to see what work remains to be done.

    The simplest flow is:
    - harvester is told to collect last N days of updates
    - creates a to_process set
    - for each update, removes the date from to_process (if it exists)

    NOTE: this thing is sorta over-engineered... but might grow in the future
    NOTE: should this class manage the state topic as well? Hrm.
    """

    def __init__(self, start_date=None, end_date=None, catchup_days=14):
        """
        Creates empty state, then enqueues an initial period unless all three
        arguments are falsy. See enqueue_period() for how they are interpreted.
        """
        # dates still waiting to be harvested
        self.to_process = set()
        # dates recorded as done; enqueue_period() never re-adds these
        self.completed = set()

        if catchup_days or start_date or end_date:
            self.enqueue_period(start_date, end_date, catchup_days)

    def __str__(self):
        return "<HarvestState to_process={}, completed={}>".format(
            len(self.to_process), len(self.completed)
        )

    def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):
        """
        This function adds a time period to the "TODO" list, unless the dates
        have already been processed.

        Both bounds are inclusive. By default the period is
        "<catchup_days> ago until yesterday" (relative to the current UTC date).
        """

        # timezone-aware replacement for the deprecated utcnow(); same date
        today_utc = datetime.datetime.now(datetime.timezone.utc).date()
        if start_date is None:
            # bootstrap to N days ago
            start_date = today_utc - datetime.timedelta(days=catchup_days)
        if end_date is None:
            # bootstrap to yesterday (don't want to start on today until it's over)
            end_date = today_utc - datetime.timedelta(days=1)

        one_day = datetime.timedelta(days=1)
        current = start_date
        while current <= end_date:
            if current not in self.completed:
                self.to_process.add(current)
            current += one_day

    def next_span(self, continuous=False):
        """
        Gets next timespan (date) to be processed, or returns None if completed.

        The earliest date in the to_process set is returned; it is not removed
        (call complete() for that).

        If 'continuous' arg is True, will try to enqueue recent possibly valid
        timespans; the idea is to call next_span() repeatedly, and it will return a
        new timespan when it becomes "available".
        """
        if continuous:
            # enqueue yesterday
            yesterday = datetime.datetime.now(
                datetime.timezone.utc
            ).date() - datetime.timedelta(days=1)
            self.enqueue_period(start_date=yesterday)
        if not self.to_process:
            return None
        return min(self.to_process)

    def update(self, state_json):
        """
        Merges a state JSON object into the current state.

        Only the "completed-date" key is consulted; when present it is parsed
        with DATE_FMT and marked as completed.

        This is expected to be used to "catch-up" on previously serialized
        state stored on disk or in Kafka.
        """
        state = json.loads(state_json)
        if "completed-date" in state:
            date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date()
            self.complete(date)

    def complete(self, date, kafka_topic=None, kafka_config=None):
        """
        Records that a date has been processed successfully.

        Updates internal state and returns a JSON representation (UTF-8
        encoded bytes) to be serialized. Will publish to a kafka topic if
        passed as an argument.

        kafka_topic should be a string. A producer will be created and destroyed.

        Raises AssertionError if kafka_topic is given without kafka_config (in
        which case state is left untouched), and KafkaException if delivery
        to Kafka reports an error.
        """
        # Explicit raise (not `assert`) so the check survives `python -O`;
        # done up front so a bad call does not mutate state.
        if kafka_topic and not kafka_config:
            raise AssertionError("kafka_config is required when kafka_topic is set")

        # completing a date that was never enqueued is fine
        self.to_process.discard(date)
        self.completed.add(date)
        state_json = json.dumps(
            {
                # sorted so the serialized state is deterministic
                "in-progress-dates": [str(d) for d in sorted(self.to_process)],
                "completed-date": str(date),
            }
        ).encode("utf-8")
        if kafka_topic:

            def fail_fast(err, msg):
                # delivery callback: surface any delivery error as an exception
                if err:
                    raise KafkaException(err)

            print("Committing status to Kafka: {}".format(kafka_topic), file=sys.stderr)
            producer_conf = kafka_config.copy()
            producer_conf.update(
                {
                    "delivery.report.only.error": True,
                    "default.topic.config": {
                        "request.required.acks": -1,  # all brokers must confirm
                    },
                }
            )
            producer = Producer(producer_conf)
            producer.produce(kafka_topic, state_json, on_delivery=fail_fast)
            producer.flush()
        return state_json

    def initialize_from_kafka(self, kafka_topic, kafka_config):
        """
        Replays every message on partition 0 of the state topic (from offset
        0) through update(), so previously completed dates are recognized.

        kafka_topic should have type str; a falsy topic makes this a no-op.

        Raises Exception if the watermark fetch returns nothing,
        KafkaException if a polled message carries an error, and
        AssertionError if fewer messages were read than the high watermark.

        TODO: this method does not fail if client can't connect to host.
        NOTE(review): the watermark fetch below looks intended to cover that
        case; confirm whether the TODO is still accurate.
        """
        if not kafka_topic:
            return

        print("Fetching state from kafka topic: {}".format(kafka_topic), file=sys.stderr)

        conf = kafka_config.copy()
        conf.update(
            {
                "group.id": "dummy_init_group",  # should never be committed
                "enable.auto.commit": False,
                "auto.offset.reset": "earliest",
                "session.timeout.ms": 10000,
            }
        )
        consumer = Consumer(conf)

        # try/finally so the consumer is closed even when we bail out early
        try:
            # this watermark fetch is mostly to ensure we are connected to broker and
            # fail fast if not, but we also confirm that we read to end below.
            hwm = consumer.get_watermark_offsets(
                TopicPartition(kafka_topic, 0), timeout=5.0, cached=False
            )
            if not hwm:
                raise Exception(
                    "Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic)
                )

            consumer.assign([TopicPartition(kafka_topic, 0, 0)])
            c = 0
            while True:
                msg = consumer.poll(timeout=2.0)
                if not msg:
                    # nothing arrived within the poll timeout: treat as end of topic
                    break
                if msg.error():
                    raise KafkaException(msg.error())
                self.update(msg.value().decode("utf-8"))
                c += 1
        finally:
            consumer.close()

        # verify that we got at least to HWM. Explicit raise (not `assert`) so
        # the check is not stripped under `python -O`.
        if c < hwm[1]:
            raise AssertionError(
                "read {} state messages from {}, expected at least {}".format(
                    c, kafka_topic, hwm[1]
                )
            )
        print("... got {} state update messages, done".format(c), file=sys.stderr)