diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/fatcat_tools/harvest/__init__.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/crossrefish.py | 39 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/datacite.py | 29 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/ingest_common.py | 127 | ||||
-rwxr-xr-x | python/fatcat_worker.py | 49 |
5 files changed, 240 insertions, 6 deletions
diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py new file mode 100644 index 00000000..85034f04 --- /dev/null +++ b/python/fatcat_tools/harvest/__init__.py @@ -0,0 +1,2 @@ + +from .crossrefish import HarvestCrossrefWorker diff --git a/python/fatcat_tools/harvest/crossrefish.py b/python/fatcat_tools/harvest/crossrefish.py new file mode 100644 index 00000000..a88cedbd --- /dev/null +++ b/python/fatcat_tools/harvest/crossrefish.py @@ -0,0 +1,39 @@ + +""" +Notes on crossref API: + +- from-index-date is the updated time +- is-update can be false, to catch only new or only old works + +https://api.crossref.org/works?filter=from-index-date:2018-11-14,is-update:false&rows=2 + +I think the design is going to have to be a cronjob or long-running job +(with long sleeps) which publishes "success through" to a separate state +queue, as simple YYYY-MM-DD strings. + +Within a day, will need to use a resumption token. Maybe should use a +crossref library... meh. + +will want to have some mechanism in kafka consumer (pushing to fatcat) to group +in batches as well. maybe even pass through as batches? or just use timeouts on +iteration. +""" + +from fatcat_tools.harvest.ingest_common import DoiApiHarvest + +class HarvestCrossrefWorker(DoiApiHarvest): + + def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email, + api_host_url="https://api.crossref.org/works", + is_update_filter=None, + start_date=None, end_date=None): + super().__init__(kafka_hosts=kafka_hosts, + produce_topic=produce_topic, + state_topic=state_topic, + api_host_url=api_host_url, + contact_email=contact_email, + start_date=start_date, + end_date=end_date) + + self.is_update_filter = is_update_filter + diff --git a/python/fatcat_tools/harvest/datacite.py b/python/fatcat_tools/harvest/datacite.py new file mode 100644 index 00000000..12860810 --- /dev/null +++ b/python/fatcat_tools/harvest/datacite.py @@ -0,0 +1,29 @@ + +""" +datacite has a REST API as well as OAI-PMH endpoint. + +have about 8 million + +bulk export notes: https://github.com/datacite/datacite/issues/188 + +fundamentally, very similar to crossref. don't have a scrape... maybe +could/should use this script for that, and dump to JSON? +""" + +from fatcat_tools.harvest.ingest_common import DoiApiHarvest + +class HarvestDataciteWorker(DoiApiHarvest): + + def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email, + api_host_url="https://api.datacite.org/works", + start_date=None, end_date=None): + super().__init__(kafka_hosts=kafka_hosts, + produce_topic=produce_topic, + state_topic=state_topic, + api_host_url=api_host_url, + contact_email=contact_email, + start_date=start_date, + end_date=end_date) + + self.update_filter_name = "update" + diff --git a/python/fatcat_tools/harvest/ingest_common.py b/python/fatcat_tools/harvest/ingest_common.py new file mode 100644 index 00000000..67ff3dc3 --- /dev/null +++ b/python/fatcat_tools/harvest/ingest_common.py @@ -0,0 +1,127 @@ + +""" +logic: +- on start, fetch latest date from state feed +- in a function (unit-testable), decide which dates to ingest +- for each date needing update: + - start a loop for just that date, using resumption token for this query + - when done, publish to state feed, with immediate sync +""" + +import re +import sys +import csv +import json +import requests +import itertools +import datetime +from pykafka import KafkaClient + +from fatcat_tools.workers.worker_common import most_recent_message + +DATE_FMT = "%Y-%m-%d" + +class DoiApiHarvest: + """ + This class supports core features for both the Crossref and Datacite REST + APIs for fetching updated metadata (the Datacite API seems to be moduled on + the Crossref API). + + Implementations must provide the push results function. + """ + + def __init__(self, kafka_hosts, produce_topic, state_topic, api_host_url, + contact_email, start_date=None, end_date=None): + self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks + self.api_batch_size = 50 + self.api_host_url = api_host_url + self.produce_topic = produce_topic + self.state_topic = state_topic + self.contact_email = contact_email + self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") + self.is_update_filter = None + self.update_filter_name = "index" + + # these are both optional, and should be datetime.date + self.start_date = start_date + self.end_date = end_date + + def get_latest_date(self): + + state_topic = self.kafka.topics[self.state_topic] + latest = most_recent_message(state_topic) + if latest: + latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date() + print("Latest date found: {}".format(latest)) + return latest + + def fetch_date(self, date): + + state_topic = self.kafka.topics[self.state_topic] + produce_topic = self.kafka.topics[self.produce_topic] + + date_str = date.strftime(DATE_FMT) + filter_param = 'from-{index}-date:{},until-{index}-date:{}'.format( + date_str, date_str, index=self.update_filter_name) + if self.is_update_filter is not None: + filter_param += ',is_update:{}'.format(bool(is_update)) + params = { + 'filter': filter_param, + 'rows': self.api_batch_size, + 'cursor': '*', + } + headers = { + 'User-Agent': 'fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests'.format(self.contact_email), + } + count = 0 + with produce_topic.get_producer() as producer: + while True: + http_resp = requests.get(self.api_host_url, params, headers=headers) + assert http_resp.status_code is 200 + resp = http_resp.json() + items = resp['message']['items'] + count += len(items) + print("... got {} ({} of {}) in {}".format(len(items), count, + resp['message']['total-results']), http_resp.elapsed) + #print(json.dumps(resp)) + for work in items: + producer.produce(json.dumps(work).encode('utf-8')) + if len(items) < params['rows']: + break + params['cursor'] = resp['message']['next-cursor'] + + # record our completion state + with state_topic.get_sync_producer() as producer: + producer.produce(date.strftime(DATE_FMT).encode('utf-8')) + + + def run_once(self): + today_utc = datetime.datetime.utcnow().date() + if self.start_date is None: + self.start_date = self.get_latest_date() + if self.start_date: + # if we are continuing, start day after last success + self.start_date = self.start_date + datetime.timedelta(days=1) + if self.start_date is None: + # bootstrap to yesterday (don't want to start on today until it's over) + self.start_date = datetime.datetime.utcnow().date() + if self.end_date is None: + # bootstrap to yesterday (don't want to start on today until it's over) + self.end_date = today_utc - datetime.timedelta(days=1) + print("Harvesting from {} through {}".format(self.start_date, self.end_date)) + current = self.start_date + while current <= self.end_date: + print("Fetching DOIs updated on {} (UTC)".format(current)) + self.fetch_date(current) + current += datetime.timedelta(days=1) + print("Crossref DOI ingest caught up through {}".format(self.end_date)) + return self.end_date + + def run_loop(self): + while True: + last = self.run_once() + self.start_date = last + self.end_date = None + print("Sleeping {} seconds...".format(self.loop_sleep)) + time.sleep(self.loop_sleep()) + diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 611c8e1b..f68b0606 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -2,29 +2,51 @@ import sys import argparse +import datetime from fatcat_tools.workers.changelog import FatcatChangelogWorker, FatcatEntityUpdatesWorker from fatcat_tools.workers.elastic import FatcatElasticReleaseWorker +from fatcat_tools.harvest import HarvestCrossrefWorker -def run_changelog_worker(args): +def run_changelog(args): topic = "fatcat-{}.changelog".format(args.env) worker = FatcatChangelogWorker(args.api_host_url, args.kafka_hosts, topic, args.poll_interval) worker.run() -def run_entity_updates_worker(args): +def run_entity_updates(args): changelog_topic = "fatcat-{}.changelog".format(args.env) release_topic = "fatcat-{}.release-updates".format(args.env) worker = FatcatEntityUpdatesWorker(args.api_host_url, args.kafka_hosts, changelog_topic, release_topic) worker.run() -def run_elastic_release_worker(args): +def run_elastic_release(args): consume_topic = "fatcat-{}.release-updates".format(args.env) worker = FatcatElasticReleaseWorker(args.kafka_hosts, consume_topic, elastic_backend=args.elastic_backend, elastic_index=args.elastic_index) worker.run() +def run_harvest_crossref(args): + worker = HarvestCrossrefWorker( + args.kafka_hosts, + produce_topic="fatcat-{}.crossref".format(args.env), + state_topic="fatcat-{}.crossref-state".format(args.env), + contact_email=args.contact_email, + start_date=args.start_date, + end_date=args.end_date) + worker.run_once() + +def run_harvest_datacite(args): + worker = HarvestDataciteWorker( + args.kafka_hosts, + produce_topic="fatcat-{}.datacite".format(args.env), + state_topic="fatcat-{}.datacite-state".format(args.env), + contact_email=args.contact_email, + start_date=args.start_date, + end_date=args.end_date) + worker.run_once() + def main(): parser = argparse.ArgumentParser() parser.add_argument('--debug', @@ -42,16 +64,16 @@ def main(): subparsers = parser.add_subparsers() sub_changelog = subparsers.add_parser('changelog') - sub_changelog.set_defaults(func=run_changelog_worker) + sub_changelog.set_defaults(func=run_changelog) sub_changelog.add_argument('--poll-interval', help="how long to wait between polling (seconds)", default=10.0, type=float) sub_entity_updates = subparsers.add_parser('entity-updates') - sub_entity_updates.set_defaults(func=run_entity_updates_worker) + sub_entity_updates.set_defaults(func=run_entity_updates) sub_elastic_release = subparsers.add_parser('elastic-release') - sub_elastic_release.set_defaults(func=run_elastic_release_worker) + sub_elastic_release.set_defaults(func=run_elastic_release) sub_elastic_release.add_argument('--elastic-backend', help="elasticsearch backend to connect to", default="http://localhost:9200") @@ -59,6 +81,21 @@ def main(): help="elasticsearch index to push into", default="fatcat") + def mkdate(raw): + return datetime.datetime.strptime(raw, "%Y-%m-%d").date() + + sub_harvest_crossref = subparsers.add_parser('harvest-crossref') + sub_harvest_crossref.set_defaults(func=run_harvest_crossref) + sub_harvest_crossref.add_argument('--contact-email', + default="undefined", # better? + help="contact email to use in API header") + sub_harvest_crossref.add_argument('--start-date', + default=None, type=mkdate, + help="begining of harvest period") + sub_harvest_crossref.add_argument('--end-date', + default=None, type=mkdate, + help="end of harvest period") + args = parser.parse_args() if not args.__dict__.get("func"): print("tell me what to do!") |