From d36b38fc567bbba4ac84713042e9558afd4981b3 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 26 Feb 2021 20:25:27 -0800 Subject: ES schemas: add doc_index_ts to all mappings --- extra/elasticsearch/changelog_schema.json | 1 + extra/elasticsearch/container_schema.json | 1 + extra/elasticsearch/fatcat_ref.json | 5 +++++ extra/elasticsearch/file_schema.json | 1 + extra/elasticsearch/release_schema.json | 1 + python/fatcat_tools/transforms/elasticsearch.py | 4 ++++ 6 files changed, 13 insertions(+) diff --git a/extra/elasticsearch/changelog_schema.json b/extra/elasticsearch/changelog_schema.json index 6e784a57..65536bd6 100644 --- a/extra/elasticsearch/changelog_schema.json +++ b/extra/elasticsearch/changelog_schema.json @@ -29,6 +29,7 @@ "mappings": { "_doc": { "properties": { + "doc_index_ts": { "type": "date" }, "index": { "type": "integer" }, "editgroup_id": { "type": "keyword", "normalizer": "default", "doc_values": false }, "timestamp": { "type": "date" }, diff --git a/extra/elasticsearch/container_schema.json b/extra/elasticsearch/container_schema.json index 1960984d..21b8d4ec 100644 --- a/extra/elasticsearch/container_schema.json +++ b/extra/elasticsearch/container_schema.json @@ -41,6 +41,7 @@ "mappings": { "_doc": { "properties": { + "doc_index_ts": { "type": "date" }, "ident": { "type": "keyword", "normalizer": "default", "doc_values": false }, "state": { "type": "keyword", "normalizer": "default" }, "revision": { "type": "keyword", "normalizer": "default", "doc_values": false }, diff --git a/extra/elasticsearch/fatcat_ref.json b/extra/elasticsearch/fatcat_ref.json index b82ce93b..32c7a3cd 100644 --- a/extra/elasticsearch/fatcat_ref.json +++ b/extra/elasticsearch/fatcat_ref.json @@ -100,6 +100,11 @@ "target_csl": { "type": "object", "enabled": false + }, + + "doc_index_ts": { + "type": "alias", + "path": "indexed_ts" } } } diff --git a/extra/elasticsearch/file_schema.json b/extra/elasticsearch/file_schema.json index 4635e469..a8dbc6d0 100644 --- a/extra/elasticsearch/file_schema.json +++ b/extra/elasticsearch/file_schema.json @@ -29,6 +29,7 @@ "mappings": { "_doc": { "properties": { + "doc_index_ts": { "type": "date" }, "ident": { "type": "keyword", "normalizer": "default", "doc_values": false }, "state": { "type": "keyword", "normalizer": "default" }, "revision": { "type": "keyword", "normalizer": "default", "doc_values": false }, diff --git a/extra/elasticsearch/release_schema.json b/extra/elasticsearch/release_schema.json index 91f2f183..fe1485ef 100644 --- a/extra/elasticsearch/release_schema.json +++ b/extra/elasticsearch/release_schema.json @@ -41,6 +41,7 @@ "mappings": { "_doc": { "properties": { + "doc_index_ts": { "type": "date" }, "ident": { "type": "keyword", "normalizer": "default", "doc_values": false }, "state": { "type": "keyword", "normalizer": "default" }, "revision": { "type": "keyword", "normalizer": "default", "doc_values": false }, diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py index f37aadba..12b8ea2b 100644 --- a/python/fatcat_tools/transforms/elasticsearch.py +++ b/python/fatcat_tools/transforms/elasticsearch.py @@ -46,6 +46,7 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) -> # First, the easy ones (direct copy) release = entity t = dict( + doc_index_ts=datetime.datetime.utcnow(), ident = release.ident, state = release.state, revision = release.revision, @@ -392,6 +393,7 @@ def container_to_elasticsearch(entity, force_bool=True): # First, the easy ones (direct copy) t = dict( + doc_index_ts=datetime.datetime.utcnow(), ident = entity.ident, state = entity.state, revision = entity.revision, @@ -495,6 +497,7 @@ def changelog_to_elasticsearch(entity): editgroup = entity.editgroup t = dict( + doc_index_ts=datetime.datetime.utcnow(), index=entity.index, editgroup_id=entity.editgroup_id, timestamp=entity.timestamp.isoformat(), @@ -558,6 +561,7 @@ def file_to_elasticsearch(entity): # First, the easy ones (direct copy) t = dict( + doc_index_ts=datetime.datetime.utcnow(), ident = entity.ident, state = entity.state, revision = entity.revision, -- cgit v1.2.3 From 61bd2d65fd1c4fbda2c28d36c5388a610b4d1d14 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 30 Mar 2021 12:31:50 -0700 Subject: release ES: add discipline field --- extra/elasticsearch/release_schema.json | 1 + python/fatcat_tools/transforms/elasticsearch.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/extra/elasticsearch/release_schema.json b/extra/elasticsearch/release_schema.json index fe1485ef..c562de18 100644 --- a/extra/elasticsearch/release_schema.json +++ b/extra/elasticsearch/release_schema.json @@ -84,6 +84,7 @@ "container_id": { "type": "keyword", "normalizer": "default" }, "container_issnl": { "type": "keyword", "normalizer": "default" }, "container_type": { "type": "keyword", "normalizer": "default" }, + "discipline": { "type": "keyword", "normalizer": "default" }, "contrib_count": { "type": "integer" }, "contrib_names": { "type": "text", "index": true, "analyzer": "textIcu", "search_analyzer":"textIcuSearch", "copy_to": "biblio" }, "affiliations": { "type": "text", "index": true, "analyzer": "textIcu", "search_analyzer":"textIcuSearch" }, diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py index 12b8ea2b..5058989c 100644 --- a/python/fatcat_tools/transforms/elasticsearch.py +++ b/python/fatcat_tools/transforms/elasticsearch.py @@ -296,6 +296,8 @@ def _rte_container_helper(container: ContainerEntity, release_year: Optional[int t['country_code_upper'] = c_extra['country'].upper() if c_extra.get('publisher_type'): t['publisher_type'] = c_extra['publisher_type'] + if c_extra.get('discipline'): + t['discipline'] = c_extra['discipline'] return t def _rte_content_helper(release: ReleaseEntity) -> dict: -- cgit v1.2.3 From 2e781738937efecbfc527a47ade6c3deaba64247 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 6 Apr 2021 20:04:03 -0700 Subject: container search schema: preservation stats, new fields Includes transform code updates and partial test coverage. --- extra/elasticsearch/container_schema.json | 17 ++++----- python/fatcat_tools/transforms/elasticsearch.py | 20 +++++++++-- python/tests/transform_elasticsearch.py | 47 ++++++++++++++++++++++--- 3 files changed, 69 insertions(+), 15 deletions(-) diff --git a/extra/elasticsearch/container_schema.json b/extra/elasticsearch/container_schema.json index 21b8d4ec..9673e9e3 100644 --- a/extra/elasticsearch/container_schema.json +++ b/extra/elasticsearch/container_schema.json @@ -55,6 +55,7 @@ "issnl": { "type": "keyword", "normalizer": "default" }, "issns": { "type": "keyword", "normalizer": "default" }, "wikidata_qid": { "type": "keyword", "normalizer": "default" }, + "dblp_prefix": { "type": "keyword", "normalizer": "default" }, "country_code": { "type": "keyword", "normalizer": "default" }, "region": { "type": "keyword", "normalizer": "default" }, "discipline": { "type": "keyword", "normalizer": "default" }, @@ -74,19 +75,19 @@ "any_jstor": { "type": "boolean" }, "any_ia_sim": { "type": "boolean" }, "sherpa_romeo_color": { "type": "keyword", "normalizer": "default" }, + "keepers": { "type": "keyword", "normalizer": "default" }, - "releases_total": { "type": "integer" }, - "releases_kbart": { "type": "integer" }, - "releases_ia": { "type": "integer" }, - "releases_ia_sim": { "type": "integer" }, - "releases_shadows": { "type": "integer" }, - "releases_any_file": { "type": "integer" }, - "releases_any_fileset": { "type": "integer" }, - "releases_any_webcapture": { "type": "integer" }, + "releases_total": { "type": "integer" }, + "preservation_bright": { "type": "integer" }, + "preservation_dark": { "type": "integer" }, + "preservation_shadows_only":{ "type": "integer" }, + "preservation_none": { "type": "integer" }, "year": { "type": "alias", "path": "first_year" }, "type": { "type": "alias", "path": "container_type" }, "issn": { "type": "alias", "path": "issns" }, + "release_count": { "type": "alias", "path": "releases_total" }, + "releases_count": { "type": "alias", "path": "releases_total" }, "oa": { "type": "alias", "path": "is_oa" }, "longtail": { "type": "alias", "path": "is_longtail_oa" } } diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py index 5058989c..fe463fa4 100644 --- a/python/fatcat_tools/transforms/elasticsearch.py +++ b/python/fatcat_tools/transforms/elasticsearch.py @@ -377,7 +377,7 @@ def _rte_url_helper(url_obj) -> dict: return t -def container_to_elasticsearch(entity, force_bool=True): +def container_to_elasticsearch(entity, force_bool=True, stats=None): """ Converts from an entity model/schema to elasticsearch oriented schema. @@ -411,10 +411,13 @@ def container_to_elasticsearch(entity, force_bool=True): entity.extra = dict() for key in ('country', 'languages', 'mimetypes', 'original_name', 'first_year', 'last_year', 'aliases', 'abbrev', 'region', - 'discipline'): + 'discipline', 'publisher_type'): if entity.extra.get(key): t[key] = entity.extra[key] + if entity.extra.get('dblp') and entity.extra['dblp'].get('prefix'): + t['dblp_prefix'] = entity.extra['dblp']['prefix'] + if 'country' in t: t['country_code'] = t.pop('country') @@ -432,6 +435,7 @@ def container_to_elasticsearch(entity, force_bool=True): any_kbart = None any_jstor = None any_ia_sim = None + keepers = [] extra = entity.extra if extra.get('doaj'): @@ -455,6 +459,9 @@ def container_to_elasticsearch(entity, force_bool=True): any_kbart = True if extra['kbart'].get('jstor'): any_jstor = True + for k, v in extra['kbart'].items(): + if v and isinstance(v, dict): + keepers.append(k) if extra.get('ia'): if extra['ia'].get('sim'): any_ia_sim = True @@ -462,6 +469,7 @@ def container_to_elasticsearch(entity, force_bool=True): is_longtail_oa = True t['is_superceded'] = bool(extra.get('superceded')) + t['keepers'] = keepers t['in_doaj'] = bool(in_doaj) t['in_road'] = bool(in_road) t['any_kbart'] = bool(any_kbart) @@ -475,6 +483,14 @@ def container_to_elasticsearch(entity, force_bool=True): t['is_longtail_oa'] = is_longtail_oa t['any_jstor'] = any_jstor t['any_ia_sim'] = any_ia_sim + + # mix in stats, if provided + if stats: + t['releases_total'] = stats['total'] + t['preservation_bright'] = stats['preservation']['bright'] + t['preservation_dark'] = stats['preservation']['dark'] + t['preservation_shadows_only'] = stats['preservation']['shadows_only'] + t['preservation_none'] = stats['preservation']['none'] return t diff --git a/python/tests/transform_elasticsearch.py b/python/tests/transform_elasticsearch.py index 9cf77d4a..ba2b7ea2 100644 --- a/python/tests/transform_elasticsearch.py +++ b/python/tests/transform_elasticsearch.py @@ -147,11 +147,48 @@ def test_elasticsearch_release_from_json(): def test_elasticsearch_container_transform(journal_metadata_importer): with open('tests/files/journal_metadata.sample.json', 'r') as f: - raw = json.loads(f.readline()) - c = journal_metadata_importer.parse_record(raw) - c.state = 'active' - es = container_to_elasticsearch(c) - assert es['publisher'] == c.publisher + raw1 = json.loads(f.readline()) + raw2 = json.loads(f.readline()) + c1 = journal_metadata_importer.parse_record(raw1) + c1.state = 'active' + c2 = journal_metadata_importer.parse_record(raw2) + c2.state = 'active' + + c1.extra['publisher_type'] = "big5" + c1.extra['discipline'] = "history" + es = container_to_elasticsearch(c1) + assert es['publisher'] == c1.publisher + assert es['discipline'] == c1.extra['discipline'] + assert es['publisher_type'] == c1.extra['publisher_type'] + assert es['keepers'] == [] + + stats = { + "ident": "en4qj5ijrbf5djxx7p5zzpjyoq", + "in_kbart": 11136, + "in_web": 9501, + "is_preserved": 11136, + "issnl": "2050-084X", + "preservation": { + "bright": 9501, + "dark": 1635, + "none": 0, + "shadows_only": 0, + "total": 11136 + }, + "release_type": { + "_unknown": 9, + "article-journal": 11124, + "editorial": 2, + "letter": 1 + }, + "total": 11136 + } + es = container_to_elasticsearch(c2, stats=stats) + assert es['name'] == c2.name + assert es['publisher'] == c2.publisher + assert es['keepers'] == list(c2.extra['kbart'].keys()) == ["portico"] + assert es['any_kbart'] == True + def test_elasticsearch_file_transform(matched_importer): f = entity_from_json(open('./tests/files/file_bcah4zp5tvdhjl5bqci2c2lgfa.json', 'r').read(), FileEntity) -- cgit v1.2.3 From 0e171b5aeb77690ead3bb896be196fdcc5c69a39 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 6 Apr 2021 20:05:05 -0700 Subject: search container stats: changes to be called from index code path Eg, allowing injection of more config values --- python/fatcat_web/search.py | 13 ++++++++++--- python/tests/web_search.py | 10 ++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/python/fatcat_web/search.py b/python/fatcat_web/search.py index 0cdb604a..2811b9a0 100644 --- a/python/fatcat_web/search.py +++ b/python/fatcat_web/search.py @@ -424,7 +424,7 @@ def get_elastic_search_coverage(query: ReleaseQuery) -> dict: return stats -def get_elastic_container_stats(ident, issnl=None): +def get_elastic_container_stats(ident, issnl=None, es_client=None, es_index=None, merge_shadows=None): """ Returns dict: ident @@ -435,7 +435,14 @@ def get_elastic_container_stats(ident, issnl=None): preserved """ - search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX']) + if not es_client: + es_client = app.es_client + if not es_index: + es_index = app.config['ELASTICSEARCH_RELEASE_INDEX'] + if merge_shadows is None: + merge_shadows = app.config['FATCAT_MERGE_SHADOW_PRESERVATION'] + + search = Search(using=es_client, index=es_index) search = search.query( 'term', container_id=ident, @@ -479,7 +486,7 @@ def get_elastic_container_stats(ident, issnl=None): for k in ('bright', 'dark', 'shadows_only', 'none'): if not k in preservation_bucket: preservation_bucket[k] = 0 - if app.config['FATCAT_MERGE_SHADOW_PRESERVATION']: + if merge_shadows: preservation_bucket['none'] += preservation_bucket['shadows_only'] preservation_bucket['shadows_only'] = 0 release_type_bucket = agg_to_dict(resp.aggregations.release_type) diff --git a/python/tests/web_search.py b/python/tests/web_search.py index a7bf7ec7..8df01466 100644 --- a/python/tests/web_search.py +++ b/python/tests/web_search.py @@ -165,6 +165,16 @@ def test_container_stats(app, mocker): ] rv = app.get('/container/issnl/1234-5678/stats.json') assert rv.status_code == 200 + stats = rv.json + assert isinstance(stats['total'], int) + assert isinstance(stats['release_type'], dict) + assert isinstance(stats['preservation']['total'], int) + assert isinstance(stats['preservation']['bright'], int) + assert isinstance(stats['preservation']['dark'], int) + assert isinstance(stats['preservation']['none'], int) rv = app.get('/container/aaaaaaaaaaaaaeiraaaaaaaaam/stats.json') assert rv.status_code == 200 + stats = rv.json + assert isinstance(stats['total'], int) + assert stats['ident'] == "aaaaaaaaaaaaaeiraaaaaaaaam" -- cgit v1.2.3 From 261c85ca450f4bb8bc6956e719d7c276927dcd95 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 6 Apr 2021 20:25:12 -0700 Subject: web infra: log to stderr --- python/fatcat_web/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/fatcat_web/__init__.py b/python/fatcat_web/__init__.py index 487de58a..07b4e083 100644 --- a/python/fatcat_web/__init__.py +++ b/python/fatcat_web/__init__.py @@ -1,4 +1,6 @@ +import sys + from flask import Flask from flask.logging import create_logger from flask_uuid import FlaskUUID @@ -56,10 +58,10 @@ def auth_api(token): return fatcat_openapi_client.DefaultApi(fatcat_openapi_client.ApiClient(conf)) if Config.FATCAT_API_AUTH_TOKEN: - print("Found and using privileged token (eg, for account signup)") + print("Found and using privileged token (eg, for account signup)", file=sys.stderr) priv_api = auth_api(Config.FATCAT_API_AUTH_TOKEN) else: - print("No privileged token found") + print("No privileged token found", file=sys.stderr) priv_api = None # TODO: refactor integration so this doesn't always need to be defined. If -- cgit v1.2.3 From 5ba1859a2308c72e7e3e4e86af3c275210c5bacd Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 6 Apr 2021 20:25:46 -0700 Subject: ES schema updates: doc_index_ts as a str, not datetime The schema is a timestamp, but python needs to serialize as JSON, and doesn't do datetime automatically. --- python/fatcat_tools/transforms/elasticsearch.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py index fe463fa4..4bf9091a 100644 --- a/python/fatcat_tools/transforms/elasticsearch.py +++ b/python/fatcat_tools/transforms/elasticsearch.py @@ -46,7 +46,7 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) -> # First, the easy ones (direct copy) release = entity t = dict( - doc_index_ts=datetime.datetime.utcnow(), + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = release.ident, state = release.state, revision = release.revision, @@ -395,7 +395,7 @@ def container_to_elasticsearch(entity, force_bool=True, stats=None): # First, the easy ones (direct copy) t = dict( - doc_index_ts=datetime.datetime.utcnow(), + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = entity.ident, state = entity.state, revision = entity.revision, @@ -515,7 +515,7 @@ def changelog_to_elasticsearch(entity): editgroup = entity.editgroup t = dict( - doc_index_ts=datetime.datetime.utcnow(), + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", index=entity.index, editgroup_id=entity.editgroup_id, timestamp=entity.timestamp.isoformat(), @@ -579,7 +579,7 @@ def file_to_elasticsearch(entity): # First, the easy ones (direct copy) t = dict( - doc_index_ts=datetime.datetime.utcnow(), + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = entity.ident, state = entity.state, revision = entity.revision, -- cgit v1.2.3 From b0c5db8a2bd2e389f99df1b44120c18fa5bc3e52 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 6 Apr 2021 20:27:05 -0700 Subject: transform tool: container transform stats lookup support --- python/fatcat_transform.py | 28 ++++++++++++++++++++-- .../container_jxqqgho7bncrvgfyfznramju3q.json | 1 + 2 files changed, 27 insertions(+), 2 deletions(-) create mode 100644 python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json diff --git a/python/fatcat_transform.py b/python/fatcat_transform.py index 8e01c860..93c39e2f 100755 --- a/python/fatcat_transform.py +++ b/python/fatcat_transform.py @@ -9,11 +9,14 @@ import sys import json import argparse +import elasticsearch from fatcat_openapi_client import ReleaseEntity, ContainerEntity, FileEntity, ChangelogEntry + from fatcat_tools import entity_from_json, \ release_to_elasticsearch, container_to_elasticsearch, \ file_to_elasticsearch, changelog_to_elasticsearch, public_api, \ release_to_csl, citeproc_csl +from fatcat_web.search import get_elastic_container_stats def run_elasticsearch_releases(args): @@ -28,6 +31,8 @@ def run_elasticsearch_releases(args): json.dumps(release_to_elasticsearch(entity)) + '\n') def run_elasticsearch_containers(args): + es_client = elasticsearch.Elasticsearch(args.fatcat_elasticsearch_url) + es_release_index = "fatcat_release" for line in args.json_input: line = line.strip() if not line: @@ -35,8 +40,21 @@ def run_elasticsearch_containers(args): entity = entity_from_json(line, ContainerEntity, api_client=args.api.api_client) if entity.state != 'active': continue - args.json_output.write( - json.dumps(container_to_elasticsearch(entity)) + '\n') + + if args.query_stats: + es_doc = container_to_elasticsearch( + entity, + stats=get_elastic_container_stats( + entity.ident, + es_client=es_client, + es_index=es_release_index, + merge_shadows=True, + ), + ) + else: + es_doc = container_to_elasticsearch(entity) + + args.json_output.write(json.dumps(es_doc) + '\n') def run_elasticsearch_files(args): for line in args.json_input: @@ -77,6 +95,9 @@ def main(): parser.add_argument('--fatcat-api-url', default="http://localhost:9411/v0", help="connect to this host/port") + parser.add_argument('--fatcat-elasticsearch-url', + default="http://localhost:9200", + help="connect to this host/port") subparsers = parser.add_subparsers() sub_elasticsearch_releases = subparsers.add_parser('elasticsearch-releases', @@ -98,6 +119,9 @@ def main(): sub_elasticsearch_containers.add_argument('json_output', help="where to send output", default=sys.stdout, type=argparse.FileType('w')) + sub_elasticsearch_containers.add_argument('--query-stats', + action='store_true', + help="whether to query release search index for container stats") sub_elasticsearch_files = subparsers.add_parser('elasticsearch-files', help="convert fatcat file JSON schema to elasticsearch file schema") diff --git a/python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json b/python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json new file mode 100644 index 00000000..bb4d46f9 --- /dev/null +++ b/python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json @@ -0,0 +1 @@ +{"extra":{"abbrev":"Annu. Rev. Pharmacol. Toxicol.","country":"us","ezb":{"color":"red","ezb_id":"2460"},"ia":{"sim":{"peer_reviewed":true,"pub_type":"Scholarly Journals","scholarly_peer_reviewed":true,"sim_pubid":"5091","year_spans":[[1961,2009]]}},"issne":"1545-4304","issnp":"0362-1642","kbart":{"hathitrust":{"year_spans":[[1976,1992]]},"portico":{"year_spans":[[1961,1999],[2001,2001],[2003,2003],[2005,2006],[2008,2010],[2012,2019]]},"scholarsportal":{"year_spans":[[1961,2003],[2005,2019]]}},"languages":["en"],"sherpa_romeo":{"color":"yellow"},"urls":["https://www.annualreviews.org/journal/pharmtox","https://www.annualreviews.org/loi/pharmtox","http://arjournals.annualreviews.org/loi/pharmtox"]},"ident":"jxqqgho7bncrvgfyfznramju3q","issnl":"0362-1642","name":"Annual Review of Pharmacology and Toxicology","publisher":"Annual Reviews","revision":"ff56081b-9130-47a6-9e14-9901c2808502","state":"active"} -- cgit v1.2.3 From 9f110393b90d5b9e95a39b4f83d3e864434dd189 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 6 Apr 2021 21:48:40 -0700 Subject: container ES index worker: support for querying status --- python/fatcat_tools/workers/elasticsearch.py | 37 ++++++++++++++++++++++++---- python/fatcat_worker.py | 5 ++++ 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 4850bb0a..13409eb5 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -1,10 +1,16 @@ import json + import requests +import elasticsearch from confluent_kafka import Consumer, KafkaException from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry -from fatcat_tools import * +from fatcat_tools import (public_api, entity_from_json, + release_to_elasticsearch, container_to_elasticsearch, + changelog_to_elasticsearch, +) +from fatcat_web.search import get_elastic_container_stats from .worker_common import FatcatWorker @@ -18,7 +24,8 @@ class ElasticsearchReleaseWorker(FatcatWorker): def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", - batch_size=200, api_host="https://api.fatcat.wiki/v0"): + elasticsearch_release_index="fatcat_releases", + batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic) self.consumer_group = "elasticsearch-updates3" @@ -26,14 +33,19 @@ class ElasticsearchReleaseWorker(FatcatWorker): self.poll_interval = poll_interval self.elasticsearch_backend = elasticsearch_backend self.elasticsearch_index = elasticsearch_index + self.elasticsearch_release_index = elasticsearch_release_index self.entity_type = ReleaseEntity self.transform_func = release_to_elasticsearch self.api_host = api_host + self.query_stats = query_stats def run(self): ac = ApiClient() api = public_api(self.api_host) + # only used by container indexing query_stats code path + es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend) + def fail_fast(err, partitions): if err is not None: print("Kafka consumer commit error: {}".format(err)) @@ -104,13 +116,25 @@ class ElasticsearchReleaseWorker(FatcatWorker): entity = api.get_changelog_entry(entity.index) else: key = entity.ident + + if self.entity_type == ContainerEntity and self.query_stats: + stats = get_elastic_container_stats( + entity.ident, + es_client=es_client, + es_index=self.elasticsearch_release_index, + merge_shadows=True, + ) + doc_dict = container_to_elasticsearch(entity, stats=stats) + else: + doc_dict = self.transform_func(entity) + # TODO: handle deletions from index bulk_actions.append(json.dumps({ "index": { "_id": key, }, })) - bulk_actions.append(json.dumps( - self.transform_func(entity))) - print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type)) + bulk_actions.append(json.dumps(doc_dict)) + + print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__)) elasticsearch_endpoint = "{}/{}/_bulk".format( self.elasticsearch_backend, self.elasticsearch_index) @@ -132,6 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + query_stats=False, elasticsearch_release_index="fatcat_release", elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", batch_size=200): super().__init__(kafka_hosts=kafka_hosts, @@ -140,6 +165,8 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): offset=offset, elasticsearch_backend=elasticsearch_backend, elasticsearch_index=elasticsearch_index, + elasticsearch_release_index=elasticsearch_release_index, + query_stats=query_stats, batch_size=batch_size) # previous group got corrupted (by pykafka library?) self.consumer_group = "elasticsearch-updates3" diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 5e3fb3e9..95f5024a 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -44,6 +44,8 @@ def run_elasticsearch_release(args): def run_elasticsearch_container(args): consume_topic = "fatcat-{}.container-updates".format(args.env) worker = ElasticsearchContainerWorker(args.kafka_hosts, consume_topic, + query_stats=args.query_stats, + elasticsearch_release_index="fatcat_release", elasticsearch_backend=args.elasticsearch_backend, elasticsearch_index=args.elasticsearch_index) worker.run() @@ -99,6 +101,9 @@ def main(): sub_elasticsearch_container.add_argument('--elasticsearch-index', help="elasticsearch index to push into", default="fatcat_container") + sub_elasticsearch_container.add_argument('--query-stats', + action='store_true', + help="whether to query release search index for container stats") sub_elasticsearch_changelog = subparsers.add_parser('elasticsearch-changelog', help="consume changelog kafka feed, transform and push to search") -- cgit v1.2.3