summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2018-11-19 20:57:19 -0800
committerBryan Newbold <bnewbold@robocracy.org>2018-11-19 20:57:19 -0800
commit65bdebea35f2ab3c9c8b0f8a8b0a9a577a36bee2 (patch)
tree145321b85a9af61f0a93112831724717faced5e0
parent337416e965457c07dab44864f6dabb516fdf6a03 (diff)
downloadfatcat-65bdebea35f2ab3c9c8b0f8a8b0a9a577a36bee2.tar.gz
fatcat-65bdebea35f2ab3c9c8b0f8a8b0a9a577a36bee2.zip
better DOI registrar harvesters
-rwxr-xr-xpython/fatcat_harvest.py7
-rw-r--r--python/fatcat_tools/harvest/__init__.py1
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py68
-rw-r--r--python/fatcat_tools/harvest/harvest_common.py124
-rw-r--r--python/tests/harvest_state.py40
5 files changed, 190 insertions, 50 deletions
diff --git a/python/fatcat_harvest.py b/python/fatcat_harvest.py
index dd98d22a..f1bb3416 100755
--- a/python/fatcat_harvest.py
+++ b/python/fatcat_harvest.py
@@ -13,7 +13,7 @@ def run_crossref(args):
contact_email=args.contact_email,
start_date=args.start_date,
end_date=args.end_date)
- worker.run_once()
+ worker.run()
def run_datacite(args):
worker = HarvestDataciteWorker(
@@ -23,7 +23,7 @@ def run_datacite(args):
contact_email=args.contact_email,
start_date=args.start_date,
end_date=args.end_date)
- worker.run_once()
+ worker.run()
def mkdate(raw):
return datetime.datetime.strptime(raw, "%Y-%m-%d").date()
@@ -48,6 +48,9 @@ def main():
parser.add_argument('--contact-email',
default="undefined", # better?
help="contact email to use in API header")
+ parser.add_argument('--continuous',
+ default=False,
+ help="continue harvesting indefinitely in a loop?")
subparsers = parser.add_subparsers()
sub_crossref = subparsers.add_parser('crossref')
diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py
index e1bde753..4de2cbde 100644
--- a/python/fatcat_tools/harvest/__init__.py
+++ b/python/fatcat_tools/harvest/__init__.py
@@ -1,2 +1,3 @@
+from .harvest_common import HarvestState
from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index ed80cfc9..d5e4b7ec 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -9,7 +9,8 @@ import itertools
import datetime
from pykafka import KafkaClient
-from fatcat_tools.workers.worker_common import most_recent_message
+from fatcat_tools.workers import most_recent_message
+from .harvest_common import HarvestState
# Skip pylint due to:
# AttributeError: 'NoneType' object has no attribute 'scope'
@@ -62,24 +63,14 @@ class HarvestCrossrefWorker:
self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
self.is_update_filter = is_update_filter
- # these are both optional, and should be datetime.date
- self.start_date = start_date
- self.end_date = end_date
+ self.state = HarvestState(start_date, end_date)
+ self.state.initialize_from_kafka(self.kafka.topics[self.state_topic])
self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
self.api_batch_size = 50
# for crossref, it's "from-index-date"
self.name = "Crossref"
- def get_latest_date(self):
-
- state_topic = self.kafka.topics[self.state_topic]
- latest = most_recent_message(state_topic)
- if latest:
- latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date()
- print("Latest date found: {}".format(latest))
- return latest
-
def params(self, date_str):
filter_param = 'from-index-date:{},until-index-date:{}'.format(
date_str, date_str)
@@ -97,7 +88,6 @@ class HarvestCrossrefWorker:
def fetch_date(self, date):
- state_topic = self.kafka.topics[self.state_topic]
produce_topic = self.kafka.topics[self.produce_topic]
date_str = date.strftime(DATE_FMT)
@@ -118,7 +108,7 @@ class HarvestCrossrefWorker:
resp = http_resp.json()
items = self.extract_items(resp)
count += len(items)
- print("... got {} ({} of {}) in {}".format(len(items), count,
+ print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count,
self.extract_total(resp), http_resp.elapsed))
#print(json.dumps(resp))
for work in items:
@@ -127,46 +117,28 @@ class HarvestCrossrefWorker:
break
params = self.update_params(params, resp)
- # record our completion state
- with state_topic.get_sync_producer() as producer:
- producer.produce(date.strftime(DATE_FMT).encode('utf-8'))
-
def extract_items(self, resp):
return resp['message']['items']
def extract_total(self, resp):
return resp['message']['total-results']
- def run_once(self):
- today_utc = datetime.datetime.utcnow().date()
- if self.start_date is None:
- self.start_date = self.get_latest_date()
- if self.start_date:
- # if we are continuing, start day after last success
- self.start_date = self.start_date + datetime.timedelta(days=1)
- if self.start_date is None:
- # bootstrap to yesterday (don't want to start on today until it's over)
- self.start_date = datetime.datetime.utcnow().date()
- if self.end_date is None:
- # bootstrap to yesterday (don't want to start on today until it's over)
- self.end_date = today_utc - datetime.timedelta(days=1)
- print("Harvesting from {} through {}".format(self.start_date, self.end_date))
- current = self.start_date
- while current <= self.end_date:
- print("Fetching DOIs updated on {} (UTC)".format(current))
- self.fetch_date(current)
- current += datetime.timedelta(days=1)
- print("{} DOI ingest caught up through {}".format(self.name, self.end_date))
- return self.end_date
-
- def run_loop(self):
- while True:
- last = self.run_once()
- self.start_date = last
- self.end_date = None
- print("Sleeping {} seconds...".format(self.loop_sleep))
- time.sleep(self.loop_sleep())
+ def run(self, continuous=False):
+ while True:
+ current = self.state.next(continuous)
+ if current:
+ print("Fetching DOIs updated on {} (UTC)".format(current))
+ self.fetch_date(current)
+ self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic])
+ continue
+
+ if continuous:
+ print("Sleeping {} seconds...".format(self.loop_sleep))
+ time.sleep(self.loop_sleep())
+ else:
+ break
+ print("{} DOI ingest caught up".format(self.name))
class HarvestDataciteWorker(HarvestCrossrefWorker):
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
new file mode 100644
index 00000000..f0ef51aa
--- /dev/null
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -0,0 +1,124 @@
+
+import sys
+import json
+import time
+import datetime
+
+
+DATE_FMT = "%Y-%m-%d"
+
+class HarvestState:
+ """
+ First version of this works with full days (dates)
+
+ General concept is to have harvesters serialize state when they make
+ progress and push to kafka. On startup, harvesters are given a task (extend
+ of work), and consume the full history to see what work remains to be done.
+
+ The simplest flow is:
+ - harvester is told to collect last N days of updates
+ - creates an to_process set
+ - for each update, pops date from in_progress (if exits)
+
+ NOTE: this thing is sorta over-engineered... but might grow in the future
+ NOTE: should this class manage the state topic as well? Hrm.
+ """
+
+ def __init__(self, start_date=None, end_date=None, catchup_days=7):
+ self.to_process = set()
+ self.completed = set()
+
+ if catchup_days or start_date or end_date:
+ self.enqueue_period(start_date, end_date, catchup_days)
+
+ def enqueue_period(self, start_date=None, end_date=None, catchup_days=7):
+ """
+ This function adds a time period to the "TODO" list, unless the dates
+ have already been processed.
+
+ By default the period is "<catchup_days> ago until yesterday"
+ """
+
+ today_utc = datetime.datetime.utcnow().date()
+ if start_date is None:
+ # bootstrap to N days ago
+ start_date = today_utc - datetime.timedelta(days=catchup_days)
+ if end_date is None:
+ # bootstrap to yesterday (don't want to start on today until it's over)
+ end_date = today_utc - datetime.timedelta(days=1)
+
+ current = start_date
+ while current <= end_date:
+ if not current in self.completed:
+ self.to_process.add(current)
+ current += datetime.timedelta(days=1)
+
+ def next(self, continuous=False):
+ """
+ Gets next timespan (date) to be processed, or returns None if completed.
+
+ If 'continuous' arg is True, will try to enqueue recent possibly valid
+ timespans; the idea is to call next() repeatedly, and it will return a
+ new timespan when it becomes "available".
+ """
+ if continuous:
+ # enqueue yesterday
+ self.enqueue_period(start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1))
+ if not self.to_process:
+ return None
+ return sorted(list(self.to_process))[0]
+
+ def update(self, state_json):
+ """
+ Merges a state JSON object into the current state.
+
+ This is expected to be used to "catch-up" on previously serialized
+ state stored on disk or in Kafka.
+ """
+ state = json.loads(state_json)
+ if 'completed-date' in state:
+ date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date()
+ self.complete(date)
+
+ def complete(self, date, kafka_topic=None):
+ """
+ Records that a date has been processed successfully.
+
+ Updates internal state and returns a JSON representation to be
+ serialized. Will publish to a kafka topic if passed as an argument.
+
+ kafka_topic should have type pykafka.Topic (not str)
+ """
+ try:
+ self.to_process.remove(date)
+ except KeyError:
+ pass
+ self.completed.add(date)
+ state_json = json.dumps({
+ 'in-progress-dates': [str(d) for d in self.to_process],
+ 'completed-date': str(date),
+ }).encode('utf-8')
+ if kafka_topic:
+ with kafka_topic.get_sync_producer() as producer:
+ producer.produce(state_json)
+ return state_json
+
+ def initialize_from_kafka(self, kafka_topic):
+ """
+ kafka_topic should have type pykafka.Topic (not str)
+ """
+ if not kafka_topic:
+ return
+
+ print("Fetching state from kafka topic: {}".format(kafka_topic.name))
+ consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=1000)
+ c = 0
+ while True:
+ msg = consumer.consume(block=True)
+ if not msg:
+ break
+ #sys.stdout.write('.')
+ self.update(msg.value.decode('utf-8'))
+ c += 1
+ print("... got {} state update messages, done".format(c))
+
diff --git a/python/tests/harvest_state.py b/python/tests/harvest_state.py
new file mode 100644
index 00000000..85cd2c99
--- /dev/null
+++ b/python/tests/harvest_state.py
@@ -0,0 +1,40 @@
+
+import json
+import pytest
+import datetime
+from fatcat_tools.harvest import *
+
+
+def test_harvest_state():
+
+ today = datetime.datetime.utcnow().date()
+
+ hs = HarvestState(catchup_days=5)
+ assert max(hs.to_process) < today
+ assert len(hs.to_process) is 5
+
+ for d in list(hs.to_process):
+ hs.complete(d)
+
+ assert hs.next() is None
+
+ hs = HarvestState(
+ start_date=datetime.date(2000,1,1),
+ end_date=datetime.date(2000,1,3),
+ )
+ assert len(hs.to_process) is 3
+ hs = HarvestState(
+ start_date=datetime.date(2000,1,29),
+ end_date=datetime.date(2000,2,2),
+ )
+ assert len(hs.to_process) is 5
+
+ hs = HarvestState(catchup_days=0)
+ assert hs.next() is None
+ hs.enqueue_period(
+ start_date=datetime.date(2000,1,1),
+ end_date=datetime.date(2000,1,3),
+ )
+ assert len(hs.to_process) is 3
+ hs.update('{"completed-date": "2000-01-02"}')
+ assert len(hs.to_process) is 2