summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py13
-rw-r--r--python/fatcat_tools/harvest/harvest_common.py37
-rw-r--r--python/fatcat_tools/harvest/oaipmh.py13
-rw-r--r--python/fatcat_tools/importers/common.py1
-rw-r--r--python/fatcat_tools/workers/changelog.py16
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py22
6 files changed, 75 insertions, 27 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index 3362df35..7e791745 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -56,11 +56,7 @@ class HarvestCrossrefWorker:
self.is_update_filter = is_update_filter
self.kafka_config = {
'bootstrap.servers': kafka_hosts,
- 'delivery.report.only.error': True,
'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
- 'default.topic.config': {
- 'request.required.acks': 'all',
- },
}
self.state = HarvestState(start_date, end_date)
@@ -97,7 +93,14 @@ class HarvestCrossrefWorker:
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
- producer = Producer(self.kafka_config)
+ producer_conf = self.kafka_config.copy()
+ producer_conf.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ },
+ })
+ producer = Producer(producer_conf)
date_str = date.isoformat()
params = self.params(date_str)
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index aa7a69f5..78830a1c 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -130,30 +130,56 @@ class HarvestState:
if err:
raise KafkaException(err)
print("Commiting status to Kafka: {}".format(kafka_topic))
- producer = Producer(kafka_config)
- producer.produce(kafka_topic, state_json, on_delivery=fail_fast)
+ producer_conf = kafka_config.copy()
+ producer_conf.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ },
+ })
+ producer = Producer(producer_conf)
+ producer.produce(
+ kafka_topic,
+ state_json,
+ on_delivery=fail_fast)
producer.flush()
return state_json
def initialize_from_kafka(self, kafka_topic, kafka_config):
"""
kafka_topic should have type str
+
+ TODO: this method does not fail if client can't connect to host.
"""
if not kafka_topic:
return
print("Fetching state from kafka topic: {}".format(kafka_topic))
+ def fail_fast(err, msg):
+ if err:
+ raise KafkaException(err)
conf = kafka_config.copy()
conf.update({
+ 'group.id': 'dummy_init_group', # should never be commited
+ 'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
'session.timeout.ms': 10000,
- 'group.id': kafka_topic + "-init",
})
consumer = Consumer(conf)
+
+ # this watermark fetch is mostly to ensure we are connected to broker and
+ # fail fast if not, but we also confirm that we read to end below.
+ hwm = consumer.get_watermark_offsets(
+ TopicPartition(kafka_topic, 0),
+ timeout=5.0,
+ cached=False)
+ if not hwm:
+ raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic))
+
consumer.assign([TopicPartition(kafka_topic, 0, 0)])
c = 0
while True:
- msg = consumer.poll(timeout=1.0)
+ msg = consumer.poll(timeout=2.0)
if not msg:
break
if msg.error():
@@ -162,4 +188,7 @@ class HarvestState:
self.update(msg.value().decode('utf-8'))
c += 1
consumer.close()
+
+ # verify that we got at least to HWM
+ assert c >= hwm[1]
print("... got {} state update messages, done".format(c))
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index 3e3bea03..f908ba83 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -39,10 +39,7 @@ class HarvestOaiPmhWorker:
self.state_topic = state_topic
self.kafka_config = {
'bootstrap.servers': kafka_hosts,
- 'delivery.report.only.error': True,
'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
- 'default.topic.config':
- {'request.required.acks': 'all'},
}
self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
@@ -62,7 +59,14 @@ class HarvestOaiPmhWorker:
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
- producer = Producer(self.kafka_config)
+ producer_conf = self.kafka_config.copy()
+ producer_conf.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ },
+ })
+ producer = Producer(producer_conf)
api = sickle.Sickle(self.endpoint_url)
date_str = date.isoformat()
@@ -88,7 +92,6 @@ class HarvestOaiPmhWorker:
item.raw.encode('utf-8'),
key=item.header.identifier.encode('utf-8'),
on_delivery=fail_fast)
- producer.poll(0)
producer.flush()
def run(self, continuous=False):
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 42fe38aa..a25c3196 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -775,7 +775,6 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
'bootstrap.servers': hosts,
'group.id': group,
'on_commit': fail_fast,
- 'delivery.report.only.error': True,
# messages don't have offset marked as stored until pushed to
# elastic, but we do auto-commit stored offsets to broker
'enable.auto.offset.store': False,
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index c134bde2..8b1ba5e9 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -40,7 +40,14 @@ class ChangelogWorker(FatcatWorker):
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
- producer = Producer(self.kafka_config)
+ producer_conf = self.kafka_config.copy()
+ producer_conf.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ },
+ })
+ producer = Producer(producer_conf)
while True:
latest = int(self.api.get_changelog(limit=1)[0].index)
@@ -58,7 +65,7 @@ class ChangelogWorker(FatcatWorker):
#NOTE timestamp could be timestamp=cle.timestamp (?)
)
self.offset = i
- producer.poll(0)
+ producer.flush()
print("Sleeping {} seconds...".format(self.poll_interval))
time.sleep(self.poll_interval)
@@ -118,7 +125,6 @@ class EntityUpdatesWorker(FatcatWorker):
consumer_conf.update({
'group.id': self.consumer_group,
'on_commit': fail_fast,
- 'delivery.report.only.error': True,
# messages don't have offset marked as stored until pushed to
# elastic, but we do auto-commit stored offsets to broker
'enable.auto.commit': True,
@@ -134,8 +140,9 @@ class EntityUpdatesWorker(FatcatWorker):
producer_conf = self.kafka_config.copy()
producer_conf.update({
+ 'delivery.report.only.error': True,
'default.topic.config': {
- 'request.required.acks': -1,
+ 'request.required.acks': -1, # all brokers must confirm
},
})
producer = Producer(producer_conf)
@@ -207,6 +214,7 @@ class EntityUpdatesWorker(FatcatWorker):
key=ident.encode('utf-8'),
on_delivery=fail_fast,
)
+ producer.flush()
# TODO: actually update works
consumer.store_offsets(message=msg)
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index acb705c2..2ba241eb 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -2,7 +2,7 @@
import json
import time
import requests
-from confluent_kafka import Consumer, Producer, KafkaException
+from confluent_kafka import Consumer, KafkaException
from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient
from fatcat_tools import *
@@ -61,7 +61,6 @@ class ElasticsearchReleaseWorker(FatcatWorker):
consumer_conf.update({
'group.id': self.consumer_group,
'on_commit': fail_fast,
- 'delivery.report.only.error': True,
# messages don't have offset marked as stored until pushed to
# elastic, but we do auto-commit stored offsets to broker
'enable.auto.commit': True,
@@ -97,16 +96,24 @@ class ElasticsearchReleaseWorker(FatcatWorker):
bulk_actions = []
for msg in batch:
json_str = msg.value().decode('utf-8')
- entity = entity_from_json(json_str, ReleaseEntity, api_client=ac)
- print("Upserting: release/{}".format(entity.ident))
+ # HACK: work around a bug where container entities got published to
+ # release_v03 topic
+ if self.elasticsearch_document_name == "release":
+ entity_dict = json.loads(json_str)
+ if entity_dict.get('name') and not entity_dict.get('title'):
+ continue
+ entity = entity_from_json(json_str, self.entity_type, api_client=ac)
+ # TODO: handle deletions from index
bulk_actions.append(json.dumps({
"index": { "_id": entity.ident, },
}))
bulk_actions.append(json.dumps(
- release_to_elasticsearch(entity)))
- elasticsearch_endpoint = "{}/{}/release/_bulk".format(
+ self.transform_func(entity)))
+ print("Upserting, eg, {} (of {} releases in elasticsearch)".format(entity.ident, len(batch)))
+ elasticsearch_endpoint = "{}/{}/{}/_bulk".format(
self.elasticsearch_backend,
- self.elasticsearch_index)
+ self.elasticsearch_index,
+ self.elasticsearch_document_name)
resp = requests.post(elasticsearch_endpoint,
headers={"Content-Type": "application/x-ndjson"},
data="\n".join(bulk_actions) + "\n")
@@ -141,4 +148,3 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
self.elasticsearch_document_name = "container"
self.transform_func = container_to_elasticsearch
-