import datetime
import json
import sys

import requests
from confluent_kafka import Consumer, KafkaException, Producer, TopicPartition
from requests.adapters import HTTPAdapter

# unclear why pylint chokes on this import. Recent 'requests' and 'urllib3' are
# in Pipenv.lock, and there are no errors in QA
from requests.packages.urllib3.util.retry import Retry  # pylint: disable=import-error

# Used for parsing ISO date format (YYYY-MM-DD)
DATE_FMT = "%Y-%m-%d"


def _utc_today():
    """Return the current calendar date in UTC."""
    # datetime.utcnow() is deprecated; an aware "now" yields the same date.
    return datetime.datetime.now(datetime.timezone.utc).date()


def requests_retry_session(
    retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None
):
    """
    Return a requests session whose HTTP and HTTPS adapters retry requests.

    From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests

    retries: used as the total, read and connect retry limits.
    backoff_factor: passed through to urllib3's Retry.
    status_forcelist: HTTP status codes passed to Retry as the force-retry list.
    session: existing session to mount the adapter on; a new
        requests.Session() is created when this is None (or falsy).
    """
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


class HarvestState:
    """
    First version of this works with full days (dates)

    General concept is to have harvesters serialize state when they make
    progress and push to kafka. On startup, harvesters are given a task (extent
    of work), and consume the full history to see what work remains to be done.

    The simplest flow is:
    - harvester is told to collect last N days of updates
    - creates a to_process set
    - for each update, pops date from to_process (if it exists)

    NOTE: this thing is sorta over-engineered... but might grow in the future
    NOTE: should this class manage the state topic as well? Hrm.
    """

    def __init__(self, start_date=None, end_date=None, catchup_days=14):
        """
        Create empty state and, if any argument is truthy, enqueue the
        corresponding period (see enqueue_period() for the defaults).
        """
        # dates (datetime.date) still waiting to be harvested
        self.to_process = set()
        # dates (datetime.date) that have been harvested successfully
        self.completed = set()
        if catchup_days or start_date or end_date:
            self.enqueue_period(start_date, end_date, catchup_days)

    def __str__(self):
        """Return a short summary with the size of both date sets."""
        return "<HarvestState to_process={}, completed={}>".format(
            len(self.to_process), len(self.completed)
        )

    def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):
        """
        This function adds a time period to the "TODO" list, unless the dates
        have already been processed.

        By default the period is "<catchup_days> ago until yesterday"

        start_date: first date to enqueue (inclusive); defaults to
            catchup_days before today (UTC).
        end_date: last date to enqueue (inclusive); defaults to yesterday (UTC).
        catchup_days: only used to compute the default start_date.
        """
        today_utc = _utc_today()
        if start_date is None:
            # bootstrap to N days ago
            start_date = today_utc - datetime.timedelta(days=catchup_days)
        if end_date is None:
            # bootstrap to yesterday (don't want to start on today until it's over)
            end_date = today_utc - datetime.timedelta(days=1)

        one_day = datetime.timedelta(days=1)
        current = start_date
        while current <= end_date:
            if current not in self.completed:
                self.to_process.add(current)
            current += one_day

    def next_span(self, continuous=False):
        """
        Gets next timespan (date) to be processed, or returns None if completed.

        If 'continuous' arg is True, will try to enqueue recent possibly valid
        timespans; the idea is to call next_span() repeatedly, and it will
        return a new timespan when it becomes "available".
        """
        if continuous:
            # enqueue yesterday
            self.enqueue_period(start_date=_utc_today() - datetime.timedelta(days=1))
        if not self.to_process:
            return None
        # earliest outstanding date first
        return min(self.to_process)

    def update(self, state_json):
        """
        Merges a state JSON object into the current state.

        This is expected to be used to "catch-up" on previously serialized
        state stored on disk or in Kafka.

        state_json: JSON text; only its "completed-date" key (YYYY-MM-DD) is
            read, and it is ignored when that key is absent.
        """
        state = json.loads(state_json)
        if "completed-date" in state:
            date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date()
            self.complete(date)

    def complete(self, date, kafka_topic=None, kafka_config=None):
        """
        Records that a date has been processed successfully.

        Updates internal state and returns a JSON representation to be
        serialized. Will publish to a kafka topic if passed as an argument.

        kafka_topic should be a string. A producer will be created and
        destroyed.

        date: the date to mark as completed.
        kafka_topic: when truthy, the serialized state is produced to it.
        kafka_config: base producer configuration (dict); required when
            kafka_topic is given. It is copied, not modified.

        Returns the UTF-8 encoded JSON state (bytes).
        Raises KafkaException (from the delivery callback) if delivery of the
        state message reports an error.
        """
        # the date may never have been enqueued (eg, when replaying old state)
        self.to_process.discard(date)
        self.completed.add(date)
        state_json = json.dumps(
            {
                # sorted so that the serialized form is deterministic
                "in-progress-dates": [str(d) for d in sorted(self.to_process)],
                "completed-date": str(date),
            }
        ).encode("utf-8")
        if kafka_topic:
            assert kafka_config

            def fail_fast(err, msg):
                if err:
                    raise KafkaException(err)

            print("Committing status to Kafka: {}".format(kafka_topic), file=sys.stderr)
            producer_conf = kafka_config.copy()
            producer_conf.update(
                {
                    "delivery.report.only.error": True,
                    "default.topic.config": {
                        "request.required.acks": -1,  # all brokers must confirm
                    },
                }
            )
            producer = Producer(producer_conf)
            producer.produce(kafka_topic, state_json, on_delivery=fail_fast)
            # block until the message is delivered (or the callback raises)
            producer.flush()
        return state_json

    def initialize_from_kafka(self, kafka_topic, kafka_config):
        """
        Replays all state messages from partition 0 of kafka_topic into this
        object via update(). Does nothing if kafka_topic is falsy.

        kafka_topic should have type str
        kafka_config: base consumer configuration (dict); it is copied, not
            modified.

        Raises Exception if the watermark lookup returns nothing,
        KafkaException if a polled message carries an error, and
        AssertionError if fewer messages were read than the high watermark.

        TODO: this method does not fail if client can't connect to host.
        """
        if not kafka_topic:
            return

        print("Fetching state from kafka topic: {}".format(kafka_topic), file=sys.stderr)

        conf = kafka_config.copy()
        conf.update(
            {
                "group.id": "dummy_init_group",  # should never be committed
                "enable.auto.commit": False,
                "auto.offset.reset": "earliest",
                "session.timeout.ms": 10000,
            }
        )
        consumer = Consumer(conf)

        count = 0
        # try/finally so the consumer is released even when the watermark
        # check, a poll error, or a malformed state message raises
        try:
            # this watermark fetch is mostly to ensure we are connected to broker
            # and fail fast if not, but we also confirm that we read to end below.
            hwm = consumer.get_watermark_offsets(
                TopicPartition(kafka_topic, 0), timeout=5.0, cached=False
            )
            if not hwm:
                raise Exception(
                    "Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic)
                )

            # read partition 0 from offset 0
            consumer.assign([TopicPartition(kafka_topic, 0, 0)])
            while True:
                msg = consumer.poll(timeout=2.0)
                if not msg:
                    # nothing arrived within the timeout: treat as end of topic
                    break
                if msg.error():
                    raise KafkaException(msg.error())
                # sys.stdout.write('.')
                self.update(msg.value().decode("utf-8"))
                count += 1
        finally:
            consumer.close()

        # verify that we got at least to HWM
        assert count >= hwm[1]
        print("... got {} state update messages, done".format(count), file=sys.stderr)