| author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 20:57:19 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 20:57:19 -0800 | 
| commit | 65bdebea35f2ab3c9c8b0f8a8b0a9a577a36bee2 (patch) | |
| tree | 145321b85a9af61f0a93112831724717faced5e0 /python/fatcat_tools | |
| parent | 337416e965457c07dab44864f6dabb516fdf6a03 (diff) | |
better DOI registrar harvesters
Diffstat (limited to 'python/fatcat_tools')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | python/fatcat_tools/harvest/__init__.py | 1 |
| -rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 68 |
| -rw-r--r-- | python/fatcat_tools/harvest/harvest_common.py | 124 |

3 files changed, 145 insertions, 48 deletions
```diff
diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py
index e1bde753..4de2cbde 100644
--- a/python/fatcat_tools/harvest/__init__.py
+++ b/python/fatcat_tools/harvest/__init__.py
@@ -1,2 +1,3 @@
 
+from .harvest_common import HarvestState
 from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index ed80cfc9..d5e4b7ec 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -9,7 +9,8 @@ import itertools
 import datetime
 
 from pykafka import KafkaClient
-from fatcat_tools.workers.worker_common import most_recent_message
+from fatcat_tools.workers import most_recent_message
+from .harvest_common import HarvestState
 
 # Skip pylint due to:
 #   AttributeError: 'NoneType' object has no attribute 'scope'
@@ -62,24 +63,14 @@ class HarvestCrossrefWorker:
         self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
         self.is_update_filter = is_update_filter
 
-        # these are both optional, and should be datetime.date
-        self.start_date = start_date
-        self.end_date = end_date
+        self.state = HarvestState(start_date, end_date)
+        self.state.initialize_from_kafka(self.kafka.topics[self.state_topic])
 
         self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
         self.api_batch_size = 50
         # for crossref, it's "from-index-date"
         self.name = "Crossref"
 
-    def get_latest_date(self):
-
-        state_topic = self.kafka.topics[self.state_topic]
-        latest = most_recent_message(state_topic)
-        if latest:
-            latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date()
-        print("Latest date found: {}".format(latest))
-        return latest
-
     def params(self, date_str):
         filter_param = 'from-index-date:{},until-index-date:{}'.format(
             date_str, date_str)
@@ -97,7 +88,6 @@ class HarvestCrossrefWorker:
 
     def fetch_date(self, date):
 
-        state_topic = self.kafka.topics[self.state_topic]
         produce_topic = self.kafka.topics[self.produce_topic]
 
         date_str = date.strftime(DATE_FMT)
@@ -118,7 +108,7 @@ class HarvestCrossrefWorker:
                 resp = http_resp.json()
                 items = self.extract_items(resp)
                 count += len(items)
-                print("... got {} ({} of {}) in {}".format(len(items), count,
+                print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count,
                     self.extract_total(resp), http_resp.elapsed))
                 #print(json.dumps(resp))
                 for work in items:
@@ -127,46 +117,28 @@ class HarvestCrossrefWorker:
                     break
                 params = self.update_params(params, resp)
 
-        # record our completion state
-        with state_topic.get_sync_producer() as producer:
-            producer.produce(date.strftime(DATE_FMT).encode('utf-8'))
-
     def extract_items(self, resp):
         return resp['message']['items']
 
     def extract_total(self, resp):
         return resp['message']['total-results']
 
-    def run_once(self):
-        today_utc = datetime.datetime.utcnow().date()
-        if self.start_date is None:
-            self.start_date = self.get_latest_date()
-            if self.start_date:
-                # if we are continuing, start day after last success
-                self.start_date = self.start_date + datetime.timedelta(days=1)
-        if self.start_date is None:
-            # bootstrap to yesterday (don't want to start on today until it's over)
-            self.start_date = datetime.datetime.utcnow().date()
-        if self.end_date is None:
-            # bootstrap to yesterday (don't want to start on today until it's over)
-            self.end_date = today_utc - datetime.timedelta(days=1)
-        print("Harvesting from {} through {}".format(self.start_date, self.end_date))
-        current = self.start_date
-        while current <= self.end_date:
-            print("Fetching DOIs updated on {} (UTC)".format(current))
-            self.fetch_date(current)
-            current += datetime.timedelta(days=1)
-        print("{} DOI ingest caught up through {}".format(self.name, self.end_date))
-        return self.end_date
-
-    def run_loop(self):
-        while True:
-            last = self.run_once()
-            self.start_date = last
-            self.end_date = None
-            print("Sleeping {} seconds...".format(self.loop_sleep))
-            time.sleep(self.loop_sleep())
+    def run(self, continuous=False):
+        while True:
+            current = self.state.next(continuous)
+            if current:
+                print("Fetching DOIs updated on {} (UTC)".format(current))
+                self.fetch_date(current)
+                self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic])
+                continue
+
+            if continuous:
+                print("Sleeping {} seconds...".format(self.loop_sleep))
+                time.sleep(self.loop_sleep)
+            else:
+                break
+        print("{} DOI ingest caught up".format(self.name))
 
 
 class HarvestDataciteWorker(HarvestCrossrefWorker):
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
new file mode 100644
index 00000000..f0ef51aa
--- /dev/null
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -0,0 +1,124 @@
+
+import sys
+import json
+import time
+import datetime
+
+
+DATE_FMT = "%Y-%m-%d"
+
+class HarvestState:
+    """
+    First version of this works with full days (dates)
+
+    General concept is to have harvesters serialize state when they make
+    progress and push to kafka. On startup, harvesters are given a task (extent
+    of work), and consume the full history to see what work remains to be done.
+
+    The simplest flow is:
+    - harvester is told to collect last N days of updates
+    - creates a to_process set
+    - for each completed update, pops the date from to_process (if present)
+
+    NOTE: this thing is sorta over-engineered... but might grow in the future
+    NOTE: should this class manage the state topic as well? Hrm.
+    """
+
+    def __init__(self, start_date=None, end_date=None, catchup_days=7):
+        self.to_process = set()
+        self.completed = set()
+
+        if catchup_days or start_date or end_date:
+            self.enqueue_period(start_date, end_date, catchup_days)
+
+    def enqueue_period(self, start_date=None, end_date=None, catchup_days=7):
+        """
+        This function adds a time period to the "TODO" list, unless the dates
+        have already been processed.
+
+        By default the period is "<catchup_days> ago until yesterday"
+        """
+
+        today_utc = datetime.datetime.utcnow().date()
+        if start_date is None:
+            # bootstrap to N days ago
+            start_date = today_utc - datetime.timedelta(days=catchup_days)
+        if end_date is None:
+            # bootstrap to yesterday (don't want to start on today until it's over)
+            end_date = today_utc - datetime.timedelta(days=1)
+
+        current = start_date
+        while current <= end_date:
+            if not current in self.completed:
+                self.to_process.add(current)
+            current += datetime.timedelta(days=1)
+
+    def next(self, continuous=False):
+        """
+        Gets next timespan (date) to be processed, or returns None if completed.
+
+        If 'continuous' arg is True, will try to enqueue recent possibly valid
+        timespans; the idea is to call next() repeatedly, and it will return a
+        new timespan when it becomes "available".
+        """
+        if continuous:
+            # enqueue yesterday
+            self.enqueue_period(start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1))
+        if not self.to_process:
+            return None
+        return sorted(list(self.to_process))[0]
+
+    def update(self, state_json):
+        """
+        Merges a state JSON object into the current state.
+
+        This is expected to be used to "catch-up" on previously serialized
+        state stored on disk or in Kafka.
+        """
+        state = json.loads(state_json)
+        if 'completed-date' in state:
+            date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date()
+            self.complete(date)
+
+    def complete(self, date, kafka_topic=None):
+        """
+        Records that a date has been processed successfully.
+
+        Updates internal state and returns a JSON representation to be
+        serialized. Will publish to a kafka topic if passed as an argument.
+
+        kafka_topic should have type pykafka.Topic (not str)
+        """
+        try:
+            self.to_process.remove(date)
+        except KeyError:
+            pass
+        self.completed.add(date)
+        state_json = json.dumps({
+            'in-progress-dates': [str(d) for d in self.to_process],
+            'completed-date': str(date),
+        }).encode('utf-8')
+        if kafka_topic:
+            with kafka_topic.get_sync_producer() as producer:
+                producer.produce(state_json)
+        return state_json
+
+    def initialize_from_kafka(self, kafka_topic):
+        """
+        kafka_topic should have type pykafka.Topic (not str)
+        """
+        if not kafka_topic:
+            return
+
+        print("Fetching state from kafka topic: {}".format(kafka_topic.name))
+        consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=1000)
+        c = 0
+        while True:
+            msg = consumer.consume(block=True)
+            if not msg:
+                break
+            #sys.stdout.write('.')
+            self.update(msg.value.decode('utf-8'))
+            c += 1
+        print("... got {} state update messages, done".format(c))
```
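To make the intended flow concrete, here is a minimal sketch (not part of the commit) of driving the new `HarvestState` class by hand, with the Kafka wiring left out: `complete()` is called without a `kafka_topic`, so it only returns the serialized state message, and the actual DOI fetching is elided.

```python
from fatcat_tools.harvest import HarvestState

# Enqueue the default catch-up window: 7 days ago through yesterday (UTC)
state = HarvestState(catchup_days=7)

# Drain the queue oldest-date-first; complete() returns the serialized state
# message (bytes). Passing a pykafka Topic via kafka_topic would also publish it.
while True:
    current = state.next()
    if current is None:
        break
    # ... harvest DOIs updated on `current` here (elided) ...
    state_msg = state.complete(current)

# A fresh instance can replay such messages (here just the last one) to skip
# already-finished days; initialize_from_kafka() does the same thing with the
# state topic's full history before run() starts requesting dates.
restored = HarvestState(catchup_days=7)
restored.update(state_msg.decode('utf-8'))
```

The state messages themselves are small JSON objects with `completed-date` and `in-progress-dates` keys, so the state topic doubles as a human-readable progress log.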
