aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2018-11-15 12:21:45 -0800
committerBryan Newbold <bnewbold@robocracy.org>2018-11-15 12:21:45 -0800
commitf21d28315aa632cdb9f84ea8787762d1e27b4310 (patch)
tree58c6ad0d34260e1d656247ddffa8ee047a8eb520
parent5c47be5b0468c13db868548dccfdf1af50813b0c (diff)
downloadfatcat-f21d28315aa632cdb9f84ea8787762d1e27b4310.tar.gz
fatcat-f21d28315aa632cdb9f84ea8787762d1e27b4310.zip
refactoring harvesters
-rw-r--r--python/README_harvest.md21
-rwxr-xr-xpython/fatcat_harvest.py66
-rw-r--r--python/fatcat_tools/harvest/__init__.py2
-rw-r--r--python/fatcat_tools/harvest/crossrefish.py39
-rw-r--r--python/fatcat_tools/harvest/datacite.py29
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py209
-rw-r--r--python/fatcat_tools/harvest/ingest_common.py127
-rwxr-xr-xpython/fatcat_worker.py36
8 files changed, 297 insertions, 232 deletions
diff --git a/python/README_harvest.md b/python/README_harvest.md
new file mode 100644
index 00000000..e308b90c
--- /dev/null
+++ b/python/README_harvest.md
@@ -0,0 +1,21 @@
+
+## State Refactoring
+
+Harvesters should/will work on fixed window sizes.
+
+Serialize state as JSON, publish to a state topic. On load, iterate through the
+full state topic to construct recent history, and prepare a set of windows that
+need harvesting, then iterate over these.
+
+If running as continuous process, will retain state and don't need to
+re-iterate; if cron/one-off, do need to re-iterate.
+
+To start, do even OAI-PMH as dates.
+
+## "Bootstrapping" with bulk metadata
+
+1. start continuous update harvesting at time A
+2. do a bulk dump starting at time B1 (later than A, with a margin), completing at B2
+3. with database starting from scratch at C (after B2), load full bulk
+ snapshot, then run all updates since A
+
diff --git a/python/fatcat_harvest.py b/python/fatcat_harvest.py
new file mode 100755
index 00000000..dd98d22a
--- /dev/null
+++ b/python/fatcat_harvest.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+import sys
+import argparse
+import datetime
+from fatcat_tools.harvest import HarvestCrossrefWorker, HarvestDataciteWorker
+
+def run_crossref(args):
+ worker = HarvestCrossrefWorker(
+ args.kafka_hosts,
+ produce_topic="fatcat-{}.crossref".format(args.env),
+ state_topic="fatcat-{}.crossref-state".format(args.env),
+ contact_email=args.contact_email,
+ start_date=args.start_date,
+ end_date=args.end_date)
+ worker.run_once()
+
+def run_datacite(args):
+ worker = HarvestDataciteWorker(
+ args.kafka_hosts,
+ produce_topic="fatcat-{}.datacite".format(args.env),
+ state_topic="fatcat-{}.datacite-state".format(args.env),
+ contact_email=args.contact_email,
+ start_date=args.start_date,
+ end_date=args.end_date)
+ worker.run_once()
+
+def mkdate(raw):
+ return datetime.datetime.strptime(raw, "%Y-%m-%d").date()
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--debug',
+ action='store_true',
+ help="enable debug logging")
+ parser.add_argument('--kafka-hosts',
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
+ parser.add_argument('--env',
+ default="qa",
+ help="Kafka topic namespace to use (eg, prod, qa)")
+ parser.add_argument('--start-date',
+ default=None, type=mkdate,
+ help="begining of harvest period")
+ parser.add_argument('--end-date',
+ default=None, type=mkdate,
+ help="end of harvest period")
+ parser.add_argument('--contact-email',
+ default="undefined", # better?
+ help="contact email to use in API header")
+ subparsers = parser.add_subparsers()
+
+ sub_crossref = subparsers.add_parser('crossref')
+ sub_crossref.set_defaults(func=run_crossref)
+
+ sub_datacite = subparsers.add_parser('datacite')
+ sub_datacite.set_defaults(func=run_datacite)
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ print("tell me what to do!")
+ sys.exit(-1)
+ args.func(args)
+
+if __name__ == '__main__':
+ main()
diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py
index 85034f04..e1bde753 100644
--- a/python/fatcat_tools/harvest/__init__.py
+++ b/python/fatcat_tools/harvest/__init__.py
@@ -1,2 +1,2 @@
-from .crossrefish import HarvestCrossrefWorker
+from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker
diff --git a/python/fatcat_tools/harvest/crossrefish.py b/python/fatcat_tools/harvest/crossrefish.py
deleted file mode 100644
index a88cedbd..00000000
--- a/python/fatcat_tools/harvest/crossrefish.py
+++ /dev/null
@@ -1,39 +0,0 @@
-
-"""
-Notes on crossref API:
-
-- from-index-date is the updated time
-- is-update can be false, to catch only new or only old works
-
-https://api.crossref.org/works?filter=from-index-date:2018-11-14,is-update:false&rows=2
-
-I think the design is going to have to be a cronjob or long-running job
-(with long sleeps) which publishes "success through" to a separate state
-queue, as simple YYYY-MM-DD strings.
-
-Within a day, will need to use a resumption token. Maybe should use a
-crossref library... meh.
-
-will want to have some mechanism in kafka consumer (pushing to fatcat) to group
-in batches as well. maybe even pass through as batches? or just use timeouts on
-iteration.
-"""
-
-from fatcat_tools.harvest.ingest_common import DoiApiHarvest
-
-class HarvestCrossrefWorker(DoiApiHarvest):
-
- def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email,
- api_host_url="https://api.crossref.org/works",
- is_update_filter=None,
- start_date=None, end_date=None):
- super().__init__(kafka_hosts=kafka_hosts,
- produce_topic=produce_topic,
- state_topic=state_topic,
- api_host_url=api_host_url,
- contact_email=contact_email,
- start_date=start_date,
- end_date=end_date)
-
- self.is_update_filter = is_update_filter
-
diff --git a/python/fatcat_tools/harvest/datacite.py b/python/fatcat_tools/harvest/datacite.py
deleted file mode 100644
index 12860810..00000000
--- a/python/fatcat_tools/harvest/datacite.py
+++ /dev/null
@@ -1,29 +0,0 @@
-
-"""
-datacite has a REST API as well as OAI-PMH endpoint.
-
-have about 8 million
-
-bulk export notes: https://github.com/datacite/datacite/issues/188
-
-fundamentally, very similar to crossref. don't have a scrape... maybe
-could/should use this script for that, and dump to JSON?
-"""
-
-from fatcat_tools.harvest.ingest_common import DoiApiHarvest
-
-class HarvestDataciteWorker(DoiApiHarvest):
-
- def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email,
- api_host_url="https://api.datacite.org/works",
- start_date=None, end_date=None):
- super().__init__(kafka_hosts=kafka_hosts,
- produce_topic=produce_topic,
- state_topic=state_topic,
- api_host_url=api_host_url,
- contact_email=contact_email,
- start_date=start_date,
- end_date=end_date)
-
- self.update_filter_name = "update"
-
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
new file mode 100644
index 00000000..1a6807d2
--- /dev/null
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -0,0 +1,209 @@
+
+import re
+import sys
+import csv
+import json
+import requests
+import itertools
+import datetime
+from pykafka import KafkaClient
+
+from fatcat_tools.workers.worker_common import most_recent_message
+
+DATE_FMT = "%Y-%m-%d"
+
+
+class HarvestCrossrefWorker:
+ """
+ Notes on crossref API:
+
+ - from-index-date is the updated time
+ - is-update can be false, to catch only new or only old works
+
+ https://api.crossref.org/works?filter=from-index-date:2018-11-14,is-update:false&rows=2
+
+ I think the design is going to have to be a cronjob or long-running job
+ (with long sleeps) which publishes "success through" to a separate state
+ queue, as simple YYYY-MM-DD strings.
+
+ Within a day, will need to use a resumption token. Maybe should use a
+ crossref library... meh.
+
+ will want to have some mechanism in kafka consumer (pushing to fatcat) to group
+ in batches as well. maybe even pass through as batches? or just use timeouts on
+ iteration.
+
+ logic of this worker:
+ - on start, fetch latest date from state feed
+ - in a function (unit-testable), decide which dates to ingest
+ - for each date needing update:
+ - start a loop for just that date, using resumption token for this query
+ - when done, publish to state feed, with immediate sync
+
+ TODO: what sort of parallelism? I guess multi-processing on dates, but need
+ to be careful how state is serialized back into kafka.
+ """
+
+
+ def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email,
+ api_host_url="https://api.crossref.org/works", start_date=None,
+ end_date=None, is_update_filter=None):
+
+ self.api_host_url = api_host_url
+ self.produce_topic = produce_topic
+ self.state_topic = state_topic
+ self.contact_email = contact_email
+ self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
+ self.is_update_filter = is_update_filter
+
+ # these are both optional, and should be datetime.date
+ self.start_date = start_date
+ self.end_date = end_date
+
+ self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
+ self.api_batch_size = 50
+ # for crossref, it's "from-index-date"
+ self.name = "Crossref"
+
+ def get_latest_date(self):
+
+ state_topic = self.kafka.topics[self.state_topic]
+ latest = most_recent_message(state_topic)
+ if latest:
+ latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date()
+ print("Latest date found: {}".format(latest))
+ return latest
+
+ def params(self, date_str):
+ filter_param = 'from-index-date:{},until-index-date:{}'.format(
+ date_str, date_str)
+ if self.is_update_filter is not None:
+ filter_param += ',is_update:{}'.format(bool(self.is_update_filter))
+ params = {
+ 'filter': filter_param,
+ 'rows': self.api_batch_size,
+ 'cursor': '*',
+ }
+
+ def update_params(self, params, resp):
+ params['cursor'] = resp['message']['next-cursor']
+ return params
+
+ def fetch_date(self, date):
+
+ state_topic = self.kafka.topics[self.state_topic]
+ produce_topic = self.kafka.topics[self.produce_topic]
+
+ date_str = date.strftime(DATE_FMT)
+ params = self.params(date_str)
+ headers = {
+ 'User-Agent': 'fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests'.format(self.contact_email),
+ }
+ count = 0
+ with produce_topic.get_producer() as producer:
+ while True:
+ http_resp = requests.get(self.api_host_url, params, headers=headers)
+ if http_resp.status_code is 503:
+ # crud backoff
+ print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code))
+ time.sleep(30.0)
+ continue
+ assert http_resp.status_code is 200
+ resp = http_resp.json()
+ items = self.extract_items(resp)
+ count += len(items)
+ print("... got {} ({} of {}) in {}".format(len(items), count,
+ self.extract_total(resp), http_resp.elapsed))
+ #print(json.dumps(resp))
+ for work in items:
+ producer.produce(json.dumps(work).encode('utf-8'))
+ if len(items) < self.api_batch_size:
+ break
+ params = self.update_params(params, resp)
+
+ # record our completion state
+ with state_topic.get_sync_producer() as producer:
+ producer.produce(date.strftime(DATE_FMT).encode('utf-8'))
+
+ def extract_items(self, resp):
+ return resp['message']['items']
+
+ def extract_total(self, resp):
+ return resp['message']['total-results']
+
+ def run_once(self):
+ today_utc = datetime.datetime.utcnow().date()
+ if self.start_date is None:
+ self.start_date = self.get_latest_date()
+ if self.start_date:
+ # if we are continuing, start day after last success
+ self.start_date = self.start_date + datetime.timedelta(days=1)
+ if self.start_date is None:
+ # bootstrap to yesterday (don't want to start on today until it's over)
+ self.start_date = datetime.datetime.utcnow().date()
+ if self.end_date is None:
+ # bootstrap to yesterday (don't want to start on today until it's over)
+ self.end_date = today_utc - datetime.timedelta(days=1)
+ print("Harvesting from {} through {}".format(self.start_date, self.end_date))
+ current = self.start_date
+ while current <= self.end_date:
+ print("Fetching DOIs updated on {} (UTC)".format(current))
+ self.fetch_date(current)
+ current += datetime.timedelta(days=1)
+ print("{} DOI ingest caught up through {}".format(self.name, self.end_date))
+ return self.end_date
+
+ def run_loop(self):
+ while True:
+ last = self.run_once()
+ self.start_date = last
+ self.end_date = None
+ print("Sleeping {} seconds...".format(self.loop_sleep))
+ time.sleep(self.loop_sleep())
+
+
+
+class HarvestDataciteWorker(HarvestCrossrefWorker):
+ """
+ datacite has a REST API as well as OAI-PMH endpoint.
+
+ have about 8 million
+
+ bulk export notes: https://github.com/datacite/datacite/issues/188
+
+ fundamentally, very similar to crossref. don't have a scrape... maybe
+ could/should use this script for that, and dump to JSON?
+ """
+
+ def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email,
+ api_host_url="https://api.datacite.org/works",
+ start_date=None, end_date=None):
+ super().__init__(kafka_hosts=kafka_hosts,
+ produce_topic=produce_topic,
+ state_topic=state_topic,
+ api_host_url=api_host_url,
+ contact_email=contact_email,
+ start_date=start_date,
+ end_date=end_date)
+
+ # for datecite, it's "from-update-date"
+ self.name = "Datacite"
+
+ def params(self, date_str):
+ return {
+ 'from-update-date': date_str,
+ 'until-update-date': date_str,
+ 'page[size]': self.api_batch_size,
+ 'page[number]': 1,
+ }
+
+ def extract_items(self, resp):
+ return resp['data']
+
+ def extract_total(self, resp):
+ return resp['meta']['total']
+
+ def update_params(self, params, resp):
+ params['page[number]'] = resp['meta']['page'] + 1
+ return params
+
diff --git a/python/fatcat_tools/harvest/ingest_common.py b/python/fatcat_tools/harvest/ingest_common.py
deleted file mode 100644
index 67ff3dc3..00000000
--- a/python/fatcat_tools/harvest/ingest_common.py
+++ /dev/null
@@ -1,127 +0,0 @@
-
-"""
-logic:
-- on start, fetch latest date from state feed
-- in a function (unit-testable), decide which dates to ingest
-- for each date needing update:
- - start a loop for just that date, using resumption token for this query
- - when done, publish to state feed, with immediate sync
-"""
-
-import re
-import sys
-import csv
-import json
-import requests
-import itertools
-import datetime
-from pykafka import KafkaClient
-
-from fatcat_tools.workers.worker_common import most_recent_message
-
-DATE_FMT = "%Y-%m-%d"
-
-class DoiApiHarvest:
- """
- This class supports core features for both the Crossref and Datacite REST
- APIs for fetching updated metadata (the Datacite API seems to be moduled on
- the Crossref API).
-
- Implementations must provide the push results function.
- """
-
- def __init__(self, kafka_hosts, produce_topic, state_topic, api_host_url,
- contact_email, start_date=None, end_date=None):
- self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
- self.api_batch_size = 50
- self.api_host_url = api_host_url
- self.produce_topic = produce_topic
- self.state_topic = state_topic
- self.contact_email = contact_email
- self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
- self.is_update_filter = None
- self.update_filter_name = "index"
-
- # these are both optional, and should be datetime.date
- self.start_date = start_date
- self.end_date = end_date
-
- def get_latest_date(self):
-
- state_topic = self.kafka.topics[self.state_topic]
- latest = most_recent_message(state_topic)
- if latest:
- latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date()
- print("Latest date found: {}".format(latest))
- return latest
-
- def fetch_date(self, date):
-
- state_topic = self.kafka.topics[self.state_topic]
- produce_topic = self.kafka.topics[self.produce_topic]
-
- date_str = date.strftime(DATE_FMT)
- filter_param = 'from-{index}-date:{},until-{index}-date:{}'.format(
- date_str, date_str, index=self.update_filter_name)
- if self.is_update_filter is not None:
- filter_param += ',is_update:{}'.format(bool(is_update))
- params = {
- 'filter': filter_param,
- 'rows': self.api_batch_size,
- 'cursor': '*',
- }
- headers = {
- 'User-Agent': 'fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests'.format(self.contact_email),
- }
- count = 0
- with produce_topic.get_producer() as producer:
- while True:
- http_resp = requests.get(self.api_host_url, params, headers=headers)
- assert http_resp.status_code is 200
- resp = http_resp.json()
- items = resp['message']['items']
- count += len(items)
- print("... got {} ({} of {}) in {}".format(len(items), count,
- resp['message']['total-results']), http_resp.elapsed)
- #print(json.dumps(resp))
- for work in items:
- producer.produce(json.dumps(work).encode('utf-8'))
- if len(items) < params['rows']:
- break
- params['cursor'] = resp['message']['next-cursor']
-
- # record our completion state
- with state_topic.get_sync_producer() as producer:
- producer.produce(date.strftime(DATE_FMT).encode('utf-8'))
-
-
- def run_once(self):
- today_utc = datetime.datetime.utcnow().date()
- if self.start_date is None:
- self.start_date = self.get_latest_date()
- if self.start_date:
- # if we are continuing, start day after last success
- self.start_date = self.start_date + datetime.timedelta(days=1)
- if self.start_date is None:
- # bootstrap to yesterday (don't want to start on today until it's over)
- self.start_date = datetime.datetime.utcnow().date()
- if self.end_date is None:
- # bootstrap to yesterday (don't want to start on today until it's over)
- self.end_date = today_utc - datetime.timedelta(days=1)
- print("Harvesting from {} through {}".format(self.start_date, self.end_date))
- current = self.start_date
- while current <= self.end_date:
- print("Fetching DOIs updated on {} (UTC)".format(current))
- self.fetch_date(current)
- current += datetime.timedelta(days=1)
- print("Crossref DOI ingest caught up through {}".format(self.end_date))
- return self.end_date
-
- def run_loop(self):
- while True:
- last = self.run_once()
- self.start_date = last
- self.end_date = None
- print("Sleeping {} seconds...".format(self.loop_sleep))
- time.sleep(self.loop_sleep())
-
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index f68b0606..4c52d2c1 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -5,7 +5,6 @@ import argparse
import datetime
from fatcat_tools.workers.changelog import FatcatChangelogWorker, FatcatEntityUpdatesWorker
from fatcat_tools.workers.elastic import FatcatElasticReleaseWorker
-from fatcat_tools.harvest import HarvestCrossrefWorker
def run_changelog(args):
topic = "fatcat-{}.changelog".format(args.env)
@@ -27,26 +26,6 @@ def run_elastic_release(args):
elastic_index=args.elastic_index)
worker.run()
-def run_harvest_crossref(args):
- worker = HarvestCrossrefWorker(
- args.kafka_hosts,
- produce_topic="fatcat-{}.crossref".format(args.env),
- state_topic="fatcat-{}.crossref-state".format(args.env),
- contact_email=args.contact_email,
- start_date=args.start_date,
- end_date=args.end_date)
- worker.run_once()
-
-def run_harvest_datacite(args):
- worker = HarvestDataciteWorker(
- args.kafka_hosts,
- produce_topic="fatcat-{}.datacite".format(args.env),
- state_topic="fatcat-{}.datacite-state".format(args.env),
- contact_email=args.contact_email,
- start_date=args.start_date,
- end_date=args.end_date)
- worker.run_once()
-
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--debug',
@@ -81,21 +60,6 @@ def main():
help="elasticsearch index to push into",
default="fatcat")
- def mkdate(raw):
- return datetime.datetime.strptime(raw, "%Y-%m-%d").date()
-
- sub_harvest_crossref = subparsers.add_parser('harvest-crossref')
- sub_harvest_crossref.set_defaults(func=run_harvest_crossref)
- sub_harvest_crossref.add_argument('--contact-email',
- default="undefined", # better?
- help="contact email to use in API header")
- sub_harvest_crossref.add_argument('--start-date',
- default=None, type=mkdate,
- help="begining of harvest period")
- sub_harvest_crossref.add_argument('--end-date',
- default=None, type=mkdate,
- help="end of harvest period")
-
args = parser.parse_args()
if not args.__dict__.get("func"):
print("tell me what to do!")