aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/oaipmh.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/oaipmh.py')
-rw-r--r--python/fatcat_tools/harvest/oaipmh.py52
1 files changed, 45 insertions, 7 deletions
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index 19eb6897..c829f2a2 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -5,8 +5,9 @@ from typing import Any, Optional
import sickle
from confluent_kafka import KafkaException, Producer
+from bs4 import BeautifulSoup
-from .harvest_common import HarvestState
+from .harvest_common import HarvestState, requests_retry_session
class HarvestOaiPmhWorker:
@@ -44,6 +45,9 @@ class HarvestOaiPmhWorker:
self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks
+ # optional; not all workers will need or use this HTTP session
+ self.http_session = requests_retry_session()
+
self.endpoint_url = None # needs override
self.metadata_prefix = None # needs override
self.name = "unnamed"
@@ -94,14 +98,21 @@ class HarvestOaiPmhWorker:
count += 1
if count % 50 == 0:
print("... up to {}".format(count), file=sys.stderr)
- producer.produce(
- self.produce_topic,
- item.raw.encode("utf-8"),
- key=item.header.identifier.encode("utf-8"),
- on_delivery=fail_fast,
- )
+ self.produce_record(item, producer)
producer.flush()
+ def produce_record(self, item: sickle.models.Record, producer: Producer) -> None:
+ """
+ The intent of this function is to allow overloading the record type
+ being passed along to the Kafka topic
+ """
+ producer.produce(
+ self.produce_topic,
+ item.raw.encode("utf-8"),
+ key=item.header.identifier.encode("utf-8"),
+ on_delivery=fail_fast,
+ )
+
def run(self, continuous: bool = False) -> None:
while True:
@@ -164,3 +175,30 @@ class HarvestDoajArticleWorker(HarvestOaiPmhWorker):
self.endpoint_url = "https://www.doaj.org/oai.article"
self.metadata_prefix = "oai_doaj"
self.name = "doaj-article"
+
+ def parse_doaj_article_id(self, raw_xml: bytes) -> str:
+ # XXX: don't parse XML; instead just handle item.oai_identifier
+ soup = BeautifulSoup(raw_xml, "xml")
+ elem = soup.find("record header identifier")
+ oai_id = elem.text.strip()
+ assert oai_id.startswith("oai:doaj.org/article:")
+ article_id = oai_id.replace("oai:doaj.org/article:", "")
+ assert len(article_id) == 32 and article_id == article_id.lower()
+ return article_id
+
+ def produce_record(self, item: sickle.models.Record, producer: Producer) -> None:
+ """
+ For each OAI-PMH record, do an API call to get the JSON format
+ response, and publish that to Kafka instead of the OAI-PMH XML
+ """
+
+ article_id = self.parse_doaj_article_id(item.raw)
+ resp = self.http_session.get(f"https://doaj.org/api/articles/{article_id}")
+ resp.raise_for_status()
+ assert resp.json()['id'] == article_id
+ producer.produce(
+ self.produce_topic,
+ resp.content.encode("utf-8"),
+ key=article_id.encode("utf-8"),
+ on_delivery=fail_fast,
+ )