diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2019-04-08 14:58:20 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-04-08 14:58:20 -0700 |
commit | 953886e6e42294ca81583e9b2c4e2fa16343a86e (patch) | |
tree | feb1582ed332c742b54081e2324313c04caf6ddf /python | |
parent | b70e33dd9fc99d7d454f9293b08777cb98d68b87 (diff) | |
download | fatcat-953886e6e42294ca81583e9b2c4e2fa16343a86e.tar.gz fatcat-953886e6e42294ca81583e9b2c4e2fa16343a86e.zip |
fixes to confluent-kafka harvesters
Diffstat (limited to 'python')
-rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 21 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/harvest_common.py | 4 | ||||
-rw-r--r-- | python/fatcat_tools/harvest/oaipmh.py | 16 |
3 files changed, 21 insertions, 20 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index 1483266c..a5d9a04d 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -57,8 +57,9 @@ class HarvestCrossrefWorker: self.kafka_config = { 'bootstrap.servers': kafka_hosts, 'delivery.report.only.error': True, - 'default.topic.config': - {'request.required.acks': 'all'}, + 'default.topic.config': { + 'request.required.acks': 'all', + }, } self.state = HarvestState(start_date, end_date) @@ -86,15 +87,15 @@ class HarvestCrossrefWorker: def extract_key(self, obj): return obj['DOI'].encode('utf-8') - def kafka_produce_delivery_callback(err, msg): - if err is not None: - print("Kafka producer delivery error: {}".format(err)) - print("Bailing out...") - # TODO: should it be sys.exit(-1)? - raise KafkaException(err) - def fetch_date(self, date): + def fail_fast(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + producer = Producer(self.kafka_config) date_str = date.isoformat() @@ -125,7 +126,7 @@ class HarvestCrossrefWorker: self.produce_topic, json.dumps(work).encode('utf-8'), key=self.extract_key(work), - on_delivery=self.kafka_produce_delivery_callback) + on_delivery=fail_fast) producer.poll(0) if len(items) < self.api_batch_size: break diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py index 90f499da..aa7a69f5 100644 --- a/python/fatcat_tools/harvest/harvest_common.py +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -150,7 +150,7 @@ class HarvestState: 'group.id': kafka_topic + "-init", }) consumer = Consumer(conf) - consumer.assign([TopicPartition(kafka_topic, 0, OFFSET_BEGINNING)]) + consumer.assign([TopicPartition(kafka_topic, 0, 0)]) c = 0 while True: msg = consumer.poll(timeout=1.0) @@ -158,7 +158,7 @@ class HarvestState: break if msg.error(): raise KafkaException(msg.error()) - sys.stdout.write('.') # XXX: + #sys.stdout.write('.') self.update(msg.value().decode('utf-8')) c += 1 consumer.close() diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index ab424482..129df2e1 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -52,15 +52,15 @@ class HarvestOaiPmhWorker: self.state = HarvestState(start_date, end_date) self.state.initialize_from_kafka(self.state_topic, self.kafka_config) - def kafka_produce_delivery_callback(err, msg): - if err is not None: - print("Kafka producer delivery error: {}".format(err)) - print("Bailing out...") - # TODO: should it be sys.exit(-1)? - raise KafkaException(err) - def fetch_date(self, date): + def fail_fast(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + producer = Producer(self.kafka_config) api = sickle.Sickle(self.endpoint_url) @@ -86,7 +86,7 @@ class HarvestOaiPmhWorker: self.produce_topic, item.raw.encode('utf-8'), key=item.header.identifier.encode('utf-8'), - on_delivery=self.kafka_produce_delivery_callback) + on_delivery=fail_fast) producer.poll(0) producer.flush() |