aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/ingest_common.py
blob: 67ff3dc3c265d1d2fd7634ed7253a3e4f9a30a69 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127

"""
logic:
- on start, fetch latest date from state feed
- in a function (unit-testable), decide which dates to ingest
- for each date needing update:
    - start a loop for just that date, using resumption token for this query
    - when done, publish to state feed, with immediate sync
"""

import re
import sys
import csv
import json
import requests
import itertools
import datetime
from pykafka import KafkaClient

from fatcat_tools.workers.worker_common import most_recent_message

DATE_FMT = "%Y-%m-%d"

class DoiApiHarvest:
    """
    This class supports core features for both the Crossref and Datacite REST
    APIs for fetching updated metadata (the Datacite API seems to be moduled on
    the Crossref API).

    Implementations must provide the push results function.
    """

    def __init__(self, kafka_hosts, produce_topic, state_topic, api_host_url,
            contact_email, start_date=None, end_date=None):
        self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
        self.api_batch_size = 50
        self.api_host_url = api_host_url
        self.produce_topic = produce_topic
        self.state_topic = state_topic
        self.contact_email = contact_email
        self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
        self.is_update_filter = None
        self.update_filter_name = "index"

        # these are both optional, and should be datetime.date
        self.start_date = start_date
        self.end_date = end_date

    def get_latest_date(self):

        state_topic = self.kafka.topics[self.state_topic]
        latest = most_recent_message(state_topic)
        if latest:
            latest = datetime.datetime.strptime(latest.decode('utf-8'), DATE_FMT).date()
        print("Latest date found: {}".format(latest))
        return latest

    def fetch_date(self, date):

        state_topic = self.kafka.topics[self.state_topic]
        produce_topic = self.kafka.topics[self.produce_topic]
 
        date_str = date.strftime(DATE_FMT)
        filter_param = 'from-{index}-date:{},until-{index}-date:{}'.format(
            date_str, date_str, index=self.update_filter_name)
        if self.is_update_filter is not None:
            filter_param += ',is_update:{}'.format(bool(is_update))
        params = {
            'filter': filter_param,
            'rows': self.api_batch_size,
            'cursor': '*',
        }
        headers = {
            'User-Agent': 'fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests'.format(self.contact_email),
        }
        count = 0
        with produce_topic.get_producer() as producer:
            while True:
                http_resp = requests.get(self.api_host_url, params, headers=headers)
                assert http_resp.status_code is 200
                resp = http_resp.json()
                items = resp['message']['items']
                count += len(items)
                print("... got {} ({} of {}) in {}".format(len(items), count,
                    resp['message']['total-results']), http_resp.elapsed)
                #print(json.dumps(resp))
                for work in items:
                    producer.produce(json.dumps(work).encode('utf-8'))
                if len(items) < params['rows']:
                    break
                params['cursor'] = resp['message']['next-cursor']

        # record our completion state
        with state_topic.get_sync_producer() as producer:
            producer.produce(date.strftime(DATE_FMT).encode('utf-8'))
        

    def run_once(self):
        today_utc = datetime.datetime.utcnow().date()
        if self.start_date is None:
            self.start_date = self.get_latest_date()
            if self.start_date: 
                # if we are continuing, start day after last success
                self.start_date = self.start_date + datetime.timedelta(days=1)
        if self.start_date is None:
            # bootstrap to yesterday (don't want to start on today until it's over)
            self.start_date = datetime.datetime.utcnow().date()
        if self.end_date is None:
            # bootstrap to yesterday (don't want to start on today until it's over)
            self.end_date = today_utc - datetime.timedelta(days=1)
        print("Harvesting from {} through {}".format(self.start_date, self.end_date))
        current = self.start_date
        while current <= self.end_date:
            print("Fetching DOIs updated on {} (UTC)".format(current))
            self.fetch_date(current)
            current += datetime.timedelta(days=1)
        print("Crossref DOI ingest caught up through {}".format(self.end_date))
        return self.end_date

    def run_loop(self):
        while True:
            last = self.run_once()
            self.start_date = last
            self.end_date = None
            print("Sleeping {} seconds...".format(self.loop_sleep))
            time.sleep(self.loop_sleep())