diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 | 
| commit | 31d1a6a713d177990609767d508209ced19ca396 (patch) | |
| tree | a628a57bdb373669394a6b520102b1b4b5ffe7da /python/fatcat_tools/harvest/harvest_common.py | |
| parent | 9dc891b8098542bb089c8c47098b60a8beb76a53 (diff) | |
| download | fatcat-31d1a6a713d177990609767d508209ced19ca396.tar.gz fatcat-31d1a6a713d177990609767d508209ced19ca396.zip  | |
fmt (black): fatcat_tools/
Diffstat (limited to 'python/fatcat_tools/harvest/harvest_common.py')
| -rw-r--r-- | python/fatcat_tools/harvest/harvest_common.py | 83 | 
1 files changed, 46 insertions, 37 deletions
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py index 45c2b8ea..fda0dc62 100644 --- a/python/fatcat_tools/harvest/harvest_common.py +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -1,4 +1,3 @@ -  import datetime  import json  import sys @@ -14,8 +13,10 @@ from requests.packages.urllib3.util.retry import Retry  # pylint: disable=import  # Used for parsing ISO date format (YYYY-MM-DD)  DATE_FMT = "%Y-%m-%d" -def requests_retry_session(retries=10, backoff_factor=3, -        status_forcelist=(500, 502, 504), session=None): + +def requests_retry_session( +    retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None +):      """      From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests      """ @@ -28,10 +29,11 @@ def requests_retry_session(retries=10, backoff_factor=3,          status_forcelist=status_forcelist,      )      adapter = HTTPAdapter(max_retries=retry) -    session.mount('http://', adapter) -    session.mount('https://', adapter) +    session.mount("http://", adapter) +    session.mount("https://", adapter)      return session +  class HarvestState:      """      First version of this works with full days (dates) @@ -57,8 +59,9 @@ class HarvestState:              self.enqueue_period(start_date, end_date, catchup_days)      def __str__(self): -        return '<HarvestState to_process={}, completed={}>'.format( -            len(self.to_process), len(self.completed)) +        return "<HarvestState to_process={}, completed={}>".format( +            len(self.to_process), len(self.completed) +        )      def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):          """ @@ -92,7 +95,9 @@ class HarvestState:          """          if continuous:              # enqueue yesterday -            self.enqueue_period(start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1)) +            self.enqueue_period( +                start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1) +            )          if not self.to_process:              return None          return sorted(list(self.to_process))[0] @@ -105,8 +110,8 @@ class HarvestState:          state stored on disk or in Kafka.          """          state = json.loads(state_json) -        if 'completed-date' in state: -            date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date() +        if "completed-date" in state: +            date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date()              self.complete(date)      def complete(self, date, kafka_topic=None, kafka_config=None): @@ -123,12 +128,14 @@ class HarvestState:          except KeyError:              pass          self.completed.add(date) -        state_json = json.dumps({ -            'in-progress-dates': [str(d) for d in self.to_process], -            'completed-date': str(date), -        }).encode('utf-8') +        state_json = json.dumps( +            { +                "in-progress-dates": [str(d) for d in self.to_process], +                "completed-date": str(date), +            } +        ).encode("utf-8")          if kafka_topic: -            assert(kafka_config) +            assert kafka_config              def fail_fast(err, msg):                  if err: @@ -136,17 +143,16 @@ class HarvestState:              print("Committing status to Kafka: {}".format(kafka_topic), file=sys.stderr)              producer_conf = kafka_config.copy() -            producer_conf.update({ -                'delivery.report.only.error': True, -                'default.topic.config': { -                    'request.required.acks': -1, # all brokers must confirm -                }, -            }) +            producer_conf.update( +                { +                    "delivery.report.only.error": True, +                    "default.topic.config": { +                        "request.required.acks": -1,  # all brokers must confirm +                    }, +                } +            )              producer = Producer(producer_conf) -            producer.produce( -                kafka_topic, -                state_json, -                on_delivery=fail_fast) +            producer.produce(kafka_topic, state_json, on_delivery=fail_fast)              producer.flush()          return state_json @@ -166,22 +172,25 @@ class HarvestState:                  raise KafkaException(err)          conf = kafka_config.copy() -        conf.update({ -            'group.id': 'dummy_init_group', # should never be committed -            'enable.auto.commit': False, -            'auto.offset.reset': 'earliest', -            'session.timeout.ms': 10000, -        }) +        conf.update( +            { +                "group.id": "dummy_init_group",  # should never be committed +                "enable.auto.commit": False, +                "auto.offset.reset": "earliest", +                "session.timeout.ms": 10000, +            } +        )          consumer = Consumer(conf)          # this watermark fetch is mostly to ensure we are connected to broker and          # fail fast if not, but we also confirm that we read to end below.          hwm = consumer.get_watermark_offsets( -            TopicPartition(kafka_topic, 0), -            timeout=5.0, -            cached=False) +            TopicPartition(kafka_topic, 0), timeout=5.0, cached=False +        )          if not hwm: -            raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic)) +            raise Exception( +                "Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic) +            )          consumer.assign([TopicPartition(kafka_topic, 0, 0)])          c = 0 @@ -191,8 +200,8 @@ class HarvestState:                  break              if msg.error():                  raise KafkaException(msg.error()) -            #sys.stdout.write('.') -            self.update(msg.value().decode('utf-8')) +            # sys.stdout.write('.') +            self.update(msg.value().decode("utf-8"))              c += 1          consumer.close()  | 
