summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py37
1 files changed, 32 insertions, 5 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 4850bb0a..13409eb5 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,10 +1,16 @@
import json
+
import requests
+import elasticsearch
from confluent_kafka import Consumer, KafkaException
from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry
-from fatcat_tools import *
+from fatcat_tools import (public_api, entity_from_json,
+ release_to_elasticsearch, container_to_elasticsearch,
+ changelog_to_elasticsearch,
+)
+from fatcat_web.search import get_elastic_container_stats
from .worker_common import FatcatWorker
@@ -18,7 +24,8 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
- batch_size=200, api_host="https://api.fatcat.wiki/v0"):
+ elasticsearch_release_index="fatcat_releases",
+ batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
@@ -26,14 +33,19 @@ class ElasticsearchReleaseWorker(FatcatWorker):
self.poll_interval = poll_interval
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
+ self.elasticsearch_release_index = elasticsearch_release_index
self.entity_type = ReleaseEntity
self.transform_func = release_to_elasticsearch
self.api_host = api_host
+ self.query_stats = query_stats
def run(self):
ac = ApiClient()
api = public_api(self.api_host)
+ # only used by container indexing query_stats code path
+ es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend)
+
def fail_fast(err, partitions):
if err is not None:
print("Kafka consumer commit error: {}".format(err))
@@ -104,13 +116,25 @@ class ElasticsearchReleaseWorker(FatcatWorker):
entity = api.get_changelog_entry(entity.index)
else:
key = entity.ident
+
+ if self.entity_type == ContainerEntity and self.query_stats:
+ stats = get_elastic_container_stats(
+ entity.ident,
+ es_client=es_client,
+ es_index=self.elasticsearch_release_index,
+ merge_shadows=True,
+ )
+ doc_dict = container_to_elasticsearch(entity, stats=stats)
+ else:
+ doc_dict = self.transform_func(entity)
+
# TODO: handle deletions from index
bulk_actions.append(json.dumps({
"index": { "_id": key, },
}))
- bulk_actions.append(json.dumps(
- self.transform_func(entity)))
- print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type))
+ bulk_actions.append(json.dumps(doc_dict))
+
+ print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__))
elasticsearch_endpoint = "{}/{}/_bulk".format(
self.elasticsearch_backend,
self.elasticsearch_index)
@@ -132,6 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ query_stats=False, elasticsearch_release_index="fatcat_release",
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
batch_size=200):
super().__init__(kafka_hosts=kafka_hosts,
@@ -140,6 +165,8 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
offset=offset,
elasticsearch_backend=elasticsearch_backend,
elasticsearch_index=elasticsearch_index,
+ elasticsearch_release_index=elasticsearch_release_index,
+ query_stats=query_stats,
batch_size=batch_size)
# previous group got corrupted (by pykafka library?)
self.consumer_group = "elasticsearch-updates3"