aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/oaipmh.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/oaipmh.py')
-rw-r--r--python/fatcat_tools/harvest/oaipmh.py57
1 files changed, 31 insertions, 26 deletions
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index 0eb0343d..40d1c853 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -1,4 +1,3 @@
-
import sys
import time
@@ -25,19 +24,18 @@ class HarvestOaiPmhWorker:
would want something similar operationally. Oh well!
"""
- def __init__(self, kafka_hosts, produce_topic, state_topic,
- start_date=None, end_date=None):
+ def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
self.produce_topic = produce_topic
self.state_topic = state_topic
self.kafka_config = {
- 'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+ "bootstrap.servers": kafka_hosts,
+ "message.max.bytes": 20000000, # ~20 MBytes; broker is ~50 MBytes
}
- self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
+ self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks
- self.endpoint_url = None # needs override
+ self.endpoint_url = None # needs override
self.metadata_prefix = None # needs override
self.name = "unnamed"
self.state = HarvestState(start_date, end_date)
@@ -45,7 +43,6 @@ class HarvestOaiPmhWorker:
print(self.state, file=sys.stderr)
def fetch_date(self, date):
-
def fail_fast(err, msg):
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
@@ -54,12 +51,14 @@ class HarvestOaiPmhWorker:
raise KafkaException(err)
producer_conf = self.kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
+ producer_conf.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ )
producer = Producer(producer_conf)
api = sickle.Sickle(self.endpoint_url, max_retries=5, retry_status_codes=[503])
@@ -67,13 +66,18 @@ class HarvestOaiPmhWorker:
# this dict kwargs hack is to work around 'from' as a reserved python keyword
# recommended by sickle docs
try:
- records = api.ListRecords(**{
- 'metadataPrefix': self.metadata_prefix,
- 'from': date_str,
- 'until': date_str,
- })
+ records = api.ListRecords(
+ **{
+ "metadataPrefix": self.metadata_prefix,
+ "from": date_str,
+ "until": date_str,
+ }
+ )
except sickle.oaiexceptions.NoRecordsMatch:
- print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str), file=sys.stderr)
+ print(
+ "WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str),
+ file=sys.stderr,
+ )
return
count = 0
@@ -83,9 +87,10 @@ class HarvestOaiPmhWorker:
print("... up to {}".format(count), file=sys.stderr)
producer.produce(
self.produce_topic,
- item.raw.encode('utf-8'),
- key=item.header.identifier.encode('utf-8'),
- on_delivery=fail_fast)
+ item.raw.encode("utf-8"),
+ key=item.header.identifier.encode("utf-8"),
+ on_delivery=fail_fast,
+ )
producer.flush()
def run(self, continuous=False):
@@ -95,9 +100,9 @@ class HarvestOaiPmhWorker:
if current:
print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)
self.fetch_date(current)
- self.state.complete(current,
- kafka_topic=self.state_topic,
- kafka_config=self.kafka_config)
+ self.state.complete(
+ current, kafka_topic=self.state_topic, kafka_config=self.kafka_config
+ )
continue
if continuous: