From d36b38fc567bbba4ac84713042e9558afd4981b3 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 26 Feb 2021 20:25:27 -0800 Subject: ES schemas: add doc_index_ts to all mappings --- python/fatcat_tools/transforms/elasticsearch.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'python/fatcat_tools') diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py index f37aadba..12b8ea2b 100644 --- a/python/fatcat_tools/transforms/elasticsearch.py +++ b/python/fatcat_tools/transforms/elasticsearch.py @@ -46,6 +46,7 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) -> # First, the easy ones (direct copy) release = entity t = dict( + doc_index_ts=datetime.datetime.utcnow(), ident = release.ident, state = release.state, revision = release.revision, @@ -392,6 +393,7 @@ def container_to_elasticsearch(entity, force_bool=True): # First, the easy ones (direct copy) t = dict( + doc_index_ts=datetime.datetime.utcnow(), ident = entity.ident, state = entity.state, revision = entity.revision, @@ -495,6 +497,7 @@ def changelog_to_elasticsearch(entity): editgroup = entity.editgroup t = dict( + doc_index_ts=datetime.datetime.utcnow(), index=entity.index, editgroup_id=entity.editgroup_id, timestamp=entity.timestamp.isoformat(), @@ -558,6 +561,7 @@ def file_to_elasticsearch(entity): # First, the easy ones (direct copy) t = dict( + doc_index_ts=datetime.datetime.utcnow(), ident = entity.ident, state = entity.state, revision = entity.revision, -- cgit v1.2.3 From 61bd2d65fd1c4fbda2c28d36c5388a610b4d1d14 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 30 Mar 2021 12:31:50 -0700 Subject: release ES: add discipline field --- extra/elasticsearch/release_schema.json | 1 + python/fatcat_tools/transforms/elasticsearch.py | 2 ++ 2 files changed, 3 insertions(+) (limited to 'python/fatcat_tools') diff --git a/extra/elasticsearch/release_schema.json b/extra/elasticsearch/release_schema.json index fe1485ef..c562de18 100644 --- a/extra/elasticsearch/release_schema.json +++ b/extra/elasticsearch/release_schema.json @@ -84,6 +84,7 @@ "container_id": { "type": "keyword", "normalizer": "default" }, "container_issnl": { "type": "keyword", "normalizer": "default" }, "container_type": { "type": "keyword", "normalizer": "default" }, + "discipline": { "type": "keyword", "normalizer": "default" }, "contrib_count": { "type": "integer" }, "contrib_names": { "type": "text", "index": true, "analyzer": "textIcu", "search_analyzer":"textIcuSearch", "copy_to": "biblio" }, "affiliations": { "type": "text", "index": true, "analyzer": "textIcu", "search_analyzer":"textIcuSearch" }, diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py index 12b8ea2b..5058989c 100644 --- a/python/fatcat_tools/transforms/elasticsearch.py +++ b/python/fatcat_tools/transforms/elasticsearch.py @@ -296,6 +296,8 @@ def _rte_container_helper(container: ContainerEntity, release_year: Optional[int t['country_code_upper'] = c_extra['country'].upper() if c_extra.get('publisher_type'): t['publisher_type'] = c_extra['publisher_type'] + if c_extra.get('discipline'): + t['discipline'] = c_extra['discipline'] return t def _rte_content_helper(release: ReleaseEntity) -> dict: -- cgit v1.2.3 From 2e781738937efecbfc527a47ade6c3deaba64247 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 6 Apr 2021 20:04:03 -0700 Subject: container search schema: preservation stats, new fields Includes transform code updates and partial test coverage. --- extra/elasticsearch/container_schema.json | 17 ++++----- python/fatcat_tools/transforms/elasticsearch.py | 20 +++++++++-- python/tests/transform_elasticsearch.py | 47 ++++++++++++++++++++++--- 3 files changed, 69 insertions(+), 15 deletions(-) (limited to 'python/fatcat_tools') diff --git a/extra/elasticsearch/container_schema.json b/extra/elasticsearch/container_schema.json index 21b8d4ec..9673e9e3 100644 --- a/extra/elasticsearch/container_schema.json +++ b/extra/elasticsearch/container_schema.json @@ -55,6 +55,7 @@ "issnl": { "type": "keyword", "normalizer": "default" }, "issns": { "type": "keyword", "normalizer": "default" }, "wikidata_qid": { "type": "keyword", "normalizer": "default" }, + "dblp_prefix": { "type": "keyword", "normalizer": "default" }, "country_code": { "type": "keyword", "normalizer": "default" }, "region": { "type": "keyword", "normalizer": "default" }, "discipline": { "type": "keyword", "normalizer": "default" }, @@ -74,19 +75,19 @@ "any_jstor": { "type": "boolean" }, "any_ia_sim": { "type": "boolean" }, "sherpa_romeo_color": { "type": "keyword", "normalizer": "default" }, + "keepers": { "type": "keyword", "normalizer": "default" }, - "releases_total": { "type": "integer" }, - "releases_kbart": { "type": "integer" }, - "releases_ia": { "type": "integer" }, - "releases_ia_sim": { "type": "integer" }, - "releases_shadows": { "type": "integer" }, - "releases_any_file": { "type": "integer" }, - "releases_any_fileset": { "type": "integer" }, - "releases_any_webcapture": { "type": "integer" }, + "releases_total": { "type": "integer" }, + "preservation_bright": { "type": "integer" }, + "preservation_dark": { "type": "integer" }, + "preservation_shadows_only":{ "type": "integer" }, + "preservation_none": { "type": "integer" }, "year": { "type": "alias", "path": "first_year" }, "type": { "type": "alias", "path": "container_type" }, "issn": { "type": "alias", "path": "issns" }, + "release_count": { "type": "alias", "path": "releases_total" }, + "releases_count": { "type": "alias", "path": "releases_total" }, "oa": { "type": "alias", "path": "is_oa" }, "longtail": { "type": "alias", "path": "is_longtail_oa" } } diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py index 5058989c..fe463fa4 100644 --- a/python/fatcat_tools/transforms/elasticsearch.py +++ b/python/fatcat_tools/transforms/elasticsearch.py @@ -377,7 +377,7 @@ def _rte_url_helper(url_obj) -> dict: return t -def container_to_elasticsearch(entity, force_bool=True): +def container_to_elasticsearch(entity, force_bool=True, stats=None): """ Converts from an entity model/schema to elasticsearch oriented schema. @@ -411,10 +411,13 @@ def container_to_elasticsearch(entity, force_bool=True): entity.extra = dict() for key in ('country', 'languages', 'mimetypes', 'original_name', 'first_year', 'last_year', 'aliases', 'abbrev', 'region', - 'discipline'): + 'discipline', 'publisher_type'): if entity.extra.get(key): t[key] = entity.extra[key] + if entity.extra.get('dblp') and entity.extra['dblp'].get('prefix'): + t['dblp_prefix'] = entity.extra['dblp']['prefix'] + if 'country' in t: t['country_code'] = t.pop('country') @@ -432,6 +435,7 @@ def container_to_elasticsearch(entity, force_bool=True): any_kbart = None any_jstor = None any_ia_sim = None + keepers = [] extra = entity.extra if extra.get('doaj'): @@ -455,6 +459,9 @@ def container_to_elasticsearch(entity, force_bool=True): any_kbart = True if extra['kbart'].get('jstor'): any_jstor = True + for k, v in extra['kbart'].items(): + if v and isinstance(v, dict): + keepers.append(k) if extra.get('ia'): if extra['ia'].get('sim'): any_ia_sim = True @@ -462,6 +469,7 @@ def container_to_elasticsearch(entity, force_bool=True): is_longtail_oa = True t['is_superceded'] = bool(extra.get('superceded')) + t['keepers'] = keepers t['in_doaj'] = bool(in_doaj) t['in_road'] = bool(in_road) t['any_kbart'] = bool(any_kbart) @@ -475,6 +483,14 @@ def container_to_elasticsearch(entity, force_bool=True): t['is_longtail_oa'] = is_longtail_oa t['any_jstor'] = any_jstor t['any_ia_sim'] = any_ia_sim + + # mix in stats, if provided + if stats: + t['releases_total'] = stats['total'] + t['preservation_bright'] = stats['preservation']['bright'] + t['preservation_dark'] = stats['preservation']['dark'] + t['preservation_shadows_only'] = stats['preservation']['shadows_only'] + t['preservation_none'] = stats['preservation']['none'] return t diff --git a/python/tests/transform_elasticsearch.py b/python/tests/transform_elasticsearch.py index 9cf77d4a..ba2b7ea2 100644 --- a/python/tests/transform_elasticsearch.py +++ b/python/tests/transform_elasticsearch.py @@ -147,11 +147,48 @@ def test_elasticsearch_release_from_json(): def test_elasticsearch_container_transform(journal_metadata_importer): with open('tests/files/journal_metadata.sample.json', 'r') as f: - raw = json.loads(f.readline()) - c = journal_metadata_importer.parse_record(raw) - c.state = 'active' - es = container_to_elasticsearch(c) - assert es['publisher'] == c.publisher + raw1 = json.loads(f.readline()) + raw2 = json.loads(f.readline()) + c1 = journal_metadata_importer.parse_record(raw1) + c1.state = 'active' + c2 = journal_metadata_importer.parse_record(raw2) + c2.state = 'active' + + c1.extra['publisher_type'] = "big5" + c1.extra['discipline'] = "history" + es = container_to_elasticsearch(c1) + assert es['publisher'] == c1.publisher + assert es['discipline'] == c1.extra['discipline'] + assert es['publisher_type'] == c1.extra['publisher_type'] + assert es['keepers'] == [] + + stats = { + "ident": "en4qj5ijrbf5djxx7p5zzpjyoq", + "in_kbart": 11136, + "in_web": 9501, + "is_preserved": 11136, + "issnl": "2050-084X", + "preservation": { + "bright": 9501, + "dark": 1635, + "none": 0, + "shadows_only": 0, + "total": 11136 + }, + "release_type": { + "_unknown": 9, + "article-journal": 11124, + "editorial": 2, + "letter": 1 + }, + "total": 11136 + } + es = container_to_elasticsearch(c2, stats=stats) + assert es['name'] == c2.name + assert es['publisher'] == c2.publisher + assert es['keepers'] == list(c2.extra['kbart'].keys()) == ["portico"] + assert es['any_kbart'] == True + def test_elasticsearch_file_transform(matched_importer): f = entity_from_json(open('./tests/files/file_bcah4zp5tvdhjl5bqci2c2lgfa.json', 'r').read(), FileEntity) -- cgit v1.2.3 From 5ba1859a2308c72e7e3e4e86af3c275210c5bacd Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 6 Apr 2021 20:25:46 -0700 Subject: ES schema updates: doc_index_ts as a str, not datetime The schema is a timestamp, but python needs to serialize as JSON, and doesn't do datetime automatically. --- python/fatcat_tools/transforms/elasticsearch.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'python/fatcat_tools') diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py index fe463fa4..4bf9091a 100644 --- a/python/fatcat_tools/transforms/elasticsearch.py +++ b/python/fatcat_tools/transforms/elasticsearch.py @@ -46,7 +46,7 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) -> # First, the easy ones (direct copy) release = entity t = dict( - doc_index_ts=datetime.datetime.utcnow(), + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = release.ident, state = release.state, revision = release.revision, @@ -395,7 +395,7 @@ def container_to_elasticsearch(entity, force_bool=True, stats=None): # First, the easy ones (direct copy) t = dict( - doc_index_ts=datetime.datetime.utcnow(), + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = entity.ident, state = entity.state, revision = entity.revision, @@ -515,7 +515,7 @@ def changelog_to_elasticsearch(entity): editgroup = entity.editgroup t = dict( - doc_index_ts=datetime.datetime.utcnow(), + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", index=entity.index, editgroup_id=entity.editgroup_id, timestamp=entity.timestamp.isoformat(), @@ -579,7 +579,7 @@ def file_to_elasticsearch(entity): # First, the easy ones (direct copy) t = dict( - doc_index_ts=datetime.datetime.utcnow(), + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = entity.ident, state = entity.state, revision = entity.revision, -- cgit v1.2.3 From 9f110393b90d5b9e95a39b4f83d3e864434dd189 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 6 Apr 2021 21:48:40 -0700 Subject: container ES index worker: support for querying status --- python/fatcat_tools/workers/elasticsearch.py | 37 ++++++++++++++++++++++++---- python/fatcat_worker.py | 5 ++++ 2 files changed, 37 insertions(+), 5 deletions(-) (limited to 'python/fatcat_tools') diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 4850bb0a..13409eb5 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -1,10 +1,16 @@ import json + import requests +import elasticsearch from confluent_kafka import Consumer, KafkaException from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry -from fatcat_tools import * +from fatcat_tools import (public_api, entity_from_json, + release_to_elasticsearch, container_to_elasticsearch, + changelog_to_elasticsearch, +) +from fatcat_web.search import get_elastic_container_stats from .worker_common import FatcatWorker @@ -18,7 +24,8 @@ class ElasticsearchReleaseWorker(FatcatWorker): def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", - batch_size=200, api_host="https://api.fatcat.wiki/v0"): + elasticsearch_release_index="fatcat_releases", + batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic) self.consumer_group = "elasticsearch-updates3" @@ -26,14 +33,19 @@ class ElasticsearchReleaseWorker(FatcatWorker): self.poll_interval = poll_interval self.elasticsearch_backend = elasticsearch_backend self.elasticsearch_index = elasticsearch_index + self.elasticsearch_release_index = elasticsearch_release_index self.entity_type = ReleaseEntity self.transform_func = release_to_elasticsearch self.api_host = api_host + self.query_stats = query_stats def run(self): ac = ApiClient() api = public_api(self.api_host) + # only used by container indexing query_stats code path + es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend) + def fail_fast(err, partitions): if err is not None: print("Kafka consumer commit error: {}".format(err)) @@ -104,13 +116,25 @@ class ElasticsearchReleaseWorker(FatcatWorker): entity = api.get_changelog_entry(entity.index) else: key = entity.ident + + if self.entity_type == ContainerEntity and self.query_stats: + stats = get_elastic_container_stats( + entity.ident, + es_client=es_client, + es_index=self.elasticsearch_release_index, + merge_shadows=True, + ) + doc_dict = container_to_elasticsearch(entity, stats=stats) + else: + doc_dict = self.transform_func(entity) + # TODO: handle deletions from index bulk_actions.append(json.dumps({ "index": { "_id": key, }, })) - bulk_actions.append(json.dumps( - self.transform_func(entity))) - print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type)) + bulk_actions.append(json.dumps(doc_dict)) + + print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__)) elasticsearch_endpoint = "{}/{}/_bulk".format( self.elasticsearch_backend, self.elasticsearch_index) @@ -132,6 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + query_stats=False, elasticsearch_release_index="fatcat_release", elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", batch_size=200): super().__init__(kafka_hosts=kafka_hosts, @@ -140,6 +165,8 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): offset=offset, elasticsearch_backend=elasticsearch_backend, elasticsearch_index=elasticsearch_index, + elasticsearch_release_index=elasticsearch_release_index, + query_stats=query_stats, batch_size=batch_size) # previous group got corrupted (by pykafka library?) self.consumer_group = "elasticsearch-updates3" diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 5e3fb3e9..95f5024a 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -44,6 +44,8 @@ def run_elasticsearch_release(args): def run_elasticsearch_container(args): consume_topic = "fatcat-{}.container-updates".format(args.env) worker = ElasticsearchContainerWorker(args.kafka_hosts, consume_topic, + query_stats=args.query_stats, + elasticsearch_release_index="fatcat_release", elasticsearch_backend=args.elasticsearch_backend, elasticsearch_index=args.elasticsearch_index) worker.run() @@ -99,6 +101,9 @@ def main(): sub_elasticsearch_container.add_argument('--elasticsearch-index', help="elasticsearch index to push into", default="fatcat_container") + sub_elasticsearch_container.add_argument('--query-stats', + action='store_true', + help="whether to query release search index for container stats") sub_elasticsearch_changelog = subparsers.add_parser('elasticsearch-changelog', help="consume changelog kafka feed, transform and push to search") -- cgit v1.2.3