summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/doi_registrars.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/doi_registrars.py')
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py68
1 files changed, 20 insertions, 48 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index ed80cfc9..d5e4b7ec 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -9,7 +9,8 @@ import itertools
import datetime
from pykafka import KafkaClient
-from fatcat_tools.workers.worker_common import most_recent_message
+from fatcat_tools.workers import most_recent_message
+from .harvest_common import HarvestState
# Skip pylint due to:
# AttributeError: 'NoneType' object has no attribute 'scope'
@@ -62,24 +63,14 @@ class HarvestCrossrefWorker:
self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
self.is_update_filter = is_update_filter
- # these are both optional, and should be datetime.date
- self.start_date = start_date
- self.end_date = end_date
+ self.state = HarvestState(start_date, end_date)
+ self.state.initialize_from_kafka(self.kafka.topics[self.state_topic])
self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
self.api_batch_size = 50
# for crossref, it's "from-index-date"
self.name = "Crossref"
- def get_latest_date(self):
-
- state_topic = self.kafka.topics[self.state_topic]
- latest = most_recent_message(state_topic)
- if latest:
- latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date()
- print("Latest date found: {}".format(latest))
- return latest
-
def params(self, date_str):
filter_param = 'from-index-date:{},until-index-date:{}'.format(
date_str, date_str)
@@ -97,7 +88,6 @@ class HarvestCrossrefWorker:
def fetch_date(self, date):
- state_topic = self.kafka.topics[self.state_topic]
produce_topic = self.kafka.topics[self.produce_topic]
date_str = date.strftime(DATE_FMT)
@@ -118,7 +108,7 @@ class HarvestCrossrefWorker:
resp = http_resp.json()
items = self.extract_items(resp)
count += len(items)
- print("... got {} ({} of {}) in {}".format(len(items), count,
+ print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count,
self.extract_total(resp), http_resp.elapsed))
#print(json.dumps(resp))
for work in items:
@@ -127,46 +117,28 @@ class HarvestCrossrefWorker:
break
params = self.update_params(params, resp)
- # record our completion state
- with state_topic.get_sync_producer() as producer:
- producer.produce(date.strftime(DATE_FMT).encode('utf-8'))
-
def extract_items(self, resp):
return resp['message']['items']
def extract_total(self, resp):
return resp['message']['total-results']
- def run_once(self):
- today_utc = datetime.datetime.utcnow().date()
- if self.start_date is None:
- self.start_date = self.get_latest_date()
- if self.start_date:
- # if we are continuing, start day after last success
- self.start_date = self.start_date + datetime.timedelta(days=1)
- if self.start_date is None:
- # bootstrap to yesterday (don't want to start on today until it's over)
- self.start_date = datetime.datetime.utcnow().date()
- if self.end_date is None:
- # bootstrap to yesterday (don't want to start on today until it's over)
- self.end_date = today_utc - datetime.timedelta(days=1)
- print("Harvesting from {} through {}".format(self.start_date, self.end_date))
- current = self.start_date
- while current <= self.end_date:
- print("Fetching DOIs updated on {} (UTC)".format(current))
- self.fetch_date(current)
- current += datetime.timedelta(days=1)
- print("{} DOI ingest caught up through {}".format(self.name, self.end_date))
- return self.end_date
-
- def run_loop(self):
- while True:
- last = self.run_once()
- self.start_date = last
- self.end_date = None
- print("Sleeping {} seconds...".format(self.loop_sleep))
- time.sleep(self.loop_sleep())
+ def run(self, continuous=False):
+ while True:
+ current = self.state.next(continuous)
+ if current:
+ print("Fetching DOIs updated on {} (UTC)".format(current))
+ self.fetch_date(current)
+ self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic])
+ continue
+
+ if continuous:
+ print("Sleeping {} seconds...".format(self.loop_sleep))
+ time.sleep(self.loop_sleep())
+ else:
+ break
+ print("{} DOI ingest caught up".format(self.name))
class HarvestDataciteWorker(HarvestCrossrefWorker):