diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/README_harvest.md | 21 | ||||
-rwxr-xr-x | python/fatcat_harvest.py | 66 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/__init__.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/crossrefish.py | 39 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/datacite.py | 29 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 209 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/ingest_common.py | 127 | ||||
-rwxr-xr-x | python/fatcat_worker.py | 36 |
8 files changed, 297 insertions, 232 deletions
diff --git a/python/README_harvest.md b/python/README_harvest.md new file mode 100644 index 00000000..e308b90c --- /dev/null +++ b/python/README_harvest.md @@ -0,0 +1,21 @@ + +## State Refactoring + +Harvesters should/will work on fixed window sizes. + +Serialize state as JSON, publish to a state topic. On load, iterate through the +full state topic to construct recent history, and prepare a set of windows that +need harvesting, then iterate over these. + +If running as continuous process, will retain state and don't need to +re-iterate; if cron/one-off, do need to re-iterate. + +To start, do even OAI-PMH as dates. + +## "Bootstrapping" with bulk metadata + +1. start continuous update harvesting at time A +2. do a bulk dump starting at time B1 (later than A, with a margin), completing at B2 +3. with database starting from scratch at C (after B2), load full bulk + snapshot, then run all updates since A + diff --git a/python/fatcat_harvest.py b/python/fatcat_harvest.py new file mode 100755 index 00000000..dd98d22a --- /dev/null +++ b/python/fatcat_harvest.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 + +import sys +import argparse +import datetime +from fatcat_tools.harvest import HarvestCrossrefWorker, HarvestDataciteWorker + +def run_crossref(args): + worker = HarvestCrossrefWorker( + args.kafka_hosts, + produce_topic="fatcat-{}.crossref".format(args.env), + state_topic="fatcat-{}.crossref-state".format(args.env), + contact_email=args.contact_email, + start_date=args.start_date, + end_date=args.end_date) + worker.run_once() + +def run_datacite(args): + worker = HarvestDataciteWorker( + args.kafka_hosts, + produce_topic="fatcat-{}.datacite".format(args.env), + state_topic="fatcat-{}.datacite-state".format(args.env), + contact_email=args.contact_email, + start_date=args.start_date, + end_date=args.end_date) + worker.run_once() + +def mkdate(raw): + return datetime.datetime.strptime(raw, "%Y-%m-%d").date() + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--debug', + action='store_true', + help="enable debug logging") + parser.add_argument('--kafka-hosts', + default="localhost:9092", + help="list of Kafka brokers (host/port) to use") + parser.add_argument('--env', + default="qa", + help="Kafka topic namespace to use (eg, prod, qa)") + parser.add_argument('--start-date', + default=None, type=mkdate, + help="begining of harvest period") + parser.add_argument('--end-date', + default=None, type=mkdate, + help="end of harvest period") + parser.add_argument('--contact-email', + default="undefined", # better? + help="contact email to use in API header") + subparsers = parser.add_subparsers() + + sub_crossref = subparsers.add_parser('crossref') + sub_crossref.set_defaults(func=run_crossref) + + sub_datacite = subparsers.add_parser('datacite') + sub_datacite.set_defaults(func=run_datacite) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + args.func(args) + +if __name__ == '__main__': + main() diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py index 85034f04..e1bde753 100644 --- a/python/fatcat_tools/harvest/__init__.py +++ b/python/fatcat_tools/harvest/__init__.py @@ -1,2 +1,2 @@ -from .crossrefish import HarvestCrossrefWorker +from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker diff --git a/python/fatcat_tools/harvest/crossrefish.py b/python/fatcat_tools/harvest/crossrefish.py deleted file mode 100644 index a88cedbd..00000000 --- a/python/fatcat_tools/harvest/crossrefish.py +++ /dev/null @@ -1,39 +0,0 @@ - -""" -Notes on crossref API: - -- from-index-date is the updated time -- is-update can be false, to catch only new or only old works - -https://api.crossref.org/works?filter=from-index-date:2018-11-14,is-update:false&rows=2 - -I think the design is going to have to be a cronjob or long-running job -(with long sleeps) which publishes "success through" to a separate state -queue, as simple YYYY-MM-DD strings. - -Within a day, will need to use a resumption token. Maybe should use a -crossref library... meh. - -will want to have some mechanism in kafka consumer (pushing to fatcat) to group -in batches as well. maybe even pass through as batches? or just use timeouts on -iteration. -""" - -from fatcat_tools.harvest.ingest_common import DoiApiHarvest - -class HarvestCrossrefWorker(DoiApiHarvest): - - def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email, - api_host_url="https://api.crossref.org/works", - is_update_filter=None, - start_date=None, end_date=None): - super().__init__(kafka_hosts=kafka_hosts, - produce_topic=produce_topic, - state_topic=state_topic, - api_host_url=api_host_url, - contact_email=contact_email, - start_date=start_date, - end_date=end_date) - - self.is_update_filter = is_update_filter - diff --git a/python/fatcat_tools/harvest/datacite.py b/python/fatcat_tools/harvest/datacite.py deleted file mode 100644 index 12860810..00000000 --- a/python/fatcat_tools/harvest/datacite.py +++ /dev/null @@ -1,29 +0,0 @@ - -""" -datacite has a REST API as well as OAI-PMH endpoint. - -have about 8 million - -bulk export notes: https://github.com/datacite/datacite/issues/188 - -fundamentally, very similar to crossref. don't have a scrape... maybe -could/should use this script for that, and dump to JSON? -""" - -from fatcat_tools.harvest.ingest_common import DoiApiHarvest - -class HarvestDataciteWorker(DoiApiHarvest): - - def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email, - api_host_url="https://api.datacite.org/works", - start_date=None, end_date=None): - super().__init__(kafka_hosts=kafka_hosts, - produce_topic=produce_topic, - state_topic=state_topic, - api_host_url=api_host_url, - contact_email=contact_email, - start_date=start_date, - end_date=end_date) - - self.update_filter_name = "update" - diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py new file mode 100644 index 00000000..1a6807d2 --- /dev/null +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -0,0 +1,209 @@ + +import re +import sys +import csv +import json +import requests +import itertools +import datetime +from pykafka import KafkaClient + +from fatcat_tools.workers.worker_common import most_recent_message + +DATE_FMT = "%Y-%m-%d" + + +class HarvestCrossrefWorker: + """ + Notes on crossref API: + + - from-index-date is the updated time + - is-update can be false, to catch only new or only old works + + https://api.crossref.org/works?filter=from-index-date:2018-11-14,is-update:false&rows=2 + + I think the design is going to have to be a cronjob or long-running job + (with long sleeps) which publishes "success through" to a separate state + queue, as simple YYYY-MM-DD strings. + + Within a day, will need to use a resumption token. Maybe should use a + crossref library... meh. + + will want to have some mechanism in kafka consumer (pushing to fatcat) to group + in batches as well. maybe even pass through as batches? or just use timeouts on + iteration. + + logic of this worker: + - on start, fetch latest date from state feed + - in a function (unit-testable), decide which dates to ingest + - for each date needing update: + - start a loop for just that date, using resumption token for this query + - when done, publish to state feed, with immediate sync + + TODO: what sort of parallelism? I guess multi-processing on dates, but need + to be careful how state is serialized back into kafka. + """ + + + def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email, + api_host_url="https://api.crossref.org/works", start_date=None, + end_date=None, is_update_filter=None): + + self.api_host_url = api_host_url + self.produce_topic = produce_topic + self.state_topic = state_topic + self.contact_email = contact_email + self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") + self.is_update_filter = is_update_filter + + # these are both optional, and should be datetime.date + self.start_date = start_date + self.end_date = end_date + + self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks + self.api_batch_size = 50 + # for crossref, it's "from-index-date" + self.name = "Crossref" + + def get_latest_date(self): + + state_topic = self.kafka.topics[self.state_topic] + latest = most_recent_message(state_topic) + if latest: + latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date() + print("Latest date found: {}".format(latest)) + return latest + + def params(self, date_str): + filter_param = 'from-index-date:{},until-index-date:{}'.format( + date_str, date_str) + if self.is_update_filter is not None: + filter_param += ',is_update:{}'.format(bool(self.is_update_filter)) + params = { + 'filter': filter_param, + 'rows': self.api_batch_size, + 'cursor': '*', + } + + def update_params(self, params, resp): + params['cursor'] = resp['message']['next-cursor'] + return params + + def fetch_date(self, date): + + state_topic = self.kafka.topics[self.state_topic] + produce_topic = self.kafka.topics[self.produce_topic] + + date_str = date.strftime(DATE_FMT) + params = self.params(date_str) + headers = { + 'User-Agent': 'fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests'.format(self.contact_email), + } + count = 0 + with produce_topic.get_producer() as producer: + while True: + http_resp = requests.get(self.api_host_url, params, headers=headers) + if http_resp.status_code is 503: + # crud backoff + print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code)) + time.sleep(30.0) + continue + assert http_resp.status_code is 200 + resp = http_resp.json() + items = self.extract_items(resp) + count += len(items) + print("... got {} ({} of {}) in {}".format(len(items), count, + self.extract_total(resp), http_resp.elapsed)) + #print(json.dumps(resp)) + for work in items: + producer.produce(json.dumps(work).encode('utf-8')) + if len(items) < self.api_batch_size: + break + params = self.update_params(params, resp) + + # record our completion state + with state_topic.get_sync_producer() as producer: + producer.produce(date.strftime(DATE_FMT).encode('utf-8')) + + def extract_items(self, resp): + return resp['message']['items'] + + def extract_total(self, resp): + return resp['message']['total-results'] + + def run_once(self): + today_utc = datetime.datetime.utcnow().date() + if self.start_date is None: + self.start_date = self.get_latest_date() + if self.start_date: + # if we are continuing, start day after last success + self.start_date = self.start_date + datetime.timedelta(days=1) + if self.start_date is None: + # bootstrap to yesterday (don't want to start on today until it's over) + self.start_date = datetime.datetime.utcnow().date() + if self.end_date is None: + # bootstrap to yesterday (don't want to start on today until it's over) + self.end_date = today_utc - datetime.timedelta(days=1) + print("Harvesting from {} through {}".format(self.start_date, self.end_date)) + current = self.start_date + while current <= self.end_date: + print("Fetching DOIs updated on {} (UTC)".format(current)) + self.fetch_date(current) + current += datetime.timedelta(days=1) + print("{} DOI ingest caught up through {}".format(self.name, self.end_date)) + return self.end_date + + def run_loop(self): + while True: + last = self.run_once() + self.start_date = last + self.end_date = None + print("Sleeping {} seconds...".format(self.loop_sleep)) + time.sleep(self.loop_sleep()) + + + +class HarvestDataciteWorker(HarvestCrossrefWorker): + """ + datacite has a REST API as well as OAI-PMH endpoint. + + have about 8 million + + bulk export notes: https://github.com/datacite/datacite/issues/188 + + fundamentally, very similar to crossref. don't have a scrape... maybe + could/should use this script for that, and dump to JSON? + """ + + def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email, + api_host_url="https://api.datacite.org/works", + start_date=None, end_date=None): + super().__init__(kafka_hosts=kafka_hosts, + produce_topic=produce_topic, + state_topic=state_topic, + api_host_url=api_host_url, + contact_email=contact_email, + start_date=start_date, + end_date=end_date) + + # for datecite, it's "from-update-date" + self.name = "Datacite" + + def params(self, date_str): + return { + 'from-update-date': date_str, + 'until-update-date': date_str, + 'page[size]': self.api_batch_size, + 'page[number]': 1, + } + + def extract_items(self, resp): + return resp['data'] + + def extract_total(self, resp): + return resp['meta']['total'] + + def update_params(self, params, resp): + params['page[number]'] = resp['meta']['page'] + 1 + return params + diff --git a/python/fatcat_tools/harvest/ingest_common.py b/python/fatcat_tools/harvest/ingest_common.py deleted file mode 100644 index 67ff3dc3..00000000 --- a/python/fatcat_tools/harvest/ingest_common.py +++ /dev/null @@ -1,127 +0,0 @@ - -""" -logic: -- on start, fetch latest date from state feed -- in a function (unit-testable), decide which dates to ingest -- for each date needing update: - - start a loop for just that date, using resumption token for this query - - when done, publish to state feed, with immediate sync -""" - -import re -import sys -import csv -import json -import requests -import itertools -import datetime -from pykafka import KafkaClient - -from fatcat_tools.workers.worker_common import most_recent_message - -DATE_FMT = "%Y-%m-%d" - -class DoiApiHarvest: - """ - This class supports core features for both the Crossref and Datacite REST - APIs for fetching updated metadata (the Datacite API seems to be moduled on - the Crossref API). - - Implementations must provide the push results function. - """ - - def __init__(self, kafka_hosts, produce_topic, state_topic, api_host_url, - contact_email, start_date=None, end_date=None): - self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks - self.api_batch_size = 50 - self.api_host_url = api_host_url - self.produce_topic = produce_topic - self.state_topic = state_topic - self.contact_email = contact_email - self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") - self.is_update_filter = None - self.update_filter_name = "index" - - # these are both optional, and should be datetime.date - self.start_date = start_date - self.end_date = end_date - - def get_latest_date(self): - - state_topic = self.kafka.topics[self.state_topic] - latest = most_recent_message(state_topic) - if latest: - latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date() - print("Latest date found: {}".format(latest)) - return latest - - def fetch_date(self, date): - - state_topic = self.kafka.topics[self.state_topic] - produce_topic = self.kafka.topics[self.produce_topic] - - date_str = date.strftime(DATE_FMT) - filter_param = 'from-{index}-date:{},until-{index}-date:{}'.format( - date_str, date_str, index=self.update_filter_name) - if self.is_update_filter is not None: - filter_param += ',is_update:{}'.format(bool(is_update)) - params = { - 'filter': filter_param, - 'rows': self.api_batch_size, - 'cursor': '*', - } - headers = { - 'User-Agent': 'fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests'.format(self.contact_email), - } - count = 0 - with produce_topic.get_producer() as producer: - while True: - http_resp = requests.get(self.api_host_url, params, headers=headers) - assert http_resp.status_code is 200 - resp = http_resp.json() - items = resp['message']['items'] - count += len(items) - print("... got {} ({} of {}) in {}".format(len(items), count, - resp['message']['total-results']), http_resp.elapsed) - #print(json.dumps(resp)) - for work in items: - producer.produce(json.dumps(work).encode('utf-8')) - if len(items) < params['rows']: - break - params['cursor'] = resp['message']['next-cursor'] - - # record our completion state - with state_topic.get_sync_producer() as producer: - producer.produce(date.strftime(DATE_FMT).encode('utf-8')) - - - def run_once(self): - today_utc = datetime.datetime.utcnow().date() - if self.start_date is None: - self.start_date = self.get_latest_date() - if self.start_date: - # if we are continuing, start day after last success - self.start_date = self.start_date + datetime.timedelta(days=1) - if self.start_date is None: - # bootstrap to yesterday (don't want to start on today until it's over) - self.start_date = datetime.datetime.utcnow().date() - if self.end_date is None: - # bootstrap to yesterday (don't want to start on today until it's over) - self.end_date = today_utc - datetime.timedelta(days=1) - print("Harvesting from {} through {}".format(self.start_date, self.end_date)) - current = self.start_date - while current <= self.end_date: - print("Fetching DOIs updated on {} (UTC)".format(current)) - self.fetch_date(current) - current += datetime.timedelta(days=1) - print("Crossref DOI ingest caught up through {}".format(self.end_date)) - return self.end_date - - def run_loop(self): - while True: - last = self.run_once() - self.start_date = last - self.end_date = None - print("Sleeping {} seconds...".format(self.loop_sleep)) - time.sleep(self.loop_sleep()) - diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index f68b0606..4c52d2c1 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -5,7 +5,6 @@ import argparse import datetime from fatcat_tools.workers.changelog import FatcatChangelogWorker, FatcatEntityUpdatesWorker from fatcat_tools.workers.elastic import FatcatElasticReleaseWorker -from fatcat_tools.harvest import HarvestCrossrefWorker def run_changelog(args): topic = "fatcat-{}.changelog".format(args.env) @@ -27,26 +26,6 @@ def run_elastic_release(args): elastic_index=args.elastic_index) worker.run() -def run_harvest_crossref(args): - worker = HarvestCrossrefWorker( - args.kafka_hosts, - produce_topic="fatcat-{}.crossref".format(args.env), - state_topic="fatcat-{}.crossref-state".format(args.env), - contact_email=args.contact_email, - start_date=args.start_date, - end_date=args.end_date) - worker.run_once() - -def run_harvest_datacite(args): - worker = HarvestDataciteWorker( - args.kafka_hosts, - produce_topic="fatcat-{}.datacite".format(args.env), - state_topic="fatcat-{}.datacite-state".format(args.env), - contact_email=args.contact_email, - start_date=args.start_date, - end_date=args.end_date) - worker.run_once() - def main(): parser = argparse.ArgumentParser() parser.add_argument('--debug', @@ -81,21 +60,6 @@ def main(): help="elasticsearch index to push into", default="fatcat") - def mkdate(raw): - return datetime.datetime.strptime(raw, "%Y-%m-%d").date() - - sub_harvest_crossref = subparsers.add_parser('harvest-crossref') - sub_harvest_crossref.set_defaults(func=run_harvest_crossref) - sub_harvest_crossref.add_argument('--contact-email', - default="undefined", # better? - help="contact email to use in API header") - sub_harvest_crossref.add_argument('--start-date', - default=None, type=mkdate, - help="begining of harvest period") - sub_harvest_crossref.add_argument('--end-date', - default=None, type=mkdate, - help="end of harvest period") - args = parser.parse_args() if not args.__dict__.get("func"): print("tell me what to do!") |