summary | refs | log | tree | commit | diff | stats
path: root/python/fatcat_tools/harvest/ingest_common.py
diff options
context:
space:
mode:
author Bryan Newbold <bnewbold@robocracy.org> 2018-11-15 12:21:45 -0800
committer Bryan Newbold <bnewbold@robocracy.org> 2018-11-15 12:21:45 -0800
commit f21d28315aa632cdb9f84ea8787762d1e27b4310 (patch)
tree 58c6ad0d34260e1d656247ddffa8ee047a8eb520 /python/fatcat_tools/harvest/ingest_common.py
parent 5c47be5b0468c13db868548dccfdf1af50813b0c (diff)
download fatcat-f21d28315aa632cdb9f84ea8787762d1e27b4310.tar.gz
fatcat-f21d28315aa632cdb9f84ea8787762d1e27b4310.zip
refactoring harvesters
Diffstat (limited to 'python/fatcat_tools/harvest/ingest_common.py')
-rw-r--r-- python/fatcat_tools/harvest/ingest_common.py 127
1 files changed, 0 insertions, 127 deletions
diff --git a/python/fatcat_tools/harvest/ingest_common.py b/python/fatcat_tools/harvest/ingest_common.py
deleted file mode 100644
index 67ff3dc3..00000000
--- a/python/fatcat_tools/harvest/ingest_common.py
+++ /dev/null
@@ -1,127 +0,0 @@
-
-"""
-logic:
-- on start, fetch latest date from state feed
-- in a function (unit-testable), decide which dates to ingest
-- for each date needing update:
- - start a loop for just that date, using resumption token for this query
- - when done, publish to state feed, with immediate sync
-"""
-
-import re
-import sys
-import csv
-import json
-import requests
-import itertools
-import datetime
-from pykafka import KafkaClient
-
-from fatcat_tools.workers.worker_common import most_recent_message
-
-DATE_FMT = "%Y-%m-%d"
-
-class DoiApiHarvest:
- """
- This class supports core features for both the Crossref and Datacite REST
- APIs for fetching updated metadata (the Datacite API seems to be moduled on
- the Crossref API).
-
- Implementations must provide the push results function.
- """
-
- def __init__(self, kafka_hosts, produce_topic, state_topic, api_host_url,
- contact_email, start_date=None, end_date=None):
- self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
- self.api_batch_size = 50
- self.api_host_url = api_host_url
- self.produce_topic = produce_topic
- self.state_topic = state_topic
- self.contact_email = contact_email
- self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
- self.is_update_filter = None
- self.update_filter_name = "index"
-
- # these are both optional, and should be datetime.date
- self.start_date = start_date
- self.end_date = end_date
-
- def get_latest_date(self):
-
- state_topic = self.kafka.topics[self.state_topic]
- latest = most_recent_message(state_topic)
- if latest:
- latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date()
- print("Latest date found: {}".format(latest))
- return latest
-
- def fetch_date(self, date):
-
- state_topic = self.kafka.topics[self.state_topic]
- produce_topic = self.kafka.topics[self.produce_topic]
-
- date_str = date.strftime(DATE_FMT)
- filter_param = 'from-{index}-date:{},until-{index}-date:{}'.format(
- date_str, date_str, index=self.update_filter_name)
- if self.is_update_filter is not None:
- filter_param += ',is_update:{}'.format(bool(is_update))
- params = {
- 'filter': filter_param,
- 'rows': self.api_batch_size,
- 'cursor': '*',
- }
- headers = {
- 'User-Agent': 'fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests'.format(self.contact_email),
- }
- count = 0
- with produce_topic.get_producer() as producer:
- while True:
- http_resp = requests.get(self.api_host_url, params, headers=headers)
- assert http_resp.status_code is 200
- resp = http_resp.json()
- items = resp['message']['items']
- count += len(items)
- print("... got {} ({} of {}) in {}".format(len(items), count,
- resp['message']['total-results']), http_resp.elapsed)
- #print(json.dumps(resp))
- for work in items:
- producer.produce(json.dumps(work).encode('utf-8'))
- if len(items) < params['rows']:
- break
- params['cursor'] = resp['message']['next-cursor']
-
- # record our completion state
- with state_topic.get_sync_producer() as producer:
- producer.produce(date.strftime(DATE_FMT).encode('utf-8'))
-
-
- def run_once(self):
- today_utc = datetime.datetime.utcnow().date()
- if self.start_date is None:
- self.start_date = self.get_latest_date()
- if self.start_date:
- # if we are continuing, start day after last success
- self.start_date = self.start_date + datetime.timedelta(days=1)
- if self.start_date is None:
- # bootstrap to yesterday (don't want to start on today until it's over)
- self.start_date = datetime.datetime.utcnow().date()
- if self.end_date is None:
- # bootstrap to yesterday (don't want to start on today until it's over)
- self.end_date = today_utc - datetime.timedelta(days=1)
- print("Harvesting from {} through {}".format(self.start_date, self.end_date))
- current = self.start_date
- while current <= self.end_date:
- print("Fetching DOIs updated on {} (UTC)".format(current))
- self.fetch_date(current)
- current += datetime.timedelta(days=1)
- print("Crossref DOI ingest caught up through {}".format(self.end_date))
- return self.end_date
-
- def run_loop(self):
- while True:
- last = self.run_once()
- self.start_date = last
- self.end_date = None
- print("Sleeping {} seconds...".format(self.loop_sleep))
- time.sleep(self.loop_sleep())
-