diff options
Diffstat (limited to 'python/fatcat_tools/harvest/doi_registrars.py')
-rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 68 |
1 files changed, 20 insertions, 48 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index ed80cfc9..d5e4b7ec 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -9,7 +9,8 @@ import itertools import datetime from pykafka import KafkaClient -from fatcat_tools.workers.worker_common import most_recent_message +from fatcat_tools.workers import most_recent_message +from .harvest_common import HarvestState # Skip pylint due to: # AttributeError: 'NoneType' object has no attribute 'scope' @@ -62,24 +63,14 @@ class HarvestCrossrefWorker: self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") self.is_update_filter = is_update_filter - # these are both optional, and should be datetime.date - self.start_date = start_date - self.end_date = end_date + self.state = HarvestState(start_date, end_date) + self.state.initialize_from_kafka(self.kafka.topics[self.state_topic]) self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks self.api_batch_size = 50 # for crossref, it's "from-index-date" self.name = "Crossref" - def get_latest_date(self): - - state_topic = self.kafka.topics[self.state_topic] - latest = most_recent_message(state_topic) - if latest: - latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date() - print("Latest date found: {}".format(latest)) - return latest - def params(self, date_str): filter_param = 'from-index-date:{},until-index-date:{}'.format( date_str, date_str) @@ -97,7 +88,6 @@ class HarvestCrossrefWorker: def fetch_date(self, date): - state_topic = self.kafka.topics[self.state_topic] produce_topic = self.kafka.topics[self.produce_topic] date_str = date.strftime(DATE_FMT) @@ -118,7 +108,7 @@ class HarvestCrossrefWorker: resp = http_resp.json() items = self.extract_items(resp) count += len(items) - print("... got {} ({} of {}) in {}".format(len(items), count, + print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count, self.extract_total(resp), http_resp.elapsed)) #print(json.dumps(resp)) for work in items: @@ -127,46 +117,28 @@ class HarvestCrossrefWorker: break params = self.update_params(params, resp) - # record our completion state - with state_topic.get_sync_producer() as producer: - producer.produce(date.strftime(DATE_FMT).encode('utf-8')) - def extract_items(self, resp): return resp['message']['items'] def extract_total(self, resp): return resp['message']['total-results'] - def run_once(self): - today_utc = datetime.datetime.utcnow().date() - if self.start_date is None: - self.start_date = self.get_latest_date() - if self.start_date: - # if we are continuing, start day after last success - self.start_date = self.start_date + datetime.timedelta(days=1) - if self.start_date is None: - # bootstrap to yesterday (don't want to start on today until it's over) - self.start_date = datetime.datetime.utcnow().date() - if self.end_date is None: - # bootstrap to yesterday (don't want to start on today until it's over) - self.end_date = today_utc - datetime.timedelta(days=1) - print("Harvesting from {} through {}".format(self.start_date, self.end_date)) - current = self.start_date - while current <= self.end_date: - print("Fetching DOIs updated on {} (UTC)".format(current)) - self.fetch_date(current) - current += datetime.timedelta(days=1) - print("{} DOI ingest caught up through {}".format(self.name, self.end_date)) - return self.end_date - - def run_loop(self): - while True: - last = self.run_once() - self.start_date = last - self.end_date = None - print("Sleeping {} seconds...".format(self.loop_sleep)) - time.sleep(self.loop_sleep()) + def run(self, continuous=False): + while True: + current = self.state.next(continuous) + if current: + print("Fetching DOIs updated on {} (UTC)".format(current)) + self.fetch_date(current) + self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic]) + continue + + if continuous: + print("Sleeping {} seconds...".format(self.loop_sleep)) + time.sleep(self.loop_sleep()) + else: + break + print("{} DOI ingest caught up".format(self.name)) class HarvestDataciteWorker(HarvestCrossrefWorker): |