author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-15 13:11:52 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-15 13:15:15 -0800 |
commit | bb28a3fc1cc900f2dde31e1dbc492d9661034f41 (patch) | |
tree | f037dd3d1bab6cbf08a562dbdd4c09361fe0c030 /python/fatcat_tools/workers/elasticsearch.py | |
parent | 9f817c6c70a749f2ac449ab4edfd26c6dd8a7410 (diff) | |
large refactor of python names/paths
- Add __init__.py files for fatcat_tools submodules, and use them in imports (a hypothetical sketch of this follows the commit message)
- Add a bunch of comments to files.
- Rename a number of classes and functions to be less verbose
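
The first bullet refers to package `__init__.py` files that re-export names so callers can use shorter import paths. Below is a minimal sketch of what such a file for the `workers` submodule might contain; the exact re-exports are an assumption for illustration, not taken from this commit.

```python
# python/fatcat_tools/workers/__init__.py
# Hypothetical sketch; the actual re-exports in this refactor may differ.
from .worker_common import FatcatWorker
from .elasticsearch import ElasticsearchReleaseWorker
```

With a re-export like this, calling code can write `from fatcat_tools.workers import ElasticsearchReleaseWorker` instead of spelling out the full module path.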
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 52 |
1 file changed, 52 insertions, 0 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
new file mode 100644
index 00000000..e7abd5ee
--- /dev/null
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -0,0 +1,52 @@
+
+import json
+import time
+import requests
+from pykafka.common import OffsetType
+
+from fatcat_client import ReleaseEntity
+from fatcat_tools import *
+from .worker_common import FatcatWorker
+
+
+class ElasticsearchReleaseWorker(FatcatWorker):
+    """
+    Consumes from release-updates topic and pushes into (presumably local)
+    elasticsearch.
+
+    Uses a consumer group to manage offset.
+    """
+
+    def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+                 elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
+        super().__init__(kafka_hosts=kafka_hosts,
+                         consume_topic=consume_topic,
+                         api_host_url=None)
+        self.consumer_group = "elasticsearch-updates"
+        self.elasticsearch_backend = elasticsearch_backend
+        self.elasticsearch_index = elasticsearch_index
+
+    def run(self):
+        consume_topic = self.kafka.topics[self.consume_topic]
+
+        consumer = consume_topic.get_balanced_consumer(
+            consumer_group=self.consumer_group,
+            managed=True,
+            fetch_message_max_bytes=4000000, # up to ~4MBytes
+            auto_commit_enable=True,
+            auto_commit_interval_ms=30000, # 30 seconds
+            compacted_topic=True,
+        )
+
+        for msg in consumer:
+            json_str = msg.value.decode('utf-8')
+            release = entity_from_json(json_str, ReleaseEntity)
+            #print(release)
+            elasticsearch_endpoint = "{}/{}/release/{}".format(
+                self.elasticsearch_backend,
+                self.elasticsearch_index,
+                release.ident)
+            print("Updating document: {}".format(elasticsearch_endpoint))
+            resp = requests.post(elasticsearch_endpoint, json=release_to_elasticsearch(release))
+            assert resp.status_code in (200, 201)
+            #consumer.commit_offsets()
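
For context, here is a minimal usage sketch of the worker introduced by this diff. The Kafka broker address and topic name are assumptions for illustration, not values taken from the commit; the constructor arguments match the `__init__` signature above.

```python
from fatcat_tools.workers.elasticsearch import ElasticsearchReleaseWorker

# Hypothetical broker address and topic name; substitute real deployment values.
worker = ElasticsearchReleaseWorker(
    kafka_hosts="localhost:9092",
    consume_topic="fatcat-prod.release-updates",
    elasticsearch_backend="http://localhost:9200",
    elasticsearch_index="fatcat",
)

# Blocks indefinitely: joins the "elasticsearch-updates" consumer group, reads
# release entities from the topic, and POSTs transformed documents to
# Elasticsearch at {backend}/{index}/release/{ident}.
worker.run()
```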