author     Bryan Newbold <bnewbold@robocracy.org>   2018-11-04 18:52:22 -0800
committer  Bryan Newbold <bnewbold@robocracy.org>   2018-11-04 18:52:22 -0800
commit     e5486378d8d7adf8974b1f1ebaf0400445ba8791 (patch)
tree       e4f309802df8450c0eaf02ec634aea311b3804c2
parent     66953b184d9b664e15cd7d7fddcb250c2b767df5 (diff)
download   fatcat-e5486378d8d7adf8974b1f1ebaf0400445ba8791.tar.gz
           fatcat-e5486378d8d7adf8974b1f1ebaf0400445ba8791.zip
elastic release worker
-rw-r--r--  python/fatcat/elastic_workers.py   46
-rw-r--r--  python/fatcat/release_model.py     15
-rwxr-xr-x  python/fatcat_worker.py            10
3 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/python/fatcat/elastic_workers.py b/python/fatcat/elastic_workers.py
new file mode 100644
index 00000000..c3226981
--- /dev/null
+++ b/python/fatcat/elastic_workers.py
@@ -0,0 +1,46 @@
+
+import json
+import time
+import requests
+from fatcat.worker_common import FatcatWorker
+from fatcat.release_model import FatcatRelease
+from pykafka.common import OffsetType
+
+
+class FatcatElasticReleaseWorker(FatcatWorker):
+    """
+    Consumes from release-updates topic and pushes into (presumably local)
+    elasticsearch.
+
+    Uses a consumer group to manage offset.
+    """
+
+    def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+            elastic_backend="http://localhost:9200", elastic_index="fatcat"):
+        super().__init__(kafka_hosts=kafka_hosts,
+                         consume_topic=consume_topic,
+                         api_host_url=None)
+        self.consumer_group = "elastic-updates"
+        self.elastic_backend = elastic_backend
+        self.elastic_index = elastic_index
+
+    def run(self):
+        consume_topic = self.kafka.topics[self.consume_topic]
+
+        consumer = consume_topic.get_balanced_consumer(
+            consumer_group=self.consumer_group,
+            managed=True,
+        )
+
+        for msg in consumer:
+            json_str = msg.value.decode('utf-8')
+            release = FatcatRelease.from_json(json_str)
+            #print(release)
+            elastic_endpoint = "{}/{}/release/{}".format(
+                self.elastic_backend,
+                self.elastic_index,
+                release.ident)
+            print("Updating document: {}".format(elastic_endpoint))
+            resp = requests.post(elastic_endpoint, json=release.to_elastic_dict())
+            assert resp.status_code in (200, 201)
+            consumer.commit_offsets()
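
The worker above amounts to one HTTP POST per consumed Kafka message. A minimal sketch of just that indexing call, assuming a local Elasticsearch on the default port and a hypothetical pre-serialized release document (the worker itself builds the document via FatcatRelease.to_elastic_dict()):

    import requests

    elastic_backend = "http://localhost:9200"     # worker default
    elastic_index = "fatcat"                      # worker default
    release_ident = "hypothetical-release-ident"  # placeholder identifier
    doc = {"ident": release_ident, "title": "Example"}  # stand-in for to_elastic_dict() output

    # Same endpoint shape the worker uses: {backend}/{index}/release/{ident}
    url = "{}/{}/release/{}".format(elastic_backend, elastic_index, release_ident)
    resp = requests.post(url, json=doc)
    assert resp.status_code in (200, 201)
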
diff --git a/python/fatcat/release_model.py b/python/fatcat/release_model.py
index aa0d2d9f..403fc671 100644
--- a/python/fatcat/release_model.py
+++ b/python/fatcat/release_model.py
@@ -1,4 +1,5 @@
+import collections
 from fatcat_client.models import ReleaseEntity
 from fatcat_client.api_client import ApiClient
@@ -23,7 +24,6 @@ class FatcatRelease(ReleaseEntity):
             ident = self.ident,
             revision = self.revision,
             title = self.title,
-            release_date = self.release_date,
             release_type = self.release_type,
             release_status = self.release_status,
             language = self.language,
@@ -35,6 +35,9 @@ class FatcatRelease(ReleaseEntity):
             wikidata_qid = self.wikidata_qid
         )
+        if self.release_date:
+            t['release_date'] = self.release_date.strftime('%F')
+
         container = self.container
         container_is_kept = False
         if container:
@@ -88,3 +91,13 @@ class FatcatRelease(ReleaseEntity):
     def to_json(self):
         ac = ApiClient()
         return ac.sanitize_for_serialization(self)
+
+    def from_json(json_str):
+        """
+        Hack to take advantage of the code-generated deserialization code
+        """
+        ac = ApiClient()
+        thing = collections.namedtuple('Thing', ['data'])
+        thing.data = json_str
+        return ac.deserialize(thing, FatcatRelease)
+
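
The from_json() addition is a workaround: the code-generated ApiClient.deserialize() wants a response-like object exposing a .data attribute, so a throwaway namedtuple stands in for one. A rough round-trip sketch, assuming a FatcatRelease instance named release is already in hand:

    from fatcat.release_model import FatcatRelease

    json_str = release.to_json()              # serialize via the generated ApiClient
    copy = FatcatRelease.from_json(json_str)  # deserialize via the namedtuple hack
    assert copy.ident == release.ident
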
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index cc11beca..50ff0fb7 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -3,6 +3,7 @@
 import sys
 import argparse
 from fatcat.changelog_workers import FatcatChangelogWorker, FatcatEntityUpdatesWorker
+from fatcat.elastic_workers import FatcatElasticReleaseWorker
 def run_changelog_worker(args):
     topic = "fatcat-{}.changelog".format(args.env)
@@ -17,6 +18,12 @@ def run_entity_updates_worker(args):
         changelog_topic, release_topic)
     worker.run()
+def run_elastic_release_worker(args):
+    consume_topic = "fatcat-{}.release-updates".format(args.env)
+    worker = FatcatElasticReleaseWorker(args.kafka_hosts,
+        consume_topic)
+    worker.run()
+
 def main():
     parser = argparse.ArgumentParser()
     parser.add_argument('--debug',
@@ -42,6 +49,9 @@ def main():
     sub_entity_updates = subparsers.add_parser('entity-updates')
     sub_entity_updates.set_defaults(func=run_entity_updates_worker)
+    sub_elastic_release = subparsers.add_parser('elastic-release')
+    sub_elastic_release.set_defaults(func=run_elastic_release_worker)
+
     args = parser.parse_args()
     if not args.__dict__.get("func"):
         print("tell me what to do!")
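
For reference, the new elastic-release subcommand boils down to the following, shown here with hypothetical values for the Kafka bootstrap hosts and deployment environment (in the CLI these come from args.kafka_hosts and args.env):

    from fatcat.elastic_workers import FatcatElasticReleaseWorker

    kafka_hosts = "localhost:9092"   # hypothetical; normally args.kafka_hosts
    env = "qa"                       # hypothetical; normally args.env

    worker = FatcatElasticReleaseWorker(
        kafka_hosts,
        "fatcat-{}.release-updates".format(env))
    worker.run()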