aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/elasticsearch.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2021-02-26 20:13:07 -0800
committerBryan Newbold <bnewbold@robocracy.org>2021-04-06 21:53:49 -0700
commit913f1b845ea9763e301db924199c7db8356624aa (patch)
tree4327a601caf9ddef88527869f18fe66ce9939ceb /python/fatcat_tools/workers/elasticsearch.py
parent6c6a9915e5501663b6840b3e304388017d5ce6f9 (diff)
downloadfatcat-913f1b845ea9763e301db924199c7db8356624aa.tar.gz
fatcat-913f1b845ea9763e301db924199c7db8356624aa.zip
indexing: don't use document names
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py18
1 files changed, 4 insertions, 14 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 61854c31..4850bb0a 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -27,7 +27,6 @@ class ElasticsearchReleaseWorker(FatcatWorker):
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
self.entity_type = ReleaseEntity
- self.elasticsearch_document_name = "release"
self.transform_func = release_to_elasticsearch
self.api_host = api_host
@@ -97,14 +96,8 @@ class ElasticsearchReleaseWorker(FatcatWorker):
bulk_actions = []
for msg in batch:
json_str = msg.value().decode('utf-8')
- # HACK: work around a bug where container entities got published to
- # release_v03 topic
- if self.elasticsearch_document_name == "release":
- entity_dict = json.loads(json_str)
- if entity_dict.get('name') and not entity_dict.get('title'):
- continue
entity = entity_from_json(json_str, self.entity_type, api_client=ac)
- if self.elasticsearch_document_name == "changelog":
+ if self.entity_type == ChangelogEntry:
key = entity.index
# might need to fetch from API
if not (entity.editgroup and entity.editgroup.editor):
@@ -117,11 +110,10 @@ class ElasticsearchReleaseWorker(FatcatWorker):
}))
bulk_actions.append(json.dumps(
self.transform_func(entity)))
- print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.elasticsearch_document_name))
- elasticsearch_endpoint = "{}/{}/{}/_bulk".format(
+ print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type))
+ elasticsearch_endpoint = "{}/{}/_bulk".format(
self.elasticsearch_backend,
- self.elasticsearch_index,
- self.elasticsearch_document_name)
+ self.elasticsearch_index)
resp = requests.post(elasticsearch_endpoint,
headers={"Content-Type": "application/x-ndjson"},
data="\n".join(bulk_actions) + "\n")
@@ -152,7 +144,6 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
# previous group got corrupted (by pykafka library?)
self.consumer_group = "elasticsearch-updates3"
self.entity_type = ContainerEntity
- self.elasticsearch_document_name = "container"
self.transform_func = container_to_elasticsearch
@@ -174,5 +165,4 @@ class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
self.entity_type = ChangelogEntry
- self.elasticsearch_document_name = "changelog"
self.transform_func = changelog_to_elasticsearch