diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/fatcat/elastic_workers.py | 46 | ||||
| -rw-r--r-- | python/fatcat/release_model.py | 15 | ||||
| -rwxr-xr-x | python/fatcat_worker.py | 10 | 
3 files changed, 70 insertions, 1 deletions
diff --git a/python/fatcat/elastic_workers.py b/python/fatcat/elastic_workers.py
new file mode 100644
index 00000000..c3226981
--- /dev/null
+++ b/python/fatcat/elastic_workers.py
@@ -0,0 +1,46 @@
+
+import json
+import time
+import requests
+from fatcat.worker_common import FatcatWorker
+from fatcat.release_model import FatcatRelease
+from pykafka.common import OffsetType
+
+
+class FatcatElasticReleaseWorker(FatcatWorker):
+    """
+    Consumes from release-updates topic and pushes into (presumably local)
+    elasticsearch.
+
+    Uses a consumer group to manage offset.
+    """
+
+    def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+            elastic_backend="http://localhost:9200", elastic_index="fatcat"):
+        super().__init__(kafka_hosts=kafka_hosts,
+                         consume_topic=consume_topic,
+                         api_host_url=None)
+        self.consumer_group = "elastic-updates"
+        self.elastic_backend = elastic_backend
+        self.elastic_index = elastic_index
+
+    def run(self):
+        consume_topic = self.kafka.topics[self.consume_topic]
+
+        consumer = consume_topic.get_balanced_consumer(
+            consumer_group=self.consumer_group,
+            managed=True,
+        )
+
+        for msg in consumer:
+            json_str = msg.value.decode('utf-8')
+            release = FatcatRelease.from_json(json_str)
+            #print(release)
+            elastic_endpoint = "{}/{}/release/{}".format(
+                self.elastic_backend,
+                self.elastic_index,
+                release.ident)
+            print("Updating document: {}".format(elastic_endpoint))
+            resp = requests.post(elastic_endpoint, json=release.to_elastic_dict())
+            assert resp.status_code in (200, 201)
+            consumer.commit_offsets()
diff --git a/python/fatcat/release_model.py b/python/fatcat/release_model.py
index aa0d2d9f..403fc671 100644
--- a/python/fatcat/release_model.py
+++ b/python/fatcat/release_model.py
@@ -1,4 +1,5 @@
+import collections
 from fatcat_client.models import ReleaseEntity
 from fatcat_client.api_client import ApiClient
@@ -23,7 +24,6 @@ class FatcatRelease(ReleaseEntity):
             ident = self.ident,
             revision = self.revision,
             title = self.title,
-            release_date = self.release_date,
             release_type = self.release_type,
             release_status = self.release_status,
             language = self.language,
@@ -35,6 +35,9 @@ class FatcatRelease(ReleaseEntity):
             wikidata_qid = self.wikidata_qid
         )
+        if self.release_date:
+            t['release_date'] = self.release_date.strftime('%F')
+
         container = self.container
         container_is_kept = False
         if container:
@@ -88,3 +91,13 @@ class FatcatRelease(ReleaseEntity):
     def to_json(self):
         ac = ApiClient()
         return ac.sanitize_for_serialization(self)
+
+    def from_json(json_str):
+        """
+        Hack to take advantage of the code-generated deserialization code
+        """
+        ac = ApiClient()
+        thing = collections.namedtuple('Thing', ['data'])
+        thing.data = json_str
+        return ac.deserialize(thing, FatcatRelease)
+
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index cc11beca..50ff0fb7 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -3,6 +3,7 @@
 import sys
 import argparse
 from fatcat.changelog_workers import FatcatChangelogWorker, FatcatEntityUpdatesWorker
+from fatcat.elastic_workers import FatcatElasticReleaseWorker
 def run_changelog_worker(args):
     topic = "fatcat-{}.changelog".format(args.env)
@@ -17,6 +18,12 @@ def run_entity_updates_worker(args):
         changelog_topic, release_topic)
     worker.run()
+def run_elastic_release_worker(args):
+    consume_topic = "fatcat-{}.release-updates".format(args.env)
+    worker = FatcatElasticReleaseWorker(args.kafka_hosts,
+        consume_topic)
+    worker.run()
+
 def main():
     parser = argparse.ArgumentParser()
     parser.add_argument('--debug',
@@ -42,6 +49,9 @@ def main():
     sub_entity_updates = subparsers.add_parser('entity-updates')
     sub_entity_updates.set_defaults(func=run_entity_updates_worker)
+    sub_elastic_release = subparsers.add_parser('elastic-release')
+    sub_elastic_release.set_defaults(func=run_elastic_release_worker)
+
     args = parser.parse_args()
     if not args.__dict__.get("func"):
         print("tell me what to do!")
