From e590eec544ab6f2e54e8770f01e64eef3158fdaa Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 19 Nov 2018 23:04:18 -0800 Subject: initial OAI-PMH harvesters --- python/fatcat_tools/harvest/__init__.py | 2 + python/fatcat_tools/harvest/doi_registrars.py | 13 ++- python/fatcat_tools/harvest/oaipmh.py | 157 ++++++++++++++++++++++++++ 3 files changed, 167 insertions(+), 5 deletions(-) create mode 100644 python/fatcat_tools/harvest/oaipmh.py (limited to 'python/fatcat_tools') diff --git a/python/fatcat_tools/harvest/__init__.py b/python/fatcat_tools/harvest/__init__.py index 4de2cbde..7d814696 100644 --- a/python/fatcat_tools/harvest/__init__.py +++ b/python/fatcat_tools/harvest/__init__.py @@ -1,3 +1,5 @@ from .harvest_common import HarvestState from .doi_registrars import HarvestCrossrefWorker, HarvestDataciteWorker +from .oaipmh import HarvestArxivWorker, HarvestPubmedWorker,\ + HarvestDoajArticleWorker, HarvestDoajJournalWorker diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index d5e4b7ec..10492c17 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -10,15 +10,13 @@ import datetime from pykafka import KafkaClient from fatcat_tools.workers import most_recent_message -from .harvest_common import HarvestState +from .harvest_common import HarvestState, DATE_FMT # Skip pylint due to: # AttributeError: 'NoneType' object has no attribute 'scope' # in 'astroid/node_classes.py' # pylint: skip-file -DATE_FMT = "%Y-%m-%d" - class HarvestCrossrefWorker: """ @@ -68,7 +66,6 @@ class HarvestCrossrefWorker: self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks self.api_batch_size = 50 - # for crossref, it's "from-index-date" self.name = "Crossref" def params(self, date_str): @@ -86,6 +83,9 @@ class HarvestCrossrefWorker: params['cursor'] = resp['message']['next-cursor'] return params + def extract_key(self, obj): + return obj['DOI'].encode('utf-8') + def fetch_date(self, date): produce_topic = self.kafka.topics[self.produce_topic] @@ -112,7 +112,7 @@ class HarvestCrossrefWorker: self.extract_total(resp), http_resp.elapsed)) #print(json.dumps(resp)) for work in items: - producer.produce(json.dumps(work).encode('utf-8')) + producer.produce(json.dumps(work).encode('utf-8'), partition_key=self.extract_key(work)) if len(items) < self.api_batch_size: break params = self.update_params(params, resp) @@ -181,6 +181,9 @@ class HarvestDataciteWorker(HarvestCrossrefWorker): def extract_total(self, resp): return resp['meta']['total'] + def extract_key(self, obj): + return obj['doi'].encode('utf-8') + def update_params(self, params, resp): params['page[number]'] = resp['meta']['page'] + 1 return params diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py new file mode 100644 index 00000000..c3cb90db --- /dev/null +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -0,0 +1,157 @@ + +""" +OAI-PMH protocol: + https://sickle.readthedocs.io/en/latest/ + +Pubmed + https://www.ncbi.nlm.nih.gov/pmc/tools/oai/ + https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi?verb=GetRecord&identifier=oai:pubmedcentral.nih.gov:152494&metadataPrefix=pmc_fm + https://github.com/titipata/pubmed_parser + +arxiv + some APIs work on a per-version basis, others do not + + http://export.arxiv.org/oai2?verb=GetRecord&identifier=oai:arXiv.org:0804.2273&metadataPrefix=arXiv + http://export.arxiv.org/oai2?verb=GetRecord&identifier=oai:arXiv.org:0804.2273&metadataPrefix=arXivRaw + +doaj + https://github.com/miku/doajfetch + +----- + +actually, just going to re-use https://github.com/miku/metha for OAI-PMH stuff + => shell script from cronjob + => call metha-sync daily + => metha-cat -since | kafkacat output + => echo "date" | kafkat state + => some shell trick (comm?) to find missing dates; for each, do metha-cat into kafka + +or, just skip kafka for this stuff for now? hrm. + +crossref-like stuff is far enough along to keep + +## More Miku Magic! + +wowa, JSTOR KBART files! + http://www.jstor.org/kbart/collections/all-archive-titles + +https://github.com/miku/ldjtab: faster than jq for just grabbing + +sort can be told how much memory to use; eg: `sort -S50%`, and threads to use + +""" + +import re +import sys +import csv +import json +import time +import requests +import itertools +import datetime +from pykafka import KafkaClient +import sickle + +from fatcat_tools.workers import most_recent_message +from .harvest_common import HarvestState, DATE_FMT + + +class HarvestOaiPmhWorker: + """ + Base class for OAI-PMH harvesters. + + Based on Crossref importer + """ + + + def __init__(self, kafka_hosts, produce_topic, state_topic, + start_date=None, end_date=None): + + self.produce_topic = produce_topic + self.state_topic = state_topic + self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") + + self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks + + self.endpoint_url = None # needs override + self.metadata_prefix = None # needs override + self.state = HarvestState(start_date, end_date) + self.state.initialize_from_kafka(self.kafka.topics[self.state_topic]) + + + def fetch_date(self, date): + + api = sickle.Sickle(self.endpoint_url) + date_str = date.strftime(DATE_FMT) + produce_topic = self.kafka.topics[self.produce_topic] + # this dict kwargs hack is to work around 'from' as a reserved python keyword + # recommended by sickle docs + records = api.ListRecords(**{ + 'metadataPrefix': self.metadata_prefix, + 'from': date_str, + 'until': date_str, + }) + + count = 0 + with produce_topic.get_producer() as producer: + for item in records: + count += 1 + if count % 50 == 0: + print("... up to {}".format(count)) + producer.produce(item.raw.encode('utf-8'), partition_key=item.header.identifier.encode('utf-8')) + + def run(self, continuous=False): + + while True: + current = self.state.next(continuous) + if current: + print("Fetching DOIs updated on {} (UTC)".format(current)) + self.fetch_date(current) + self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic]) + continue + + if continuous: + print("Sleeping {} seconds...".format(self.loop_sleep)) + time.sleep(self.loop_sleep()) + else: + break + print("{} DOI ingest caught up".format(self.name)) + + +class HarvestArxivWorker(HarvestOaiPmhWorker): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.endpoint_url = "https://export.arxiv.org/oai2" + self.metadata_prefix = "arXiv" + + +class HarvestPubmedWorker(HarvestOaiPmhWorker): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.endpoint_url = "https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi" + self.metadata_prefix = "pmc_fm" + + +class HarvestDoajJournalWorker(HarvestOaiPmhWorker): + """ + WARNING: DOAJ OAI-PMH doesn't seem to respect 'from' and 'until' params + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.endpoint_url = "https://www.doaj.org/oai" + self.metadata_prefix = "oai_dc" + + +class HarvestDoajArticleWorker(HarvestOaiPmhWorker): + """ + WARNING: DOAJ OAI-PMH doesn't seem to respect 'from' and 'until' params + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.endpoint_url = "https://www.doaj.org/oai.article" + self.metadata_prefix = "oai_doaj" + -- cgit v1.2.3