From 65bdebea35f2ab3c9c8b0f8a8b0a9a577a36bee2 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 19 Nov 2018 20:57:19 -0800 Subject: better DOI registrar harvesters --- python/fatcat_harvest.py | 7 +- python/fatcat_tools/harvest/__init__.py | 1 + python/fatcat_tools/harvest/doi_registrars.py | 68 +++++--------- python/fatcat_tools/harvest/harvest_common.py | 124 ++++++++++++++++++++++++++ python/tests/harvest_state.py | 40 +++++++++ 5 files changed, 190 insertions(+), 50 deletions(-) create mode 100644 python/fatcat_tools/harvest/harvest_common.py create mode 100644 python/tests/harvest_state.py (limited to 'python') diff --git a/python/fatcat_harvest.py b/python/fatcat_harvest.py index dd98d22a..f1bb3416 100755 --- a/python/fatcat_harvest.py +++ b/python/fatcat_harvest.py @@ -13,7 +13,7 @@ def run_crossref(args): contact_email=args.contact_email, start_date=args.start_date, end_date=args.end_date) - worker.run_once() + worker.run() def run_datacite(args): worker = HarvestDataciteWorker( @@ -23,7 +23,7 @@ def run_datacite(args): contact_email=args.contact_email, start_date=args.start_date, end_date=args.end_date) - worker.run_once() + worker.run() def mkdate(raw): return datetime.datetime.strptime(raw, "%Y-%m-%d").date() @@ -48,6 +48,9 @@ def main(): parser.add_argument('--contact-email', default="undefined", # better? 
help="contact email to use in API header") + parser.add_argument('--continuous', + default=False, + help="continue harvesting indefinitely in a loop?") subparsers = parser.add_subparsers() sub_crossref = subparsers.add_parser('crossref') diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py index e1bde753..4de2cbde 100644 --- a/python/fatcat_tools/harvest/__init__.py +++ b/python/fatcat_tools/harvest/__init__.py @@ -1,2 +1,3 @@ +from .harvest_common import HarvestState from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index ed80cfc9..d5e4b7ec 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -9,7 +9,8 @@ import itertools import datetime from pykafka import KafkaClient -from fatcat_tools.workers.worker_common import most_recent_message +from fatcat_tools.workers import most_recent_message +from .harvest_common import HarvestState # Skip pylint due to: # AttributeError: 'NoneType' object has no attribute 'scope' @@ -62,24 +63,14 @@ class HarvestCrossrefWorker: self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") self.is_update_filter = is_update_filter - # these are both optional, and should be datetime.date - self.start_date = start_date - self.end_date = end_date + self.state = HarvestState(start_date, end_date) + self.state.initialize_from_kafka(self.kafka.topics[self.state_topic]) self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks self.api_batch_size = 50 # for crossref, it's "from-index-date" self.name = "Crossref" - def get_latest_date(self): - - state_topic = self.kafka.topics[self.state_topic] - latest = most_recent_message(state_topic) - if latest: - latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date() - print("Latest date found: {}".format(latest)) - return latest 
- def params(self, date_str): filter_param = 'from-index-date:{},until-index-date:{}'.format( date_str, date_str) @@ -97,7 +88,6 @@ class HarvestCrossrefWorker: def fetch_date(self, date): - state_topic = self.kafka.topics[self.state_topic] produce_topic = self.kafka.topics[self.produce_topic] date_str = date.strftime(DATE_FMT) @@ -118,7 +108,7 @@ class HarvestCrossrefWorker: resp = http_resp.json() items = self.extract_items(resp) count += len(items) - print("... got {} ({} of {}) in {}".format(len(items), count, + print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count, self.extract_total(resp), http_resp.elapsed)) #print(json.dumps(resp)) for work in items: @@ -127,46 +117,28 @@ class HarvestCrossrefWorker: break params = self.update_params(params, resp) - # record our completion state - with state_topic.get_sync_producer() as producer: - producer.produce(date.strftime(DATE_FMT).encode('utf-8')) - def extract_items(self, resp): return resp['message']['items'] def extract_total(self, resp): return resp['message']['total-results'] - def run_once(self): - today_utc = datetime.datetime.utcnow().date() - if self.start_date is None: - self.start_date = self.get_latest_date() - if self.start_date: - # if we are continuing, start day after last success - self.start_date = self.start_date + datetime.timedelta(days=1) - if self.start_date is None: - # bootstrap to yesterday (don't want to start on today until it's over) - self.start_date = datetime.datetime.utcnow().date() - if self.end_date is None: - # bootstrap to yesterday (don't want to start on today until it's over) - self.end_date = today_utc - datetime.timedelta(days=1) - print("Harvesting from {} through {}".format(self.start_date, self.end_date)) - current = self.start_date - while current <= self.end_date: - print("Fetching DOIs updated on {} (UTC)".format(current)) - self.fetch_date(current) - current += datetime.timedelta(days=1) - print("{} DOI ingest caught up through 
{}".format(self.name, self.end_date)) - return self.end_date - - def run_loop(self): - while True: - last = self.run_once() - self.start_date = last - self.end_date = None - print("Sleeping {} seconds...".format(self.loop_sleep)) - time.sleep(self.loop_sleep()) + def run(self, continuous=False): + while True: + current = self.state.next(continuous) + if current: + print("Fetching DOIs updated on {} (UTC)".format(current)) + self.fetch_date(current) + self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic]) + continue + + if continuous: + print("Sleeping {} seconds...".format(self.loop_sleep)) + time.sleep(self.loop_sleep()) + else: + break + print("{} DOI ingest caught up".format(self.name)) class HarvestDataciteWorker(HarvestCrossrefWorker): diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py new file mode 100644 index 00000000..f0ef51aa --- /dev/null +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -0,0 +1,124 @@ + +import sys +import json +import time +import datetime + + +DATE_FMT = "%Y-%m-%d" + +class HarvestState: + """ + First version of this works with full days (dates) + + General concept is to have harvesters serialize state when they make + progress and push to kafka. On startup, harvesters are given a task (extent + of work), and consume the full history to see what work remains to be done. + + The simplest flow is: + - harvester is told to collect last N days of updates + - creates a to_process set + - for each update, pops date from to_process (if it exists) + + NOTE: this thing is sorta over-engineered... but might grow in the future + NOTE: should this class manage the state topic as well? Hrm. 
+ """ + + def __init__(self, start_date=None, end_date=None, catchup_days=7): + self.to_process = set() + self.completed = set() + + if catchup_days or start_date or end_date: + self.enqueue_period(start_date, end_date, catchup_days) + + def enqueue_period(self, start_date=None, end_date=None, catchup_days=7): + """ + This function adds a time period to the "TODO" list, unless the dates + have already been processed. + + By default the period is "N days ago (N = catchup_days) until yesterday" + """ + + today_utc = datetime.datetime.utcnow().date() + if start_date is None: + # bootstrap to N days ago + start_date = today_utc - datetime.timedelta(days=catchup_days) + if end_date is None: + # bootstrap to yesterday (don't want to start on today until it's over) + end_date = today_utc - datetime.timedelta(days=1) + + current = start_date + while current <= end_date: + if not current in self.completed: + self.to_process.add(current) + current += datetime.timedelta(days=1) + + def next(self, continuous=False): + """ + Gets next timespan (date) to be processed, or returns None if completed. + + If 'continuous' arg is True, will try to enqueue recent possibly valid + timespans; the idea is to call next() repeatedly, and it will return a + new timespan when it becomes "available". + """ + if continuous: + # enqueue yesterday + self.enqueue_period(start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1)) + if not self.to_process: + return None + return sorted(list(self.to_process))[0] + + def update(self, state_json): + """ + Merges a state JSON object into the current state. + + This is expected to be used to "catch-up" on previously serialized + state stored on disk or in Kafka. + """ + state = json.loads(state_json) + if 'completed-date' in state: + date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date() + self.complete(date) + + def complete(self, date, kafka_topic=None): + """ + Records that a date has been processed successfully. 
+ + Updates internal state and returns a JSON representation to be + serialized. Will publish to a kafka topic if passed as an argument. + + kafka_topic should have type pykafka.Topic (not str) + """ + try: + self.to_process.remove(date) + except KeyError: + pass + self.completed.add(date) + state_json = json.dumps({ + 'in-progress-dates': [str(d) for d in self.to_process], + 'completed-date': str(date), + }).encode('utf-8') + if kafka_topic: + with kafka_topic.get_sync_producer() as producer: + producer.produce(state_json) + return state_json + + def initialize_from_kafka(self, kafka_topic): + """ + kafka_topic should have type pykafka.Topic (not str) + """ + if not kafka_topic: + return + + print("Fetching state from kafka topic: {}".format(kafka_topic.name)) + consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=1000) + c = 0 + while True: + msg = consumer.consume(block=True) + if not msg: + break + #sys.stdout.write('.') + self.update(msg.value.decode('utf-8')) + c += 1 + print("... 
got {} state update messages, done".format(c)) + diff --git a/python/tests/harvest_state.py b/python/tests/harvest_state.py new file mode 100644 index 00000000..85cd2c99 --- /dev/null +++ b/python/tests/harvest_state.py @@ -0,0 +1,40 @@ + +import json +import pytest +import datetime +from fatcat_tools.harvest import * + + +def test_harvest_state(): + + today = datetime.datetime.utcnow().date() + + hs = HarvestState(catchup_days=5) + assert max(hs.to_process) < today + assert len(hs.to_process) is 5 + + for d in list(hs.to_process): + hs.complete(d) + + assert hs.next() is None + + hs = HarvestState( + start_date=datetime.date(2000,1,1), + end_date=datetime.date(2000,1,3), + ) + assert len(hs.to_process) is 3 + hs = HarvestState( + start_date=datetime.date(2000,1,29), + end_date=datetime.date(2000,2,2), + ) + assert len(hs.to_process) is 5 + + hs = HarvestState(catchup_days=0) + assert hs.next() is None + hs.enqueue_period( + start_date=datetime.date(2000,1,1), + end_date=datetime.date(2000,1,3), + ) + assert len(hs.to_process) is 3 + hs.update('{"completed-date": "2000-01-02"}') + assert len(hs.to_process) is 2 -- cgit v1.2.3