diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2021-11-02 18:14:59 -0700 |
commit | 31d1a6a713d177990609767d508209ced19ca396 (patch) | |
tree | a628a57bdb373669394a6b520102b1b4b5ffe7da /python/fatcat_tools/harvest/oaipmh.py | |
parent | 9dc891b8098542bb089c8c47098b60a8beb76a53 (diff) | |
download | fatcat-31d1a6a713d177990609767d508209ced19ca396.tar.gz fatcat-31d1a6a713d177990609767d508209ced19ca396.zip |
fmt (black): fatcat_tools/
Diffstat (limited to 'python/fatcat_tools/harvest/oaipmh.py')
-rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py | 57 |
1 files changed, 31 insertions, 26 deletions
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index 0eb0343d..40d1c853 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -1,4 +1,3 @@ - import sys import time @@ -25,19 +24,18 @@ class HarvestOaiPmhWorker: would want something similar operationally. Oh well! """ - def __init__(self, kafka_hosts, produce_topic, state_topic, - start_date=None, end_date=None): + def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None): self.produce_topic = produce_topic self.state_topic = state_topic self.kafka_config = { - 'bootstrap.servers': kafka_hosts, - 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes + "bootstrap.servers": kafka_hosts, + "message.max.bytes": 20000000, # ~20 MBytes; broker is ~50 MBytes } - self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks + self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks - self.endpoint_url = None # needs override + self.endpoint_url = None # needs override self.metadata_prefix = None # needs override self.name = "unnamed" self.state = HarvestState(start_date, end_date) @@ -45,7 +43,6 @@ class HarvestOaiPmhWorker: print(self.state, file=sys.stderr) def fetch_date(self, date): - def fail_fast(err, msg): if err is not None: print("Kafka producer delivery error: {}".format(err), file=sys.stderr) @@ -54,12 +51,14 @@ class HarvestOaiPmhWorker: raise KafkaException(err) producer_conf = self.kafka_config.copy() - producer_conf.update({ - 'delivery.report.only.error': True, - 'default.topic.config': { - 'request.required.acks': -1, # all brokers must confirm - }, - }) + producer_conf.update( + { + "delivery.report.only.error": True, + "default.topic.config": { + "request.required.acks": -1, # all brokers must confirm + }, + } + ) producer = Producer(producer_conf) api = sickle.Sickle(self.endpoint_url, max_retries=5, retry_status_codes=[503]) @@ -67,13 +66,18 @@ class HarvestOaiPmhWorker: # this dict kwargs hack is to work around 'from' as a reserved python keyword # recommended by sickle docs try: - records = api.ListRecords(**{ - 'metadataPrefix': self.metadata_prefix, - 'from': date_str, - 'until': date_str, - }) + records = api.ListRecords( + **{ + "metadataPrefix": self.metadata_prefix, + "from": date_str, + "until": date_str, + } + ) except sickle.oaiexceptions.NoRecordsMatch: - print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str), file=sys.stderr) + print( + "WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str), + file=sys.stderr, + ) return count = 0 @@ -83,9 +87,10 @@ class HarvestOaiPmhWorker: print("... up to {}".format(count), file=sys.stderr) producer.produce( self.produce_topic, - item.raw.encode('utf-8'), - key=item.header.identifier.encode('utf-8'), - on_delivery=fail_fast) + item.raw.encode("utf-8"), + key=item.header.identifier.encode("utf-8"), + on_delivery=fail_fast, + ) producer.flush() def run(self, continuous=False): @@ -95,9 +100,9 @@ class HarvestOaiPmhWorker: if current: print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr) self.fetch_date(current) - self.state.complete(current, - kafka_topic=self.state_topic, - kafka_config=self.kafka_config) + self.state.complete( + current, kafka_topic=self.state_topic, kafka_config=self.kafka_config + ) continue if continuous: |