aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--python/fatcat_tools/workers/__init__.py2
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py30
2 files changed, 26 insertions, 6 deletions
diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py
index e8973bc3..8bea7cdc 100644
--- a/python/fatcat_tools/workers/__init__.py
+++ b/python/fatcat_tools/workers/__init__.py
@@ -1,4 +1,4 @@
from .changelog import ChangelogWorker, EntityUpdatesWorker
-from .elasticsearch import ElasticsearchReleaseWorker
+from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker
from .worker_common import most_recent_message, FatcatWorker
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 83310284..6e336e52 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -24,6 +24,9 @@ class ElasticsearchReleaseWorker(FatcatWorker):
self.consumer_group = "elasticsearch-updates"
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
+ self.entity_type = ReleaseEntity
+ self.elasticsearch_document_name = "release"
+ self.transform_func = release_to_elasticsearch
def run(self):
consume_topic = self.kafka.topics[self.consume_topic]
@@ -40,13 +43,30 @@ class ElasticsearchReleaseWorker(FatcatWorker):
for msg in consumer:
json_str = msg.value.decode('utf-8')
- release = entity_from_json(json_str, ReleaseEntity, api_client=ac)
- #print(release)
- elasticsearch_endpoint = "{}/{}/release/{}".format(
+ entity = entity_from_json(json_str, self.entity_type, api_client=ac)
+ #print(entity)
+ elasticsearch_endpoint = "{}/{}/{}/{}".format(
self.elasticsearch_backend,
self.elasticsearch_index,
- release.ident)
+ self.elasticsearch_document_name,
+ entity.ident)
print("Updating document: {}".format(elasticsearch_endpoint))
- resp = requests.post(elasticsearch_endpoint, json=release_to_elasticsearch(release))
+ resp = requests.post(elasticsearch_endpoint, json=self.transform_func(entity))
resp.raise_for_status()
#consumer.commit_offsets()
+
+
+class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
+
+ def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
+ super().__init__(kafka_hosts=kafka_hosts,
+ consume_topic=consume_topic,
+ poll_interval,
+ offset,
+ elasticsearch_backend,
+ elasticsearch_index)
+ self.entity_type = ContainerEntity
+ self.elasticsearch_document_name = "container"
+ self.transform_func = container_to_elasticsearch
+