diff options
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r-- | python/fatcat_tools/transforms/elasticsearch.py | 26 | ||||
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 37 |
2 files changed, 56 insertions, 7 deletions
diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py index f37aadba..4bf9091a 100644 --- a/python/fatcat_tools/transforms/elasticsearch.py +++ b/python/fatcat_tools/transforms/elasticsearch.py @@ -46,6 +46,7 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) -> # First, the easy ones (direct copy) release = entity t = dict( + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = release.ident, state = release.state, revision = release.revision, @@ -295,6 +296,8 @@ def _rte_container_helper(container: ContainerEntity, release_year: Optional[int t['country_code_upper'] = c_extra['country'].upper() if c_extra.get('publisher_type'): t['publisher_type'] = c_extra['publisher_type'] + if c_extra.get('discipline'): + t['discipline'] = c_extra['discipline'] return t def _rte_content_helper(release: ReleaseEntity) -> dict: @@ -374,7 +377,7 @@ def _rte_url_helper(url_obj) -> dict: return t -def container_to_elasticsearch(entity, force_bool=True): +def container_to_elasticsearch(entity, force_bool=True, stats=None): """ Converts from an entity model/schema to elasticsearch oriented schema. @@ -392,6 +395,7 @@ def container_to_elasticsearch(entity, force_bool=True): # First, the easy ones (direct copy) t = dict( + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = entity.ident, state = entity.state, revision = entity.revision, @@ -407,10 +411,13 @@ def container_to_elasticsearch(entity, force_bool=True): entity.extra = dict() for key in ('country', 'languages', 'mimetypes', 'original_name', 'first_year', 'last_year', 'aliases', 'abbrev', 'region', - 'discipline'): + 'discipline', 'publisher_type'): if entity.extra.get(key): t[key] = entity.extra[key] + if entity.extra.get('dblp') and entity.extra['dblp'].get('prefix'): + t['dblp_prefix'] = entity.extra['dblp']['prefix'] + if 'country' in t: t['country_code'] = t.pop('country') @@ -428,6 +435,7 @@ def container_to_elasticsearch(entity, force_bool=True): any_kbart = None any_jstor = None any_ia_sim = None + keepers = [] extra = entity.extra if extra.get('doaj'): @@ -451,6 +459,9 @@ def container_to_elasticsearch(entity, force_bool=True): any_kbart = True if extra['kbart'].get('jstor'): any_jstor = True + for k, v in extra['kbart'].items(): + if v and isinstance(v, dict): + keepers.append(k) if extra.get('ia'): if extra['ia'].get('sim'): any_ia_sim = True @@ -458,6 +469,7 @@ def container_to_elasticsearch(entity, force_bool=True): is_longtail_oa = True t['is_superceded'] = bool(extra.get('superceded')) + t['keepers'] = keepers t['in_doaj'] = bool(in_doaj) t['in_road'] = bool(in_road) t['any_kbart'] = bool(any_kbart) @@ -471,6 +483,14 @@ def container_to_elasticsearch(entity, force_bool=True): t['is_longtail_oa'] = is_longtail_oa t['any_jstor'] = any_jstor t['any_ia_sim'] = any_ia_sim + + # mix in stats, if provided + if stats: + t['releases_total'] = stats['total'] + t['preservation_bright'] = stats['preservation']['bright'] + t['preservation_dark'] = stats['preservation']['dark'] + t['preservation_shadows_only'] = stats['preservation']['shadows_only'] + t['preservation_none'] = stats['preservation']['none'] return t @@ -495,6 +515,7 @@ def changelog_to_elasticsearch(entity): editgroup = entity.editgroup t = dict( + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", index=entity.index, editgroup_id=entity.editgroup_id, timestamp=entity.timestamp.isoformat(), @@ -558,6 +579,7 @@ def file_to_elasticsearch(entity): # First, the easy ones (direct copy) t = dict( + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = entity.ident, state = entity.state, revision = entity.revision, diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 4850bb0a..13409eb5 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -1,10 +1,16 @@ import json + import requests +import elasticsearch from confluent_kafka import Consumer, KafkaException from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry -from fatcat_tools import * +from fatcat_tools import (public_api, entity_from_json, + release_to_elasticsearch, container_to_elasticsearch, + changelog_to_elasticsearch, +) +from fatcat_web.search import get_elastic_container_stats from .worker_common import FatcatWorker @@ -18,7 +24,8 @@ class ElasticsearchReleaseWorker(FatcatWorker): def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", - batch_size=200, api_host="https://api.fatcat.wiki/v0"): + elasticsearch_release_index="fatcat_releases", + batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic) self.consumer_group = "elasticsearch-updates3" @@ -26,14 +33,19 @@ class ElasticsearchReleaseWorker(FatcatWorker): self.poll_interval = poll_interval self.elasticsearch_backend = elasticsearch_backend self.elasticsearch_index = elasticsearch_index + self.elasticsearch_release_index = elasticsearch_release_index self.entity_type = ReleaseEntity self.transform_func = release_to_elasticsearch self.api_host = api_host + self.query_stats = query_stats def run(self): ac = ApiClient() api = public_api(self.api_host) + # only used by container indexing query_stats code path + es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend) + def fail_fast(err, partitions): if err is not None: print("Kafka consumer commit error: {}".format(err)) @@ -104,13 +116,25 @@ class ElasticsearchReleaseWorker(FatcatWorker): entity = api.get_changelog_entry(entity.index) else: key = entity.ident + + if self.entity_type == ContainerEntity and self.query_stats: + stats = get_elastic_container_stats( + entity.ident, + es_client=es_client, + es_index=self.elasticsearch_release_index, + merge_shadows=True, + ) + doc_dict = container_to_elasticsearch(entity, stats=stats) + else: + doc_dict = self.transform_func(entity) + # TODO: handle deletions from index bulk_actions.append(json.dumps({ "index": { "_id": key, }, })) - bulk_actions.append(json.dumps( - self.transform_func(entity))) - print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type)) + bulk_actions.append(json.dumps(doc_dict)) + + print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__)) elasticsearch_endpoint = "{}/{}/_bulk".format( self.elasticsearch_backend, self.elasticsearch_index) @@ -132,6 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + query_stats=False, elasticsearch_release_index="fatcat_release", elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", batch_size=200): super().__init__(kafka_hosts=kafka_hosts, @@ -140,6 +165,8 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): offset=offset, elasticsearch_backend=elasticsearch_backend, elasticsearch_index=elasticsearch_index, + elasticsearch_release_index=elasticsearch_release_index, + query_stats=query_stats, batch_size=batch_size) # previous group got corrupted (by pykafka library?) self.consumer_group = "elasticsearch-updates3" |