diff options
author | bnewbold <bnewbold@archive.org> | 2021-04-08 00:22:33 +0000 |
---|---|---|
committer | bnewbold <bnewbold@archive.org> | 2021-04-08 00:22:33 +0000 |
commit | 97280d0a20baa00aa1f8dbd3bec62142ad2ce900 (patch) | |
tree | 9320c75d5c19148aba7cd3a0ced0fc200988e6ba /python | |
parent | 0b9fc884dad8e3147d10c273725157ba60f48069 (diff) | |
parent | 9f110393b90d5b9e95a39b4f83d3e864434dd189 (diff) | |
download | fatcat-97280d0a20baa00aa1f8dbd3bec62142ad2ce900.tar.gz fatcat-97280d0a20baa00aa1f8dbd3bec62142ad2ce900.zip |
Merge branch 'bnewbold-es-index-updates' into 'master'
fatcat elasticsearch schema updates
See merge request webgroup/fatcat!101
Diffstat (limited to 'python')
-rw-r--r-- | python/fatcat_tools/transforms/elasticsearch.py | 26 | ||||
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 37 | ||||
-rwxr-xr-x | python/fatcat_transform.py | 28 | ||||
-rw-r--r-- | python/fatcat_web/__init__.py | 6 | ||||
-rw-r--r-- | python/fatcat_web/search.py | 13 | ||||
-rwxr-xr-x | python/fatcat_worker.py | 5 | ||||
-rw-r--r-- | python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json | 1 | ||||
-rw-r--r-- | python/tests/transform_elasticsearch.py | 47 | ||||
-rw-r--r-- | python/tests/web_search.py | 10 |
9 files changed, 154 insertions, 19 deletions
diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py index f37aadba..4bf9091a 100644 --- a/python/fatcat_tools/transforms/elasticsearch.py +++ b/python/fatcat_tools/transforms/elasticsearch.py @@ -46,6 +46,7 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) -> # First, the easy ones (direct copy) release = entity t = dict( + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = release.ident, state = release.state, revision = release.revision, @@ -295,6 +296,8 @@ def _rte_container_helper(container: ContainerEntity, release_year: Optional[int t['country_code_upper'] = c_extra['country'].upper() if c_extra.get('publisher_type'): t['publisher_type'] = c_extra['publisher_type'] + if c_extra.get('discipline'): + t['discipline'] = c_extra['discipline'] return t def _rte_content_helper(release: ReleaseEntity) -> dict: @@ -374,7 +377,7 @@ def _rte_url_helper(url_obj) -> dict: return t -def container_to_elasticsearch(entity, force_bool=True): +def container_to_elasticsearch(entity, force_bool=True, stats=None): """ Converts from an entity model/schema to elasticsearch oriented schema. @@ -392,6 +395,7 @@ def container_to_elasticsearch(entity, force_bool=True): # First, the easy ones (direct copy) t = dict( + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = entity.ident, state = entity.state, revision = entity.revision, @@ -407,10 +411,13 @@ def container_to_elasticsearch(entity, force_bool=True): entity.extra = dict() for key in ('country', 'languages', 'mimetypes', 'original_name', 'first_year', 'last_year', 'aliases', 'abbrev', 'region', - 'discipline'): + 'discipline', 'publisher_type'): if entity.extra.get(key): t[key] = entity.extra[key] + if entity.extra.get('dblp') and entity.extra['dblp'].get('prefix'): + t['dblp_prefix'] = entity.extra['dblp']['prefix'] + if 'country' in t: t['country_code'] = t.pop('country') @@ -428,6 +435,7 @@ def container_to_elasticsearch(entity, force_bool=True): any_kbart = None any_jstor = None any_ia_sim = None + keepers = [] extra = entity.extra if extra.get('doaj'): @@ -451,6 +459,9 @@ def container_to_elasticsearch(entity, force_bool=True): any_kbart = True if extra['kbart'].get('jstor'): any_jstor = True + for k, v in extra['kbart'].items(): + if v and isinstance(v, dict): + keepers.append(k) if extra.get('ia'): if extra['ia'].get('sim'): any_ia_sim = True @@ -458,6 +469,7 @@ def container_to_elasticsearch(entity, force_bool=True): is_longtail_oa = True t['is_superceded'] = bool(extra.get('superceded')) + t['keepers'] = keepers t['in_doaj'] = bool(in_doaj) t['in_road'] = bool(in_road) t['any_kbart'] = bool(any_kbart) @@ -471,6 +483,14 @@ def container_to_elasticsearch(entity, force_bool=True): t['is_longtail_oa'] = is_longtail_oa t['any_jstor'] = any_jstor t['any_ia_sim'] = any_ia_sim + + # mix in stats, if provided + if stats: + t['releases_total'] = stats['total'] + t['preservation_bright'] = stats['preservation']['bright'] + t['preservation_dark'] = stats['preservation']['dark'] + t['preservation_shadows_only'] = stats['preservation']['shadows_only'] + t['preservation_none'] = stats['preservation']['none'] return t @@ -495,6 +515,7 @@ def changelog_to_elasticsearch(entity): editgroup = entity.editgroup t = dict( + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", index=entity.index, editgroup_id=entity.editgroup_id, timestamp=entity.timestamp.isoformat(), @@ -558,6 +579,7 @@ def file_to_elasticsearch(entity): # First, the easy ones (direct copy) t = dict( + doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z", ident = entity.ident, state = entity.state, revision = entity.revision, diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 4850bb0a..13409eb5 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -1,10 +1,16 @@ import json + import requests +import elasticsearch from confluent_kafka import Consumer, KafkaException from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry -from fatcat_tools import * +from fatcat_tools import (public_api, entity_from_json, + release_to_elasticsearch, container_to_elasticsearch, + changelog_to_elasticsearch, +) +from fatcat_web.search import get_elastic_container_stats from .worker_common import FatcatWorker @@ -18,7 +24,8 @@ class ElasticsearchReleaseWorker(FatcatWorker): def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", - batch_size=200, api_host="https://api.fatcat.wiki/v0"): + elasticsearch_release_index="fatcat_releases", + batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic) self.consumer_group = "elasticsearch-updates3" @@ -26,14 +33,19 @@ class ElasticsearchReleaseWorker(FatcatWorker): self.poll_interval = poll_interval self.elasticsearch_backend = elasticsearch_backend self.elasticsearch_index = elasticsearch_index + self.elasticsearch_release_index = elasticsearch_release_index self.entity_type = ReleaseEntity self.transform_func = release_to_elasticsearch self.api_host = api_host + self.query_stats = query_stats def run(self): ac = ApiClient() api = public_api(self.api_host) + # only used by container indexing query_stats code path + es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend) + def fail_fast(err, partitions): if err is not None: print("Kafka consumer commit error: {}".format(err)) @@ -104,13 +116,25 @@ class ElasticsearchReleaseWorker(FatcatWorker): entity = api.get_changelog_entry(entity.index) else: key = entity.ident + + if self.entity_type == ContainerEntity and self.query_stats: + stats = get_elastic_container_stats( + entity.ident, + es_client=es_client, + es_index=self.elasticsearch_release_index, + merge_shadows=True, + ) + doc_dict = container_to_elasticsearch(entity, stats=stats) + else: + doc_dict = self.transform_func(entity) + # TODO: handle deletions from index bulk_actions.append(json.dumps({ "index": { "_id": key, }, })) - bulk_actions.append(json.dumps( - self.transform_func(entity))) - print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type)) + bulk_actions.append(json.dumps(doc_dict)) + + print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__)) elasticsearch_endpoint = "{}/{}/_bulk".format( self.elasticsearch_backend, self.elasticsearch_index) @@ -132,6 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + query_stats=False, elasticsearch_release_index="fatcat_release", elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", batch_size=200): super().__init__(kafka_hosts=kafka_hosts, @@ -140,6 +165,8 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): offset=offset, elasticsearch_backend=elasticsearch_backend, elasticsearch_index=elasticsearch_index, + elasticsearch_release_index=elasticsearch_release_index, + query_stats=query_stats, batch_size=batch_size) # previous group got corrupted (by pykafka library?) self.consumer_group = "elasticsearch-updates3" diff --git a/python/fatcat_transform.py b/python/fatcat_transform.py index 8e01c860..93c39e2f 100755 --- a/python/fatcat_transform.py +++ b/python/fatcat_transform.py @@ -9,11 +9,14 @@ import sys import json import argparse +import elasticsearch from fatcat_openapi_client import ReleaseEntity, ContainerEntity, FileEntity, ChangelogEntry + from fatcat_tools import entity_from_json, \ release_to_elasticsearch, container_to_elasticsearch, \ file_to_elasticsearch, changelog_to_elasticsearch, public_api, \ release_to_csl, citeproc_csl +from fatcat_web.search import get_elastic_container_stats def run_elasticsearch_releases(args): @@ -28,6 +31,8 @@ def run_elasticsearch_releases(args): json.dumps(release_to_elasticsearch(entity)) + '\n') def run_elasticsearch_containers(args): + es_client = elasticsearch.Elasticsearch(args.fatcat_elasticsearch_url) + es_release_index = "fatcat_release" for line in args.json_input: line = line.strip() if not line: @@ -35,8 +40,21 @@ def run_elasticsearch_containers(args): entity = entity_from_json(line, ContainerEntity, api_client=args.api.api_client) if entity.state != 'active': continue - args.json_output.write( - json.dumps(container_to_elasticsearch(entity)) + '\n') + + if args.query_stats: + es_doc = container_to_elasticsearch( + entity, + stats=get_elastic_container_stats( + entity.ident, + es_client=es_client, + es_index=es_release_index, + merge_shadows=True, + ), + ) + else: + es_doc = container_to_elasticsearch(entity) + + args.json_output.write(json.dumps(es_doc) + '\n') def run_elasticsearch_files(args): for line in args.json_input: @@ -77,6 +95,9 @@ def main(): parser.add_argument('--fatcat-api-url', default="http://localhost:9411/v0", help="connect to this host/port") + parser.add_argument('--fatcat-elasticsearch-url', + default="http://localhost:9200", + help="connect to this host/port") subparsers = parser.add_subparsers() sub_elasticsearch_releases = subparsers.add_parser('elasticsearch-releases', @@ -98,6 +119,9 @@ def main(): sub_elasticsearch_containers.add_argument('json_output', help="where to send output", default=sys.stdout, type=argparse.FileType('w')) + sub_elasticsearch_containers.add_argument('--query-stats', + action='store_true', + help="whether to query release search index for container stats") sub_elasticsearch_files = subparsers.add_parser('elasticsearch-files', help="convert fatcat file JSON schema to elasticsearch file schema") diff --git a/python/fatcat_web/__init__.py b/python/fatcat_web/__init__.py index 487de58a..07b4e083 100644 --- a/python/fatcat_web/__init__.py +++ b/python/fatcat_web/__init__.py @@ -1,4 +1,6 @@ +import sys + from flask import Flask from flask.logging import create_logger from flask_uuid import FlaskUUID @@ -56,10 +58,10 @@ def auth_api(token): return fatcat_openapi_client.DefaultApi(fatcat_openapi_client.ApiClient(conf)) if Config.FATCAT_API_AUTH_TOKEN: - print("Found and using privileged token (eg, for account signup)") + print("Found and using privileged token (eg, for account signup)", file=sys.stderr) priv_api = auth_api(Config.FATCAT_API_AUTH_TOKEN) else: - print("No privileged token found") + print("No privileged token found", file=sys.stderr) priv_api = None # TODO: refactor integration so this doesn't always need to be defined. If diff --git a/python/fatcat_web/search.py b/python/fatcat_web/search.py index 0cdb604a..2811b9a0 100644 --- a/python/fatcat_web/search.py +++ b/python/fatcat_web/search.py @@ -424,7 +424,7 @@ def get_elastic_search_coverage(query: ReleaseQuery) -> dict: return stats -def get_elastic_container_stats(ident, issnl=None): +def get_elastic_container_stats(ident, issnl=None, es_client=None, es_index=None, merge_shadows=None): """ Returns dict: ident @@ -435,7 +435,14 @@ def get_elastic_container_stats(ident, issnl=None): preserved """ - search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX']) + if not es_client: + es_client = app.es_client + if not es_index: + es_index = app.config['ELASTICSEARCH_RELEASE_INDEX'] + if merge_shadows is None: + merge_shadows = app.config['FATCAT_MERGE_SHADOW_PRESERVATION'] + + search = Search(using=es_client, index=es_index) search = search.query( 'term', container_id=ident, @@ -479,7 +486,7 @@ def get_elastic_container_stats(ident, issnl=None): for k in ('bright', 'dark', 'shadows_only', 'none'): if not k in preservation_bucket: preservation_bucket[k] = 0 - if app.config['FATCAT_MERGE_SHADOW_PRESERVATION']: + if merge_shadows: preservation_bucket['none'] += preservation_bucket['shadows_only'] preservation_bucket['shadows_only'] = 0 release_type_bucket = agg_to_dict(resp.aggregations.release_type) diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 5e3fb3e9..95f5024a 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -44,6 +44,8 @@ def run_elasticsearch_release(args): def run_elasticsearch_container(args): consume_topic = "fatcat-{}.container-updates".format(args.env) worker = ElasticsearchContainerWorker(args.kafka_hosts, consume_topic, + query_stats=args.query_stats, + elasticsearch_release_index="fatcat_release", elasticsearch_backend=args.elasticsearch_backend, elasticsearch_index=args.elasticsearch_index) worker.run() @@ -99,6 +101,9 @@ def main(): sub_elasticsearch_container.add_argument('--elasticsearch-index', help="elasticsearch index to push into", default="fatcat_container") + sub_elasticsearch_container.add_argument('--query-stats', + action='store_true', + help="whether to query release search index for container stats") sub_elasticsearch_changelog = subparsers.add_parser('elasticsearch-changelog', help="consume changelog kafka feed, transform and push to search") diff --git a/python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json b/python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json new file mode 100644 index 00000000..bb4d46f9 --- /dev/null +++ b/python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json @@ -0,0 +1 @@ +{"extra":{"abbrev":"Annu. Rev. Pharmacol. Toxicol.","country":"us","ezb":{"color":"red","ezb_id":"2460"},"ia":{"sim":{"peer_reviewed":true,"pub_type":"Scholarly Journals","scholarly_peer_reviewed":true,"sim_pubid":"5091","year_spans":[[1961,2009]]}},"issne":"1545-4304","issnp":"0362-1642","kbart":{"hathitrust":{"year_spans":[[1976,1992]]},"portico":{"year_spans":[[1961,1999],[2001,2001],[2003,2003],[2005,2006],[2008,2010],[2012,2019]]},"scholarsportal":{"year_spans":[[1961,2003],[2005,2019]]}},"languages":["en"],"sherpa_romeo":{"color":"yellow"},"urls":["https://www.annualreviews.org/journal/pharmtox","https://www.annualreviews.org/loi/pharmtox","http://arjournals.annualreviews.org/loi/pharmtox"]},"ident":"jxqqgho7bncrvgfyfznramju3q","issnl":"0362-1642","name":"Annual Review of Pharmacology and Toxicology","publisher":"Annual Reviews","revision":"ff56081b-9130-47a6-9e14-9901c2808502","state":"active"} diff --git a/python/tests/transform_elasticsearch.py b/python/tests/transform_elasticsearch.py index 9cf77d4a..ba2b7ea2 100644 --- a/python/tests/transform_elasticsearch.py +++ b/python/tests/transform_elasticsearch.py @@ -147,11 +147,48 @@ def test_elasticsearch_release_from_json(): def test_elasticsearch_container_transform(journal_metadata_importer): with open('tests/files/journal_metadata.sample.json', 'r') as f: - raw = json.loads(f.readline()) - c = journal_metadata_importer.parse_record(raw) - c.state = 'active' - es = container_to_elasticsearch(c) - assert es['publisher'] == c.publisher + raw1 = json.loads(f.readline()) + raw2 = json.loads(f.readline()) + c1 = journal_metadata_importer.parse_record(raw1) + c1.state = 'active' + c2 = journal_metadata_importer.parse_record(raw2) + c2.state = 'active' + + c1.extra['publisher_type'] = "big5" + c1.extra['discipline'] = "history" + es = container_to_elasticsearch(c1) + assert es['publisher'] == c1.publisher + assert es['discipline'] == c1.extra['discipline'] + assert es['publisher_type'] == c1.extra['publisher_type'] + assert es['keepers'] == [] + + stats = { + "ident": "en4qj5ijrbf5djxx7p5zzpjyoq", + "in_kbart": 11136, + "in_web": 9501, + "is_preserved": 11136, + "issnl": "2050-084X", + "preservation": { + "bright": 9501, + "dark": 1635, + "none": 0, + "shadows_only": 0, + "total": 11136 + }, + "release_type": { + "_unknown": 9, + "article-journal": 11124, + "editorial": 2, + "letter": 1 + }, + "total": 11136 + } + es = container_to_elasticsearch(c2, stats=stats) + assert es['name'] == c2.name + assert es['publisher'] == c2.publisher + assert es['keepers'] == list(c2.extra['kbart'].keys()) == ["portico"] + assert es['any_kbart'] == True + def test_elasticsearch_file_transform(matched_importer): f = entity_from_json(open('./tests/files/file_bcah4zp5tvdhjl5bqci2c2lgfa.json', 'r').read(), FileEntity) diff --git a/python/tests/web_search.py b/python/tests/web_search.py index a7bf7ec7..8df01466 100644 --- a/python/tests/web_search.py +++ b/python/tests/web_search.py @@ -165,6 +165,16 @@ def test_container_stats(app, mocker): ] rv = app.get('/container/issnl/1234-5678/stats.json') assert rv.status_code == 200 + stats = rv.json + assert isinstance(stats['total'], int) + assert isinstance(stats['release_type'], dict) + assert isinstance(stats['preservation']['total'], int) + assert isinstance(stats['preservation']['bright'], int) + assert isinstance(stats['preservation']['dark'], int) + assert isinstance(stats['preservation']['none'], int) rv = app.get('/container/aaaaaaaaaaaaaeiraaaaaaaaam/stats.json') assert rv.status_code == 200 + stats = rv.json + assert isinstance(stats['total'], int) + assert stats['ident'] == "aaaaaaaaaaaaaeiraaaaaaaaam" |