diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-12 23:18:56 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-12 23:18:56 -0800 |
commit | b03bfc8f3fd84141738f775b273a99850d78e1ff (patch) | |
tree | 64858e474fa38aa015f06f5e15b851dcc85da421 /python/fatcat_tools/elastic_workers.py | |
parent | 055c464deea8cdaccf3ed384995d4409b0f51409 (diff) | |
download | fatcat-b03bfc8f3fd84141738f775b273a99850d78e1ff.tar.gz fatcat-b03bfc8f3fd84141738f775b273a99850d78e1ff.zip |
refactor python modules
Diffstat (limited to 'python/fatcat_tools/elastic_workers.py')
-rw-r--r-- | python/fatcat_tools/elastic_workers.py | 47 |
1 files changed, 47 insertions, 0 deletions
# python/fatcat_tools/elastic_workers.py (new file in this commit)

import json
import time
import requests
from fatcat.worker_common import FatcatWorker
from fatcat_client.models import ReleaseEntity
from fatcat.entity_helpers import *
from pykafka.common import OffsetType


class FatcatElasticReleaseWorker(FatcatWorker):
    """
    Consumes from release-updates topic and pushes into (presumably local)
    elasticsearch.

    Uses a consumer group to manage offset.
    """

    def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
            elastic_backend="http://localhost:9200", elastic_index="fatcat"):
        """
        kafka_hosts and consume_topic are forwarded to FatcatWorker.

        elastic_backend is the base URL of the elasticsearch HTTP API and
        elastic_index is the index that release documents are written to.

        NOTE(review): poll_interval and offset are accepted but not used
        anywhere in this class; they are kept so existing callers keep
        working.
        """
        super().__init__(kafka_hosts=kafka_hosts,
                         consume_topic=consume_topic,
                         api_host_url=None)
        self.consumer_group = "elastic-updates"
        self.elastic_backend = elastic_backend
        self.elastic_index = elastic_index

    def run(self):
        """
        Loop over messages from the consume topic, indexing each release
        into elasticsearch and committing the consumer offset after every
        successful write.

        Raises requests.HTTPError if elasticsearch answers with anything
        other than 200 or 201; the offset for that message is then not
        committed.
        """
        # self.kafka is presumably set up by FatcatWorker.__init__ -- TODO confirm
        consume_topic = self.kafka.topics[self.consume_topic]

        consumer = consume_topic.get_balanced_consumer(
            consumer_group=self.consumer_group,
            managed=True,
        )

        for msg in consumer:
            json_str = msg.value.decode('utf-8')
            # entity_from_json presumably comes from the wildcard import of
            # fatcat.entity_helpers -- TODO confirm
            release = entity_from_json(json_str, ReleaseEntity)
            elastic_endpoint = "{}/{}/release/{}".format(
                self.elastic_backend,
                self.elastic_index,
                release.ident)
            print("Updating document: {}".format(elastic_endpoint))
            resp = requests.post(elastic_endpoint, json=release.to_elastic_dict())
            # Explicit check instead of `assert`: asserts are stripped under
            # `python -O`, which would let a failed write fall through to the
            # offset commit below and silently drop the update.
            if resp.status_code not in (200, 201):
                raise requests.HTTPError(
                    "Unexpected status {} from elasticsearch for {}".format(
                        resp.status_code, elastic_endpoint),
                    response=resp)
            consumer.commit_offsets()