1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
import sys
import json
import time
import datetime
# Used for parsing ISO date format (YYYY-MM-DD)
DATE_FMT = "%Y-%m-%d"
class HarvestState:
"""
First version of this works with full days (dates)
General concept is to have harvesters serialize state when they make
progress and push to kafka. On startup, harvesters are given a task (extend
of work), and consume the full history to see what work remains to be done.
The simplest flow is:
- harvester is told to collect last N days of updates
- creates an to_process set
- for each update, pops date from in_progress (if exits)
NOTE: this thing is sorta over-engineered... but might grow in the future
NOTE: should this class manage the state topic as well? Hrm.
"""
def __init__(self, start_date=None, end_date=None, catchup_days=7):
self.to_process = set()
self.completed = set()
if catchup_days or start_date or end_date:
self.enqueue_period(start_date, end_date, catchup_days)
def enqueue_period(self, start_date=None, end_date=None, catchup_days=7):
"""
This function adds a time period to the "TODO" list, unless the dates
have already been processed.
By default the period is "<catchup_days> ago until yesterday"
"""
today_utc = datetime.datetime.utcnow().date()
if start_date is None:
# bootstrap to N days ago
start_date = today_utc - datetime.timedelta(days=catchup_days)
if end_date is None:
# bootstrap to yesterday (don't want to start on today until it's over)
end_date = today_utc - datetime.timedelta(days=1)
current = start_date
while current <= end_date:
if not current in self.completed:
self.to_process.add(current)
current += datetime.timedelta(days=1)
def next(self, continuous=False):
"""
Gets next timespan (date) to be processed, or returns None if completed.
If 'continuous' arg is True, will try to enqueue recent possibly valid
timespans; the idea is to call next() repeatedly, and it will return a
new timespan when it becomes "available".
"""
if continuous:
# enqueue yesterday
self.enqueue_period(start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1))
if not self.to_process:
return None
return sorted(list(self.to_process))[0]
def update(self, state_json):
"""
Merges a state JSON object into the current state.
This is expected to be used to "catch-up" on previously serialized
state stored on disk or in Kafka.
"""
state = json.loads(state_json)
if 'completed-date' in state:
date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date()
self.complete(date)
def complete(self, date, kafka_topic=None):
"""
Records that a date has been processed successfully.
Updates internal state and returns a JSON representation to be
serialized. Will publish to a kafka topic if passed as an argument.
kafka_topic should have type pykafka.Topic (not str)
"""
try:
self.to_process.remove(date)
except KeyError:
pass
self.completed.add(date)
state_json = json.dumps({
'in-progress-dates': [str(d) for d in self.to_process],
'completed-date': str(date),
}).encode('utf-8')
if kafka_topic:
with kafka_topic.get_sync_producer() as producer:
producer.produce(state_json)
return state_json
def initialize_from_kafka(self, kafka_topic):
"""
kafka_topic should have type pykafka.Topic (not str)
"""
if not kafka_topic:
return
print("Fetching state from kafka topic: {}".format(kafka_topic.name))
consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=1000)
c = 0
while True:
msg = consumer.consume(block=True)
if not msg:
break
#sys.stdout.write('.')
self.update(msg.value.decode('utf-8'))
c += 1
print("... got {} state update messages, done".format(c))
|