aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/fatcat_tools/workers/__init__.py2
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py30
-rwxr-xr-xpython/fatcat_worker.py16
3 files changed, 42 insertions, 6 deletions
diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py
index e8973bc3..8bea7cdc 100644
--- a/python/fatcat_tools/workers/__init__.py
+++ b/python/fatcat_tools/workers/__init__.py
@@ -1,4 +1,4 @@
from .changelog import ChangelogWorker, EntityUpdatesWorker
-from .elasticsearch import ElasticsearchReleaseWorker
+from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker
from .worker_common import most_recent_message, FatcatWorker
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 83310284..6e336e52 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -24,6 +24,9 @@ class ElasticsearchReleaseWorker(FatcatWorker):
self.consumer_group = "elasticsearch-updates"
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
+ self.entity_type = ReleaseEntity
+ self.elasticsearch_document_name = "release"
+ self.transform_func = release_to_elasticsearch
def run(self):
consume_topic = self.kafka.topics[self.consume_topic]
@@ -40,13 +43,30 @@ class ElasticsearchReleaseWorker(FatcatWorker):
for msg in consumer:
json_str = msg.value.decode('utf-8')
- release = entity_from_json(json_str, ReleaseEntity, api_client=ac)
- #print(release)
- elasticsearch_endpoint = "{}/{}/release/{}".format(
+ entity = entity_from_json(json_str, self.entity_type, api_client=ac)
+ #print(entity)
+ elasticsearch_endpoint = "{}/{}/{}/{}".format(
self.elasticsearch_backend,
self.elasticsearch_index,
- release.ident)
+ self.elasticsearch_document_name,
+ entity.ident)
print("Updating document: {}".format(elasticsearch_endpoint))
- resp = requests.post(elasticsearch_endpoint, json=release_to_elasticsearch(release))
+ resp = requests.post(elasticsearch_endpoint, json=self.transform_func(entity))
resp.raise_for_status()
#consumer.commit_offsets()
+
+
+class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
+
+ def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
+ super().__init__(kafka_hosts=kafka_hosts,
+ consume_topic=consume_topic,
+ poll_interval,
+ offset,
+ elasticsearch_backend,
+ elasticsearch_index)
+ self.entity_type = ContainerEntity
+ self.elasticsearch_document_name = "container"
+ self.transform_func = container_to_elasticsearch
+
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index 267779ff..4455d29e 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -38,6 +38,13 @@ def run_elasticsearch_release(args):
elasticsearch_index=args.elasticsearch_index)
worker.run()
+def run_elasticsearch_container(args):
+ consume_topic = "fatcat-{}.container-updates".format(args.env)
+ worker = ElasticsearchContainerWorker(args.kafka_hosts, consume_topic,
+ elasticsearch_backend=args.elasticsearch_backend,
+ elasticsearch_index=args.elasticsearch_index)
+ worker.run()
+
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--debug',
@@ -72,6 +79,15 @@ def main():
help="elasticsearch index to push into",
default="fatcat_release_v03")
+ sub_elasticsearch_container = subparsers.add_parser('elasticsearch-container')
+ sub_elasticsearch_container.set_defaults(func=run_elasticsearch_container)
+ sub_elasticsearch_container.add_argument('--elasticsearch-backend',
+ help="elasticsearch backend to connect to",
+ default="http://localhost:9200")
+ sub_elasticsearch_container.add_argument('--elasticsearch-index',
+ help="elasticsearch index to push into",
+ default="fatcat_container")
+
args = parser.parse_args()
if not args.__dict__.get("func"):
print("tell me what to do!")