From e5486378d8d7adf8974b1f1ebaf0400445ba8791 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sun, 4 Nov 2018 18:52:22 -0800 Subject: elastic release worker --- python/fatcat/elastic_workers.py | 46 ++++++++++++++++++++++++++++++++++++++++ python/fatcat/release_model.py | 15 ++++++++++++- python/fatcat_worker.py | 10 +++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 python/fatcat/elastic_workers.py diff --git a/python/fatcat/elastic_workers.py b/python/fatcat/elastic_workers.py new file mode 100644 index 00000000..c3226981 --- /dev/null +++ b/python/fatcat/elastic_workers.py @@ -0,0 +1,46 @@ + +import json +import time +import requests +from fatcat.worker_common import FatcatWorker +from fatcat.release_model import FatcatRelease +from pykafka.common import OffsetType + + +class FatcatElasticReleaseWorker(FatcatWorker): + """ + Consumes from release-updates topic and pushes into (presumably local) + elasticsearch. + + Uses a consumer group to manage offset. + """ + + def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + elastic_backend="http://localhost:9200", elastic_index="fatcat"): + super().__init__(kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + api_host_url=None) + self.consumer_group = "elastic-updates" + self.elastic_backend = elastic_backend + self.elastic_index = elastic_index + + def run(self): + consume_topic = self.kafka.topics[self.consume_topic] + + consumer = consume_topic.get_balanced_consumer( + consumer_group=self.consumer_group, + managed=True, + ) + + for msg in consumer: + json_str = msg.value.decode('utf-8') + release = FatcatRelease.from_json(json_str) + #print(release) + elastic_endpoint = "{}/{}/release/{}".format( + self.elastic_backend, + self.elastic_index, + release.ident) + print("Updating document: {}".format(elastic_endpoint)) + resp = requests.post(elastic_endpoint, json=release.to_elastic_dict()) + assert resp.status_code in (200, 201) + consumer.commit_offsets() diff --git a/python/fatcat/release_model.py b/python/fatcat/release_model.py index aa0d2d9f..403fc671 100644 --- a/python/fatcat/release_model.py +++ b/python/fatcat/release_model.py @@ -1,4 +1,5 @@ +import collections from fatcat_client.models import ReleaseEntity from fatcat_client.api_client import ApiClient @@ -23,7 +24,6 @@ class FatcatRelease(ReleaseEntity): ident = self.ident, revision = self.revision, title = self.title, - release_date = self.release_date, release_type = self.release_type, release_status = self.release_status, language = self.language, @@ -35,6 +35,9 @@ class FatcatRelease(ReleaseEntity): wikidata_qid = self.wikidata_qid ) + if self.release_date: + t['release_date'] = self.release_date.strftime('%F') + container = self.container container_is_kept = False if container: @@ -88,3 +91,13 @@ class FatcatRelease(ReleaseEntity): def to_json(self): ac = ApiClient() return ac.sanitize_for_serialization(self) + + def from_json(json_str): + """ + Hack to take advantage of the code-generated deserialization code + """ + ac = ApiClient() + thing = collections.namedtuple('Thing', ['data']) + thing.data = json_str + return ac.deserialize(thing, FatcatRelease) + diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index cc11beca..50ff0fb7 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -3,6 +3,7 @@ import sys import argparse from fatcat.changelog_workers import FatcatChangelogWorker, FatcatEntityUpdatesWorker +from fatcat.elastic_workers import FatcatElasticReleaseWorker def run_changelog_worker(args): topic = "fatcat-{}.changelog".format(args.env) @@ -17,6 +18,12 @@ def run_entity_updates_worker(args): changelog_topic, release_topic) worker.run() +def run_elastic_release_worker(args): + consume_topic = "fatcat-{}.release-updates".format(args.env) + worker = FatcatElasticReleaseWorker(args.kafka_hosts, + consume_topic) + worker.run() + def main(): parser = argparse.ArgumentParser() parser.add_argument('--debug', @@ -42,6 +49,9 @@ def main(): sub_entity_updates = subparsers.add_parser('entity-updates') sub_entity_updates.set_defaults(func=run_entity_updates_worker) + sub_elastic_release = subparsers.add_parser('elastic-release') + sub_elastic_release.set_defaults(func=run_elastic_release_worker) + args = parser.parse_args() if not args.__dict__.get("func"): print("tell me what to do!") -- cgit v1.2.3