diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/fatcat_tools/workers/__init__.py | 2 | ||||
| -rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 23 | ||||
| -rwxr-xr-x | python/fatcat_worker.py | 19 | 
3 files changed, 41 insertions, 3 deletions
| diff --git a/python/fatcat_tools/workers/__init__.py b/python/fatcat_tools/workers/__init__.py index 8bea7cdc..32fd330d 100644 --- a/python/fatcat_tools/workers/__init__.py +++ b/python/fatcat_tools/workers/__init__.py @@ -1,4 +1,4 @@  from .changelog import ChangelogWorker, EntityUpdatesWorker -from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker +from .elasticsearch import ElasticsearchReleaseWorker, ElasticsearchContainerWorker, ElasticsearchChangelogWorker  from .worker_common import most_recent_message, FatcatWorker diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 68d6c304..525f372b 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -4,7 +4,7 @@ import time  import requests  from confluent_kafka import Consumer, KafkaException -from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient +from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry  from fatcat_tools import *  from .worker_common import FatcatWorker @@ -148,3 +148,24 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):          self.elasticsearch_document_name = "container"          self.transform_func = container_to_elasticsearch + +class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker): +    """ +    Pulls changelog messages from Kafka, runs transformations and indexes them. + +    Note: Very early versions of changelog entries did not contain details +    about the editor or extra fields. +    """ +    def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, +            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat_changelog", +            batch_size=200): +        super().__init__(kafka_hosts=kafka_hosts, +                         consume_topic=consume_topic) +        self.consumer_group = "elasticsearch-updates3" +        self.batch_size = batch_size +        self.poll_interval = poll_interval +        self.elasticsearch_backend = elasticsearch_backend +        self.elasticsearch_index = elasticsearch_index +        self.entity_type = ChangelogEntry +        self.elasticsearch_document_name = "changelog" +        self.transform_func = changelog_to_elasticsearch diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index bfb87a72..03167a3a 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -6,7 +6,7 @@ import datetime  import raven  from fatcat_tools import public_api -from fatcat_tools.workers import ChangelogWorker, EntityUpdatesWorker, ElasticsearchReleaseWorker, ElasticsearchContainerWorker +from fatcat_tools.workers import ChangelogWorker, EntityUpdatesWorker, ElasticsearchReleaseWorker, ElasticsearchContainerWorker, ElasticsearchChangelogWorker  # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable  sentry_client = raven.Client() @@ -47,6 +47,13 @@ def run_elasticsearch_container(args):          elasticsearch_index=args.elasticsearch_index)      worker.run() +def run_elasticsearch_changelog(args): +    consume_topic = "fatcat-{}.changelog".format(args.env) +    worker = ElasticsearchChangelogWorker(args.kafka_hosts, consume_topic, +        elasticsearch_backend=args.elasticsearch_backend, +        elasticsearch_index=args.elasticsearch_index) +    worker.run() +  def main():      parser = argparse.ArgumentParser(          formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -92,6 +99,16 @@ def main():          help="elasticsearch index to push into",          default="fatcat_container") +    sub_elasticsearch_changelog = subparsers.add_parser('elasticsearch-changelog', +        help="consume changelog kafka feed, transform and push to search") +    sub_elasticsearch_changelog.set_defaults(func=run_elasticsearch_changelog) +    sub_elasticsearch_changelog.add_argument('--elasticsearch-backend', +        help="elasticsearch backend to connect to", +        default="http://localhost:9200") +    sub_elasticsearch_changelog.add_argument('--elasticsearch-index', +        help="elasticsearch index to push into", +        default="fatcat_changelog") +      args = parser.parse_args()      if not args.__dict__.get("func"):          print("tell me what to do!") | 
