Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
 python/fatcat_tools/workers/elasticsearch.py | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index acb705c2..2ba241eb 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -2,7 +2,7 @@
 import json
 import time
 import requests
-from confluent_kafka import Consumer, Producer, KafkaException
+from confluent_kafka import Consumer, KafkaException
 
 from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient
 from fatcat_tools import *
@@ -61,7 +61,6 @@ class ElasticsearchReleaseWorker(FatcatWorker):
         consumer_conf.update({
             'group.id': self.consumer_group,
             'on_commit': fail_fast,
-            'delivery.report.only.error': True,
             # messages don't have offset marked as stored until pushed to
             # elastic, but we do auto-commit stored offsets to broker
             'enable.auto.commit': True,
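The removed 'delivery.report.only.error' key is a producer-side setting, presumably dropped because it does not belong in a consumer config. The surrounding comment describes an at-least-once pattern: with 'enable.auto.offset.store' disabled, the worker only marks offsets as stored after documents are safely in Elasticsearch, and the auto-commit timer then flushes those stored offsets to the broker. A minimal sketch of that pattern with confluent_kafka, where the broker address, topic name, and indexing helper are illustrative stand-ins:

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # illustrative broker address
    'group.id': 'elasticsearch-updates',
    'enable.auto.commit': True,         # auto-commit *stored* offsets on a timer...
    'enable.auto.offset.store': False,  # ...but only store them explicitly
    'auto.commit.interval.ms': 30000,
})
consumer.subscribe(['release-updates'])  # hypothetical topic name

while True:
    batch = consumer.consume(num_messages=200, timeout=10.0)
    if not batch:
        continue
    index_into_elasticsearch(batch)  # hypothetical helper; raises on failure
    for msg in batch:
        # offsets become eligible for the next auto-commit only now,
        # after the whole batch has been indexed
        consumer.store_offsets(message=msg)

A crash between indexing and storing offsets replays the batch rather than losing it, so duplicates are possible but data loss is not.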
@@ -97,16 +96,24 @@ class ElasticsearchReleaseWorker(FatcatWorker):
             bulk_actions = []
             for msg in batch:
                 json_str = msg.value().decode('utf-8')
-                entity = entity_from_json(json_str, ReleaseEntity, api_client=ac)
-                print("Upserting: release/{}".format(entity.ident))
+                # HACK: work around a bug where container entities got published to
+                # release_v03 topic
+                if self.elasticsearch_document_name == "release":
+                    entity_dict = json.loads(json_str)
+                    if entity_dict.get('name') and not entity_dict.get('title'):
+                        continue
+                entity = entity_from_json(json_str, self.entity_type, api_client=ac)
+                # TODO: handle deletions from index
                 bulk_actions.append(json.dumps({
                     "index": { "_id": entity.ident, },
                 }))
                 bulk_actions.append(json.dumps(
-                    release_to_elasticsearch(entity)))
-            elasticsearch_endpoint = "{}/{}/release/_bulk".format(
+                    self.transform_func(entity)))
+            print("Upserting, eg, {} (of {} releases in elasticsearch)".format(entity.ident, len(batch)))
+            elasticsearch_endpoint = "{}/{}/{}/_bulk".format(
                 self.elasticsearch_backend,
-                self.elasticsearch_index)
+                self.elasticsearch_index,
+                self.elasticsearch_document_name)
             resp = requests.post(elasticsearch_endpoint,
                 headers={"Content-Type": "application/x-ndjson"},
                 data="\n".join(bulk_actions) + "\n")
@@ -141,4 +148,3 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
         self.entity_type = ContainerEntity
         self.elasticsearch_document_name = "container"
         self.transform_func = container_to_elasticsearch
-
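The container worker inherits the entire consume-transform-push loop and only re-points the per-entity attributes that the generalized code above reads. A condensed sketch of that pattern, assuming the real __init__ forwards specific Kafka and Elasticsearch arguments rather than *args/**kwargs:

from fatcat_openapi_client import ContainerEntity
from fatcat_tools import container_to_elasticsearch

class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # the inherited run() loop consults these three attributes
        self.entity_type = ContainerEntity
        self.elasticsearch_document_name = "container"
        self.transform_func = container_to_elasticsearch

Keeping the loop generic in the base class means fixes like the bulk-endpoint change above apply to both document types automatically.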