diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/fatcat_tools/workers/__init__.py | 2 | ||||
| -rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 30 | ||||
| -rwxr-xr-x | python/fatcat_worker.py | 16 | 
3 files changed, 42 insertions, 6 deletions
| diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py index e8973bc3..8bea7cdc 100644 --- a/python/fatcat_tools/workers/__init__.py +++ b/python/fatcat_tools/workers/__init__.py @@ -1,4 +1,4 @@  from .changelog import ChangelogWorker, EntityUpdatesWorker -from .elasticsearch import ElasticsearchReleaseWorker +from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker  from .worker_common import most_recent_message, FatcatWorker diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 83310284..6e336e52 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -24,6 +24,9 @@ class ElasticsearchReleaseWorker(FatcatWorker):          self.consumer_group = "elasticsearch-updates"          self.elasticsearch_backend = elasticsearch_backend          self.elasticsearch_index = elasticsearch_index +        self.entity_type = ReleaseEntity +        self.elasticsearch_document_name = "release" +        self.transform_func = release_to_elasticsearch      def run(self):          consume_topic = self.kafka.topics[self.consume_topic] @@ -40,13 +43,30 @@ class ElasticsearchReleaseWorker(FatcatWorker):          for msg in consumer:              json_str = msg.value.decode('utf-8') -            release = entity_from_json(json_str, ReleaseEntity, api_client=ac) -            #print(release) -            elasticsearch_endpoint = "{}/{}/release/{}".format( +            entity = entity_from_json(json_str, self.entity_type, api_client=ac) +            #print(entity) +            elasticsearch_endpoint = "{}/{}/{}/{}".format(                  self.elasticsearch_backend,                  self.elasticsearch_index, -                release.ident) +                self.elasticsearch_document_name, +                entity.ident)              print("Updating document: {}".format(elasticsearch_endpoint)) -            resp = requests.post(elasticsearch_endpoint, json=release_to_elasticsearch(release)) +            resp = requests.post(elasticsearch_endpoint, json=self.transform_func(entity))              resp.raise_for_status()              #consumer.commit_offsets() + + +class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): + +    def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, +            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"): +        super().__init__(kafka_hosts=kafka_hosts, +                         consume_topic=consume_topic, +                         poll_interval, +                         offset, +                         elasticsearch_backend, +                         elasticsearch_index) +        self.entity_type = ContainerEntity +        self.elasticsearch_document_name = "container" +        self.transform_func = container_to_elasticsearch + diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 267779ff..4455d29e 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -38,6 +38,13 @@ def run_elasticsearch_release(args):          elasticsearch_index=args.elasticsearch_index)      worker.run() +def run_elasticsearch_container(args): +    consume_topic = "fatcat-{}.container-updates".format(args.env) +    worker = ElasticsearchContainerWorker(args.kafka_hosts, consume_topic, +        elasticsearch_backend=args.elasticsearch_backend, +        elasticsearch_index=args.elasticsearch_index) +    worker.run() +  def main():      parser = argparse.ArgumentParser()      parser.add_argument('--debug', @@ -72,6 +79,15 @@ def main():          help="elasticsearch index to push into",          default="fatcat_release_v03") +    sub_elasticsearch_container = subparsers.add_parser('elasticsearch-container') +    sub_elasticsearch_container.set_defaults(func=run_elasticsearch_container) +    sub_elasticsearch_container.add_argument('--elasticsearch-backend', +        help="elasticsearch backend to connect to", +        default="http://localhost:9200") +    sub_elasticsearch_container.add_argument('--elasticsearch-index', +        help="elasticsearch index to push into", +        default="fatcat_container") +      args = parser.parse_args()      if not args.__dict__.get("func"):          print("tell me what to do!") | 
