diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-13 11:32:41 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-13 11:32:41 -0800 |
commit | 279b22e30d9b590838268f5f5acdaa1110ee593a (patch) | |
tree | c9965a089be1b8ef607573ea9261c0c378c0ab47 /python/fatcat_tools/workers/elastic.py | |
parent | 7ebda2e051b51e49544ab75673b19ec5f27d9d45 (diff) | |
download | fatcat-279b22e30d9b590838268f5f5acdaa1110ee593a.tar.gz fatcat-279b22e30d9b590838268f5f5acdaa1110ee593a.zip |
shuffle around fatcat_tools layout
Diffstat (limited to 'python/fatcat_tools/workers/elastic.py')
-rw-r--r-- | python/fatcat_tools/workers/elastic.py | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/python/fatcat_tools/workers/elastic.py b/python/fatcat_tools/workers/elastic.py new file mode 100644 index 00000000..46632792 --- /dev/null +++ b/python/fatcat_tools/workers/elastic.py @@ -0,0 +1,47 @@ + +import json +import time +import requests +from fatcat_tools.workers.worker_common import FatcatWorker +from fatcat_client.models import ReleaseEntity +from fatcat_tools.transforms import * +from pykafka.common import OffsetType + + +class FatcatElasticReleaseWorker(FatcatWorker): + """ + Consumes from release-updates topic and pushes into (presumably local) + elasticsearch. + + Uses a consumer group to manage offset. + """ + + def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + elastic_backend="http://localhost:9200", elastic_index="fatcat"): + super().__init__(kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + api_host_url=None) + self.consumer_group = "elastic-updates" + self.elastic_backend = elastic_backend + self.elastic_index = elastic_index + + def run(self): + consume_topic = self.kafka.topics[self.consume_topic] + + consumer = consume_topic.get_balanced_consumer( + consumer_group=self.consumer_group, + managed=True, + ) + + for msg in consumer: + json_str = msg.value.decode('utf-8') + release = entity_from_json(json_str, ReleaseEntity) + #print(release) + elastic_endpoint = "{}/{}/release/{}".format( + self.elastic_backend, + self.elastic_index, + release.ident) + print("Updating document: {}".format(elastic_endpoint)) + resp = requests.post(elastic_endpoint, json=release.to_elastic_dict()) + assert resp.status_code in (200, 201) + consumer.commit_offsets() |