diff options
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r-- | python/fatcat_tools/workers/__init__.py | 4 | ||||
-rw-r--r-- | python/fatcat_tools/workers/changelog.py | 7 | ||||
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py (renamed from python/fatcat_tools/workers/elastic.py) | 28 |
3 files changed, 22 insertions, 17 deletions
diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py new file mode 100644 index 00000000..e8973bc3 --- /dev/null +++ b/python/fatcat_tools/workers/__init__.py @@ -0,0 +1,4 @@ + +from .changelog import ChangelogWorker, EntityUpdatesWorker +from .elasticsearch import ElasticsearchReleaseWorker +from .worker_common import most_recent_message, FatcatWorker diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index d07b5988..e64c043b 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -1,11 +1,12 @@ import json import time -from fatcat_tools.workers.worker_common import FatcatWorker, most_recent_message from pykafka.common import OffsetType +from .worker_common import FatcatWorker, most_recent_message -class FatcatChangelogWorker(FatcatWorker): + +class ChangelogWorker(FatcatWorker): """ Periodically polls the fatcat API looking for new changelogs. When they are found, fetch them and push (as JSON) into a Kafka topic. @@ -52,7 +53,7 @@ class FatcatChangelogWorker(FatcatWorker): time.sleep(self.poll_interval) -class FatcatEntityUpdatesWorker(FatcatWorker): +class EntityUpdatesWorker(FatcatWorker): """ Consumes from the changelog topic and publishes expanded entities (fetched from API) to update topics. diff --git a/python/fatcat_tools/workers/elastic.py b/python/fatcat_tools/workers/elasticsearch.py index 3a75a1b3..e7abd5ee 100644 --- a/python/fatcat_tools/workers/elastic.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -2,14 +2,14 @@ import json import time import requests -from fatcat_tools.transforms import release_elastic_dict -from fatcat_tools.workers.worker_common import FatcatWorker -from fatcat_client import ReleaseEntity -from fatcat_tools.transforms import * from pykafka.common import OffsetType +from fatcat_client import ReleaseEntity +from fatcat_tools import * +from .worker_common import FatcatWorker + -class FatcatElasticReleaseWorker(FatcatWorker): +class ElasticsearchReleaseWorker(FatcatWorker): """ Consumes from release-updates topic and pushes into (presumably local) elasticsearch. @@ -18,13 +18,13 @@ class FatcatElasticReleaseWorker(FatcatWorker): """ def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, - elastic_backend="http://localhost:9200", elastic_index="fatcat"): + elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api_host_url=None) - self.consumer_group = "elastic-updates" - self.elastic_backend = elastic_backend - self.elastic_index = elastic_index + self.consumer_group = "elasticsearch-updates" + self.elasticsearch_backend = elasticsearch_backend + self.elasticsearch_index = elasticsearch_index def run(self): consume_topic = self.kafka.topics[self.consume_topic] @@ -42,11 +42,11 @@ class FatcatElasticReleaseWorker(FatcatWorker): json_str = msg.value.decode('utf-8') release = entity_from_json(json_str, ReleaseEntity) #print(release) - elastic_endpoint = "{}/{}/release/{}".format( - self.elastic_backend, - self.elastic_index, + elasticsearch_endpoint = "{}/{}/release/{}".format( + self.elasticsearch_backend, + self.elasticsearch_index, release.ident) - print("Updating document: {}".format(elastic_endpoint)) - resp = requests.post(elastic_endpoint, json=release_elastic_dict(release)) + print("Updating document: {}".format(elasticsearch_endpoint)) + resp = requests.post(elasticsearch_endpoint, json=release_to_elasticsearch(release)) assert resp.status_code in (200, 201) #consumer.commit_offsets() |