| author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 23:04:18 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 23:04:18 -0800 | 
| commit | e590eec544ab6f2e54e8770f01e64eef3158fdaa (patch) | |
| tree | 5f1fe36a489e7e42642d96a3a719dcbd74d60901 /python/fatcat_tools | |
| parent | 65bdebea35f2ab3c9c8b0f8a8b0a9a577a36bee2 (diff) | |
initial OAI-PMH harvesters
Diffstat (limited to 'python/fatcat_tools')
| -rw-r--r-- | python/fatcat_tools/harvest/__init__.py | 2 |
| -rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 13 |
| -rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py | 157 |
3 files changed, 167 insertions, 5 deletions
diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py
index 4de2cbde..7d814696 100644
--- a/python/fatcat_tools/harvest/__init__.py
+++ b/python/fatcat_tools/harvest/__init__.py
@@ -1,3 +1,5 @@
 
 from .harvest_common import HarvestState
 from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker
+from .oaipmh import HarvestArxivWorker, HarvestPubmedWorker,\
+    HarvestDoajArticleWorker, HarvestDoajJournalWorker
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index d5e4b7ec..10492c17 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -10,15 +10,13 @@ import datetime
 from pykafka import KafkaClient
 
 from fatcat_tools.workers import most_recent_message
-from .harvest_common import HarvestState
+from .harvest_common import HarvestState, DATE_FMT
 
 # Skip pylint due to:
 #   AttributeError: 'NoneType' object has no attribute 'scope'
 # in 'astroid/node_classes.py'
 # pylint: skip-file
 
-DATE_FMT = "%Y-%m-%d"
-
 
 class HarvestCrossrefWorker:
     """
@@ -68,7 +66,6 @@ class HarvestCrossrefWorker:
 
         self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
         self.api_batch_size = 50
-        # for crossref, it's "from-index-date"
         self.name = "Crossref"
 
     def params(self, date_str):
@@ -86,6 +83,9 @@
         params['cursor'] = resp['message']['next-cursor']
         return params
 
+    def extract_key(self, obj):
+        return obj['DOI'].encode('utf-8')
+
     def fetch_date(self, date):
 
         produce_topic = self.kafka.topics[self.produce_topic]
@@ -112,7 +112,7 @@
                     self.extract_total(resp), http_resp.elapsed))
                 #print(json.dumps(resp))
                 for work in items:
-                    producer.produce(json.dumps(work).encode('utf-8'))
+                    producer.produce(json.dumps(work).encode('utf-8'), partition_key=self.extract_key(work))
                 if len(items) < self.api_batch_size:
                     break
                 params = self.update_params(params, resp)
@@ -181,6 +181,9 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
     def extract_total(self, resp):
         return resp['meta']['total']
 
+    def extract_key(self, obj):
+        return obj['doi'].encode('utf-8')
+
     def update_params(self, params, resp):
         params['page[number]'] = resp['meta']['page'] + 1
         return params
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
new file mode 100644
index 00000000..c3cb90db
--- /dev/null
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -0,0 +1,157 @@
+
+"""
+OAI-PMH protocol:
+    https://sickle.readthedocs.io/en/latest/
+
+Pubmed
+    https://www.ncbi.nlm.nih.gov/pmc/tools/oai/
+    https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi?verb=GetRecord&identifier=oai:pubmedcentral.nih.gov:152494&metadataPrefix=pmc_fm
+    https://github.com/titipata/pubmed_parser
+
+arxiv
+    some APIs work on a per-version basis, others do not
+
+    http://export.arxiv.org/oai2?verb=GetRecord&identifier=oai:arXiv.org:0804.2273&metadataPrefix=arXiv
+    http://export.arxiv.org/oai2?verb=GetRecord&identifier=oai:arXiv.org:0804.2273&metadataPrefix=arXivRaw
+
+doaj
+    https://github.com/miku/doajfetch
+
+-----
+
+actually, just going to re-use https://github.com/miku/metha for OAI-PMH stuff
+    => shell script from cronjob
+    => call metha-sync daily
+    => metha-cat -since <whenever> | kafkacat output
+    => echo "date" | kafkat state
+    => some shell trick (comm?) to find missing dates; for each, do metha-cat into kafka
+
+or, just skip kafka for this stuff for now? hrm.
+
+crossref-like stuff is far enough along to keep
+
+## More Miku Magic!
+
+wowa, JSTOR KBART files!
+    http://www.jstor.org/kbart/collections/all-archive-titles
+
+https://github.com/miku/ldjtab: faster than jq for just grabbing
+
+sort can be told how much memory to use; eg: `sort -S50%`, and threads to use
+
+"""
+
+import re
+import sys
+import csv
+import json
+import time
+import requests
+import itertools
+import datetime
+from pykafka import KafkaClient
+import sickle
+
+from fatcat_tools.workers import most_recent_message
+from .harvest_common import HarvestState, DATE_FMT
+
+
+class HarvestOaiPmhWorker:
+    """
+    Base class for OAI-PMH harvesters.
+
+    Based on Crossref importer
+    """
+
+
+    def __init__(self, kafka_hosts, produce_topic, state_topic,
+            start_date=None, end_date=None):
+
+        self.produce_topic = produce_topic
+        self.state_topic = state_topic
+        self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
+
+        self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
+
+        self.endpoint_url = None # needs override
+        self.metadata_prefix = None  # needs override
+        self.state = HarvestState(start_date, end_date)
+        self.state.initialize_from_kafka(self.kafka.topics[self.state_topic])
+
+
+    def fetch_date(self, date):
+
+        api = sickle.Sickle(self.endpoint_url)
+        date_str = date.strftime(DATE_FMT)
+        produce_topic = self.kafka.topics[self.produce_topic]
+        # this dict kwargs hack is to work around 'from' as a reserved python keyword
+        # recommended by sickle docs
+        records = api.ListRecords(**{
+            'metadataPrefix': self.metadata_prefix,
+            'from': date_str,
+            'until': date_str,
+        })
+
+        count = 0
+        with produce_topic.get_producer() as producer:
+            for item in records:
+                count += 1
+                if count % 50 == 0:
+                    print("... up to {}".format(count))
+                producer.produce(item.raw.encode('utf-8'), partition_key=item.header.identifier.encode('utf-8'))
+
+    def run(self, continuous=False):
+
+        while True:
+            current = self.state.next(continuous)
+            if current:
+                print("Fetching DOIs updated on {} (UTC)".format(current))
+                self.fetch_date(current)
+                self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic])
+                continue
+
+            if continuous:
+                print("Sleeping {} seconds...".format(self.loop_sleep))
+                time.sleep(self.loop_sleep())
+            else:
+                break
+        print("{} DOI ingest caught up".format(self.name))
+
+
+class HarvestArxivWorker(HarvestOaiPmhWorker):
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.endpoint_url = "https://export.arxiv.org/oai2"
+        self.metadata_prefix = "arXiv"
+
+
+class HarvestPubmedWorker(HarvestOaiPmhWorker):
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.endpoint_url = "https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi"
+        self.metadata_prefix = "pmc_fm"
+
+
+class HarvestDoajJournalWorker(HarvestOaiPmhWorker):
+    """
+    WARNING: DOAJ OAI-PMH doesn't seem to respect 'from' and 'until' params
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.endpoint_url = "https://www.doaj.org/oai"
+        self.metadata_prefix = "oai_dc"
+
+
+class HarvestDoajArticleWorker(HarvestOaiPmhWorker):
+    """
+    WARNING: DOAJ OAI-PMH doesn't seem to respect 'from' and 'until' params
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.endpoint_url = "https://www.doaj.org/oai.article"
+        self.metadata_prefix = "oai_doaj"
+
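
For context, a minimal usage sketch of one of the new workers follows. This is not part of the commit: the broker address and topic names are hypothetical, and the date range is left to the `HarvestState` defaults; only the constructor arguments and `fetch_date()` shown in the diff above are assumed.

```python
import datetime

from fatcat_tools.harvest import HarvestArxivWorker

# Hypothetical broker address and Kafka topic names; adjust for the local deployment.
worker = HarvestArxivWorker(
    kafka_hosts="localhost:9092",
    produce_topic="oaipmh-arxiv-records",
    state_topic="oaipmh-arxiv-state",
)

# Harvest a single UTC day's worth of arxiv records and publish each raw
# OAI-PMH record to the produce topic, keyed by its OAI identifier.
worker.fetch_date(datetime.date(2018, 11, 18))
```

Calling `worker.run()` instead walks every un-harvested date tracked via the state topic, calling `fetch_date()` for each and marking it complete as it goes.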

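The `extract_key()` additions in `doi_registrars.py` attach a per-record key (the DOI) to each Kafka message. Below is a sketch, outside the commit, of how such a key can drive partition assignment with pykafka: with the default random partitioner the key is stored on the message but ignored for placement, while a key-aware partitioner such as `hashing_partitioner` routes every update for the same DOI to the same partition. The broker address, topic name, and record are hypothetical.

```python
import json

from pykafka import KafkaClient
from pykafka.partitioners import hashing_partitioner

# Hypothetical broker and topic; the record mimics a Crossref work message.
client = KafkaClient(hosts="localhost:9092", broker_version="1.0.0")
topic = client.topics["crossref-works"]

work = {"DOI": "10.1234/example.5678", "title": ["An Example Work"]}

# hashing_partitioner hashes the partition_key, so repeated updates for the
# same DOI always land in (and stay ordered within) a single partition.
with topic.get_producer(partitioner=hashing_partitioner) as producer:
    producer.produce(
        json.dumps(work).encode('utf-8'),
        partition_key=work['DOI'].encode('utf-8'),
    )
```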