summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--python/fatcat_tools/workers/__init__.py4
-rw-r--r--python/fatcat_tools/workers/changelog.py7
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py (renamed from python/fatcat_tools/workers/elastic.py)28
3 files changed, 22 insertions, 17 deletions
diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py
new file mode 100644
index 00000000..e8973bc3
--- /dev/null
+++ b/python/fatcat_tools/workers/__init__.py
@@ -0,0 +1,4 @@
+
+from .changelog import ChangelogWorker, EntityUpdatesWorker
+from .elasticsearch import ElasticsearchReleaseWorker
+from .worker_common import most_recent_message, FatcatWorker
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index d07b5988..e64c043b 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,11 +1,12 @@
import json
import time
-from fatcat_tools.workers.worker_common import FatcatWorker, most_recent_message
from pykafka.common import OffsetType
+from .worker_common import FatcatWorker, most_recent_message
-class FatcatChangelogWorker(FatcatWorker):
+
+class ChangelogWorker(FatcatWorker):
"""
Periodically polls the fatcat API looking for new changelogs. When they are
found, fetch them and push (as JSON) into a Kafka topic.
@@ -52,7 +53,7 @@ class FatcatChangelogWorker(FatcatWorker):
time.sleep(self.poll_interval)
-class FatcatEntityUpdatesWorker(FatcatWorker):
+class EntityUpdatesWorker(FatcatWorker):
"""
Consumes from the changelog topic and publishes expanded entities (fetched
from API) to update topics.
diff --git a/python/fatcat_tools/workers/elastic.py b/python/fatcat_tools/workers/elasticsearch.py
index 3a75a1b3..e7abd5ee 100644
--- a/python/fatcat_tools/workers/elastic.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -2,14 +2,14 @@
import json
import time
import requests
-from fatcat_tools.transforms import release_elastic_dict
-from fatcat_tools.workers.worker_common import FatcatWorker
-from fatcat_client import ReleaseEntity
-from fatcat_tools.transforms import *
from pykafka.common import OffsetType
+from fatcat_client import ReleaseEntity
+from fatcat_tools import *
+from .worker_common import FatcatWorker
+
-class FatcatElasticReleaseWorker(FatcatWorker):
+class ElasticsearchReleaseWorker(FatcatWorker):
"""
Consumes from release-updates topic and pushes into (presumably local)
elasticsearch.
@@ -18,13 +18,13 @@ class FatcatElasticReleaseWorker(FatcatWorker):
"""
def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
- elastic_backend="http://localhost:9200", elastic_index="fatcat"):
+ elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic,
api_host_url=None)
- self.consumer_group = "elastic-updates"
- self.elastic_backend = elastic_backend
- self.elastic_index = elastic_index
+ self.consumer_group = "elasticsearch-updates"
+ self.elasticsearch_backend = elasticsearch_backend
+ self.elasticsearch_index = elasticsearch_index
def run(self):
consume_topic = self.kafka.topics[self.consume_topic]
@@ -42,11 +42,11 @@ class FatcatElasticReleaseWorker(FatcatWorker):
json_str = msg.value.decode('utf-8')
release = entity_from_json(json_str, ReleaseEntity)
#print(release)
- elastic_endpoint = "{}/{}/release/{}".format(
- self.elastic_backend,
- self.elastic_index,
+ elasticsearch_endpoint = "{}/{}/release/{}".format(
+ self.elasticsearch_backend,
+ self.elasticsearch_index,
release.ident)
- print("Updating document: {}".format(elastic_endpoint))
- resp = requests.post(elastic_endpoint, json=release_elastic_dict(release))
+ print("Updating document: {}".format(elasticsearch_endpoint))
+ resp = requests.post(elasticsearch_endpoint, json=release_to_elasticsearch(release))
assert resp.status_code in (200, 201)
#consumer.commit_offsets()