summary | refs | log | tree | commit | diff | stats
path: root/python
diff options
context:
space:
mode:
author: bnewbold <bnewbold@archive.org> 2021-04-08 00:22:33 +0000
committer: bnewbold <bnewbold@archive.org> 2021-04-08 00:22:33 +0000
commit: 97280d0a20baa00aa1f8dbd3bec62142ad2ce900 (patch)
tree: 9320c75d5c19148aba7cd3a0ced0fc200988e6ba /python
parent: 0b9fc884dad8e3147d10c273725157ba60f48069 (diff)
parent: 9f110393b90d5b9e95a39b4f83d3e864434dd189 (diff)
download: fatcat-97280d0a20baa00aa1f8dbd3bec62142ad2ce900.tar.gz
download: fatcat-97280d0a20baa00aa1f8dbd3bec62142ad2ce900.zip
Merge branch 'bnewbold-es-index-updates' into 'master'
fatcat elasticsearch schema updates

See merge request webgroup/fatcat!101
Diffstat (limited to 'python')
-rw-r--r--  python/fatcat_tools/transforms/elasticsearch.py  26
-rw-r--r--  python/fatcat_tools/workers/elasticsearch.py  37
-rwxr-xr-x  python/fatcat_transform.py  28
-rw-r--r--  python/fatcat_web/__init__.py  6
-rw-r--r--  python/fatcat_web/search.py  13
-rwxr-xr-x  python/fatcat_worker.py  5
-rw-r--r--  python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json  1
-rw-r--r--  python/tests/transform_elasticsearch.py  47
-rw-r--r--  python/tests/web_search.py  10
9 files changed, 154 insertions, 19 deletions
diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py
index f37aadba..4bf9091a 100644
--- a/python/fatcat_tools/transforms/elasticsearch.py
+++ b/python/fatcat_tools/transforms/elasticsearch.py
@@ -46,6 +46,7 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) ->
# First, the easy ones (direct copy)
release = entity
t = dict(
+ doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
ident = release.ident,
state = release.state,
revision = release.revision,
@@ -295,6 +296,8 @@ def _rte_container_helper(container: ContainerEntity, release_year: Optional[int
t['country_code_upper'] = c_extra['country'].upper()
if c_extra.get('publisher_type'):
t['publisher_type'] = c_extra['publisher_type']
+ if c_extra.get('discipline'):
+ t['discipline'] = c_extra['discipline']
return t
def _rte_content_helper(release: ReleaseEntity) -> dict:
@@ -374,7 +377,7 @@ def _rte_url_helper(url_obj) -> dict:
return t
-def container_to_elasticsearch(entity, force_bool=True):
+def container_to_elasticsearch(entity, force_bool=True, stats=None):
"""
Converts from an entity model/schema to elasticsearch oriented schema.
@@ -392,6 +395,7 @@ def container_to_elasticsearch(entity, force_bool=True):
# First, the easy ones (direct copy)
t = dict(
+ doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
ident = entity.ident,
state = entity.state,
revision = entity.revision,
@@ -407,10 +411,13 @@ def container_to_elasticsearch(entity, force_bool=True):
entity.extra = dict()
for key in ('country', 'languages', 'mimetypes', 'original_name',
'first_year', 'last_year', 'aliases', 'abbrev', 'region',
- 'discipline'):
+ 'discipline', 'publisher_type'):
if entity.extra.get(key):
t[key] = entity.extra[key]
+ if entity.extra.get('dblp') and entity.extra['dblp'].get('prefix'):
+ t['dblp_prefix'] = entity.extra['dblp']['prefix']
+
if 'country' in t:
t['country_code'] = t.pop('country')
@@ -428,6 +435,7 @@ def container_to_elasticsearch(entity, force_bool=True):
any_kbart = None
any_jstor = None
any_ia_sim = None
+ keepers = []
extra = entity.extra
if extra.get('doaj'):
@@ -451,6 +459,9 @@ def container_to_elasticsearch(entity, force_bool=True):
any_kbart = True
if extra['kbart'].get('jstor'):
any_jstor = True
+ for k, v in extra['kbart'].items():
+ if v and isinstance(v, dict):
+ keepers.append(k)
if extra.get('ia'):
if extra['ia'].get('sim'):
any_ia_sim = True
@@ -458,6 +469,7 @@ def container_to_elasticsearch(entity, force_bool=True):
is_longtail_oa = True
t['is_superceded'] = bool(extra.get('superceded'))
+ t['keepers'] = keepers
t['in_doaj'] = bool(in_doaj)
t['in_road'] = bool(in_road)
t['any_kbart'] = bool(any_kbart)
@@ -471,6 +483,14 @@ def container_to_elasticsearch(entity, force_bool=True):
t['is_longtail_oa'] = is_longtail_oa
t['any_jstor'] = any_jstor
t['any_ia_sim'] = any_ia_sim
+
+ # mix in stats, if provided
+ if stats:
+ t['releases_total'] = stats['total']
+ t['preservation_bright'] = stats['preservation']['bright']
+ t['preservation_dark'] = stats['preservation']['dark']
+ t['preservation_shadows_only'] = stats['preservation']['shadows_only']
+ t['preservation_none'] = stats['preservation']['none']
return t
@@ -495,6 +515,7 @@ def changelog_to_elasticsearch(entity):
editgroup = entity.editgroup
t = dict(
+ doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
index=entity.index,
editgroup_id=entity.editgroup_id,
timestamp=entity.timestamp.isoformat(),
@@ -558,6 +579,7 @@ def file_to_elasticsearch(entity):
# First, the easy ones (direct copy)
t = dict(
+ doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
ident = entity.ident,
state = entity.state,
revision = entity.revision,
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 4850bb0a..13409eb5 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,10 +1,16 @@
import json
+
import requests
+import elasticsearch
from confluent_kafka import Consumer, KafkaException
from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry
-from fatcat_tools import *
+from fatcat_tools import (public_api, entity_from_json,
+ release_to_elasticsearch, container_to_elasticsearch,
+ changelog_to_elasticsearch,
+)
+from fatcat_web.search import get_elastic_container_stats
from .worker_common import FatcatWorker
@@ -18,7 +24,8 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
- batch_size=200, api_host="https://api.fatcat.wiki/v0"):
+ elasticsearch_release_index="fatcat_releases",
+ batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
@@ -26,14 +33,19 @@ class ElasticsearchReleaseWorker(FatcatWorker):
self.poll_interval = poll_interval
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
+ self.elasticsearch_release_index = elasticsearch_release_index
self.entity_type = ReleaseEntity
self.transform_func = release_to_elasticsearch
self.api_host = api_host
+ self.query_stats = query_stats
def run(self):
ac = ApiClient()
api = public_api(self.api_host)
+ # only used by container indexing query_stats code path
+ es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend)
+
def fail_fast(err, partitions):
if err is not None:
print("Kafka consumer commit error: {}".format(err))
@@ -104,13 +116,25 @@ class ElasticsearchReleaseWorker(FatcatWorker):
entity = api.get_changelog_entry(entity.index)
else:
key = entity.ident
+
+ if self.entity_type == ContainerEntity and self.query_stats:
+ stats = get_elastic_container_stats(
+ entity.ident,
+ es_client=es_client,
+ es_index=self.elasticsearch_release_index,
+ merge_shadows=True,
+ )
+ doc_dict = container_to_elasticsearch(entity, stats=stats)
+ else:
+ doc_dict = self.transform_func(entity)
+
# TODO: handle deletions from index
bulk_actions.append(json.dumps({
"index": { "_id": key, },
}))
- bulk_actions.append(json.dumps(
- self.transform_func(entity)))
- print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type))
+ bulk_actions.append(json.dumps(doc_dict))
+
+ print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__))
elasticsearch_endpoint = "{}/{}/_bulk".format(
self.elasticsearch_backend,
self.elasticsearch_index)
@@ -132,6 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ query_stats=False, elasticsearch_release_index="fatcat_release",
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
batch_size=200):
super().__init__(kafka_hosts=kafka_hosts,
@@ -140,6 +165,8 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
offset=offset,
elasticsearch_backend=elasticsearch_backend,
elasticsearch_index=elasticsearch_index,
+ elasticsearch_release_index=elasticsearch_release_index,
+ query_stats=query_stats,
batch_size=batch_size)
# previous group got corrupted (by pykafka library?)
self.consumer_group = "elasticsearch-updates3"
diff --git a/python/fatcat_transform.py b/python/fatcat_transform.py
index 8e01c860..93c39e2f 100755
--- a/python/fatcat_transform.py
+++ b/python/fatcat_transform.py
@@ -9,11 +9,14 @@ import sys
import json
import argparse
+import elasticsearch
from fatcat_openapi_client import ReleaseEntity, ContainerEntity, FileEntity, ChangelogEntry
+
from fatcat_tools import entity_from_json, \
release_to_elasticsearch, container_to_elasticsearch, \
file_to_elasticsearch, changelog_to_elasticsearch, public_api, \
release_to_csl, citeproc_csl
+from fatcat_web.search import get_elastic_container_stats
def run_elasticsearch_releases(args):
@@ -28,6 +31,8 @@ def run_elasticsearch_releases(args):
json.dumps(release_to_elasticsearch(entity)) + '\n')
def run_elasticsearch_containers(args):
+ es_client = elasticsearch.Elasticsearch(args.fatcat_elasticsearch_url)
+ es_release_index = "fatcat_release"
for line in args.json_input:
line = line.strip()
if not line:
@@ -35,8 +40,21 @@ def run_elasticsearch_containers(args):
entity = entity_from_json(line, ContainerEntity, api_client=args.api.api_client)
if entity.state != 'active':
continue
- args.json_output.write(
- json.dumps(container_to_elasticsearch(entity)) + '\n')
+
+ if args.query_stats:
+ es_doc = container_to_elasticsearch(
+ entity,
+ stats=get_elastic_container_stats(
+ entity.ident,
+ es_client=es_client,
+ es_index=es_release_index,
+ merge_shadows=True,
+ ),
+ )
+ else:
+ es_doc = container_to_elasticsearch(entity)
+
+ args.json_output.write(json.dumps(es_doc) + '\n')
def run_elasticsearch_files(args):
for line in args.json_input:
@@ -77,6 +95,9 @@ def main():
parser.add_argument('--fatcat-api-url',
default="http://localhost:9411/v0",
help="connect to this host/port")
+ parser.add_argument('--fatcat-elasticsearch-url',
+ default="http://localhost:9200",
+ help="connect to this host/port")
subparsers = parser.add_subparsers()
sub_elasticsearch_releases = subparsers.add_parser('elasticsearch-releases',
@@ -98,6 +119,9 @@ def main():
sub_elasticsearch_containers.add_argument('json_output',
help="where to send output",
default=sys.stdout, type=argparse.FileType('w'))
+ sub_elasticsearch_containers.add_argument('--query-stats',
+ action='store_true',
+ help="whether to query release search index for container stats")
sub_elasticsearch_files = subparsers.add_parser('elasticsearch-files',
help="convert fatcat file JSON schema to elasticsearch file schema")
diff --git a/python/fatcat_web/__init__.py b/python/fatcat_web/__init__.py
index 487de58a..07b4e083 100644
--- a/python/fatcat_web/__init__.py
+++ b/python/fatcat_web/__init__.py
@@ -1,4 +1,6 @@
+import sys
+
from flask import Flask
from flask.logging import create_logger
from flask_uuid import FlaskUUID
@@ -56,10 +58,10 @@ def auth_api(token):
return fatcat_openapi_client.DefaultApi(fatcat_openapi_client.ApiClient(conf))
if Config.FATCAT_API_AUTH_TOKEN:
- print("Found and using privileged token (eg, for account signup)")
+ print("Found and using privileged token (eg, for account signup)", file=sys.stderr)
priv_api = auth_api(Config.FATCAT_API_AUTH_TOKEN)
else:
- print("No privileged token found")
+ print("No privileged token found", file=sys.stderr)
priv_api = None
# TODO: refactor integration so this doesn't always need to be defined. If
diff --git a/python/fatcat_web/search.py b/python/fatcat_web/search.py
index 0cdb604a..2811b9a0 100644
--- a/python/fatcat_web/search.py
+++ b/python/fatcat_web/search.py
@@ -424,7 +424,7 @@ def get_elastic_search_coverage(query: ReleaseQuery) -> dict:
return stats
-def get_elastic_container_stats(ident, issnl=None):
+def get_elastic_container_stats(ident, issnl=None, es_client=None, es_index=None, merge_shadows=None):
"""
Returns dict:
ident
@@ -435,7 +435,14 @@ def get_elastic_container_stats(ident, issnl=None):
preserved
"""
- search = Search(using=app.es_client, index=app.config['ELASTICSEARCH_RELEASE_INDEX'])
+ if not es_client:
+ es_client = app.es_client
+ if not es_index:
+ es_index = app.config['ELASTICSEARCH_RELEASE_INDEX']
+ if merge_shadows is None:
+ merge_shadows = app.config['FATCAT_MERGE_SHADOW_PRESERVATION']
+
+ search = Search(using=es_client, index=es_index)
search = search.query(
'term',
container_id=ident,
@@ -479,7 +486,7 @@ def get_elastic_container_stats(ident, issnl=None):
for k in ('bright', 'dark', 'shadows_only', 'none'):
if not k in preservation_bucket:
preservation_bucket[k] = 0
- if app.config['FATCAT_MERGE_SHADOW_PRESERVATION']:
+ if merge_shadows:
preservation_bucket['none'] += preservation_bucket['shadows_only']
preservation_bucket['shadows_only'] = 0
release_type_bucket = agg_to_dict(resp.aggregations.release_type)
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index 5e3fb3e9..95f5024a 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -44,6 +44,8 @@ def run_elasticsearch_release(args):
def run_elasticsearch_container(args):
consume_topic = "fatcat-{}.container-updates".format(args.env)
worker = ElasticsearchContainerWorker(args.kafka_hosts, consume_topic,
+ query_stats=args.query_stats,
+ elasticsearch_release_index="fatcat_release",
elasticsearch_backend=args.elasticsearch_backend,
elasticsearch_index=args.elasticsearch_index)
worker.run()
@@ -99,6 +101,9 @@ def main():
sub_elasticsearch_container.add_argument('--elasticsearch-index',
help="elasticsearch index to push into",
default="fatcat_container")
+ sub_elasticsearch_container.add_argument('--query-stats',
+ action='store_true',
+ help="whether to query release search index for container stats")
sub_elasticsearch_changelog = subparsers.add_parser('elasticsearch-changelog',
help="consume changelog kafka feed, transform and push to search")
diff --git a/python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json b/python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json
new file mode 100644
index 00000000..bb4d46f9
--- /dev/null
+++ b/python/tests/files/container_jxqqgho7bncrvgfyfznramju3q.json
@@ -0,0 +1 @@
+{"extra":{"abbrev":"Annu. Rev. Pharmacol. Toxicol.","country":"us","ezb":{"color":"red","ezb_id":"2460"},"ia":{"sim":{"peer_reviewed":true,"pub_type":"Scholarly Journals","scholarly_peer_reviewed":true,"sim_pubid":"5091","year_spans":[[1961,2009]]}},"issne":"1545-4304","issnp":"0362-1642","kbart":{"hathitrust":{"year_spans":[[1976,1992]]},"portico":{"year_spans":[[1961,1999],[2001,2001],[2003,2003],[2005,2006],[2008,2010],[2012,2019]]},"scholarsportal":{"year_spans":[[1961,2003],[2005,2019]]}},"languages":["en"],"sherpa_romeo":{"color":"yellow"},"urls":["https://www.annualreviews.org/journal/pharmtox","https://www.annualreviews.org/loi/pharmtox","http://arjournals.annualreviews.org/loi/pharmtox"]},"ident":"jxqqgho7bncrvgfyfznramju3q","issnl":"0362-1642","name":"Annual Review of Pharmacology and Toxicology","publisher":"Annual Reviews","revision":"ff56081b-9130-47a6-9e14-9901c2808502","state":"active"}
diff --git a/python/tests/transform_elasticsearch.py b/python/tests/transform_elasticsearch.py
index 9cf77d4a..ba2b7ea2 100644
--- a/python/tests/transform_elasticsearch.py
+++ b/python/tests/transform_elasticsearch.py
@@ -147,11 +147,48 @@ def test_elasticsearch_release_from_json():
def test_elasticsearch_container_transform(journal_metadata_importer):
with open('tests/files/journal_metadata.sample.json', 'r') as f:
- raw = json.loads(f.readline())
- c = journal_metadata_importer.parse_record(raw)
- c.state = 'active'
- es = container_to_elasticsearch(c)
- assert es['publisher'] == c.publisher
+ raw1 = json.loads(f.readline())
+ raw2 = json.loads(f.readline())
+ c1 = journal_metadata_importer.parse_record(raw1)
+ c1.state = 'active'
+ c2 = journal_metadata_importer.parse_record(raw2)
+ c2.state = 'active'
+
+ c1.extra['publisher_type'] = "big5"
+ c1.extra['discipline'] = "history"
+ es = container_to_elasticsearch(c1)
+ assert es['publisher'] == c1.publisher
+ assert es['discipline'] == c1.extra['discipline']
+ assert es['publisher_type'] == c1.extra['publisher_type']
+ assert es['keepers'] == []
+
+ stats = {
+ "ident": "en4qj5ijrbf5djxx7p5zzpjyoq",
+ "in_kbart": 11136,
+ "in_web": 9501,
+ "is_preserved": 11136,
+ "issnl": "2050-084X",
+ "preservation": {
+ "bright": 9501,
+ "dark": 1635,
+ "none": 0,
+ "shadows_only": 0,
+ "total": 11136
+ },
+ "release_type": {
+ "_unknown": 9,
+ "article-journal": 11124,
+ "editorial": 2,
+ "letter": 1
+ },
+ "total": 11136
+ }
+ es = container_to_elasticsearch(c2, stats=stats)
+ assert es['name'] == c2.name
+ assert es['publisher'] == c2.publisher
+ assert es['keepers'] == list(c2.extra['kbart'].keys()) == ["portico"]
+ assert es['any_kbart'] == True
+
def test_elasticsearch_file_transform(matched_importer):
f = entity_from_json(open('./tests/files/file_bcah4zp5tvdhjl5bqci2c2lgfa.json', 'r').read(), FileEntity)
diff --git a/python/tests/web_search.py b/python/tests/web_search.py
index a7bf7ec7..8df01466 100644
--- a/python/tests/web_search.py
+++ b/python/tests/web_search.py
@@ -165,6 +165,16 @@ def test_container_stats(app, mocker):
]
rv = app.get('/container/issnl/1234-5678/stats.json')
assert rv.status_code == 200
+ stats = rv.json
+ assert isinstance(stats['total'], int)
+ assert isinstance(stats['release_type'], dict)
+ assert isinstance(stats['preservation']['total'], int)
+ assert isinstance(stats['preservation']['bright'], int)
+ assert isinstance(stats['preservation']['dark'], int)
+ assert isinstance(stats['preservation']['none'], int)
rv = app.get('/container/aaaaaaaaaaaaaeiraaaaaaaaam/stats.json')
assert rv.status_code == 200
+ stats = rv.json
+ assert isinstance(stats['total'], int)
+ assert stats['ident'] == "aaaaaaaaaaaaaeiraaaaaaaaam"