aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--python/fatcat_tools/transforms/elasticsearch.py26
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py37
2 files changed, 56 insertions, 7 deletions
diff --git a/python/fatcat_tools/transforms/elasticsearch.py b/python/fatcat_tools/transforms/elasticsearch.py
index f37aadba..4bf9091a 100644
--- a/python/fatcat_tools/transforms/elasticsearch.py
+++ b/python/fatcat_tools/transforms/elasticsearch.py
@@ -46,6 +46,7 @@ def release_to_elasticsearch(entity: ReleaseEntity, force_bool: bool = True) ->
# First, the easy ones (direct copy)
release = entity
t = dict(
+ doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
ident = release.ident,
state = release.state,
revision = release.revision,
@@ -295,6 +296,8 @@ def _rte_container_helper(container: ContainerEntity, release_year: Optional[int
t['country_code_upper'] = c_extra['country'].upper()
if c_extra.get('publisher_type'):
t['publisher_type'] = c_extra['publisher_type']
+ if c_extra.get('discipline'):
+ t['discipline'] = c_extra['discipline']
return t
def _rte_content_helper(release: ReleaseEntity) -> dict:
@@ -374,7 +377,7 @@ def _rte_url_helper(url_obj) -> dict:
return t
-def container_to_elasticsearch(entity, force_bool=True):
+def container_to_elasticsearch(entity, force_bool=True, stats=None):
"""
Converts from an entity model/schema to elasticsearch oriented schema.
@@ -392,6 +395,7 @@ def container_to_elasticsearch(entity, force_bool=True):
# First, the easy ones (direct copy)
t = dict(
+ doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
ident = entity.ident,
state = entity.state,
revision = entity.revision,
@@ -407,10 +411,13 @@ def container_to_elasticsearch(entity, force_bool=True):
entity.extra = dict()
for key in ('country', 'languages', 'mimetypes', 'original_name',
'first_year', 'last_year', 'aliases', 'abbrev', 'region',
- 'discipline'):
+ 'discipline', 'publisher_type'):
if entity.extra.get(key):
t[key] = entity.extra[key]
+ if entity.extra.get('dblp') and entity.extra['dblp'].get('prefix'):
+ t['dblp_prefix'] = entity.extra['dblp']['prefix']
+
if 'country' in t:
t['country_code'] = t.pop('country')
@@ -428,6 +435,7 @@ def container_to_elasticsearch(entity, force_bool=True):
any_kbart = None
any_jstor = None
any_ia_sim = None
+ keepers = []
extra = entity.extra
if extra.get('doaj'):
@@ -451,6 +459,9 @@ def container_to_elasticsearch(entity, force_bool=True):
any_kbart = True
if extra['kbart'].get('jstor'):
any_jstor = True
+ for k, v in extra['kbart'].items():
+ if v and isinstance(v, dict):
+ keepers.append(k)
if extra.get('ia'):
if extra['ia'].get('sim'):
any_ia_sim = True
@@ -458,6 +469,7 @@ def container_to_elasticsearch(entity, force_bool=True):
is_longtail_oa = True
t['is_superceded'] = bool(extra.get('superceded'))
+ t['keepers'] = keepers
t['in_doaj'] = bool(in_doaj)
t['in_road'] = bool(in_road)
t['any_kbart'] = bool(any_kbart)
@@ -471,6 +483,14 @@ def container_to_elasticsearch(entity, force_bool=True):
t['is_longtail_oa'] = is_longtail_oa
t['any_jstor'] = any_jstor
t['any_ia_sim'] = any_ia_sim
+
+ # mix in stats, if provided
+ if stats:
+ t['releases_total'] = stats['total']
+ t['preservation_bright'] = stats['preservation']['bright']
+ t['preservation_dark'] = stats['preservation']['dark']
+ t['preservation_shadows_only'] = stats['preservation']['shadows_only']
+ t['preservation_none'] = stats['preservation']['none']
return t
@@ -495,6 +515,7 @@ def changelog_to_elasticsearch(entity):
editgroup = entity.editgroup
t = dict(
+ doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
index=entity.index,
editgroup_id=entity.editgroup_id,
timestamp=entity.timestamp.isoformat(),
@@ -558,6 +579,7 @@ def file_to_elasticsearch(entity):
# First, the easy ones (direct copy)
t = dict(
+ doc_index_ts=datetime.datetime.utcnow().isoformat()+"Z",
ident = entity.ident,
state = entity.state,
revision = entity.revision,
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 4850bb0a..13409eb5 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,10 +1,16 @@
import json
+
import requests
+import elasticsearch
from confluent_kafka import Consumer, KafkaException
from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient, ChangelogEntry
-from fatcat_tools import *
+from fatcat_tools import (public_api, entity_from_json,
+ release_to_elasticsearch, container_to_elasticsearch,
+ changelog_to_elasticsearch,
+)
+from fatcat_web.search import get_elastic_container_stats
from .worker_common import FatcatWorker
@@ -18,7 +24,8 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
- batch_size=200, api_host="https://api.fatcat.wiki/v0"):
+ elasticsearch_release_index="fatcat_releases",
+ batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
@@ -26,14 +33,19 @@ class ElasticsearchReleaseWorker(FatcatWorker):
self.poll_interval = poll_interval
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
+ self.elasticsearch_release_index = elasticsearch_release_index
self.entity_type = ReleaseEntity
self.transform_func = release_to_elasticsearch
self.api_host = api_host
+ self.query_stats = query_stats
def run(self):
ac = ApiClient()
api = public_api(self.api_host)
+ # only used by container indexing query_stats code path
+ es_client = elasticsearch.Elasticsearch(self.elasticsearch_backend)
+
def fail_fast(err, partitions):
if err is not None:
print("Kafka consumer commit error: {}".format(err))
@@ -104,13 +116,25 @@ class ElasticsearchReleaseWorker(FatcatWorker):
entity = api.get_changelog_entry(entity.index)
else:
key = entity.ident
+
+ if self.entity_type == ContainerEntity and self.query_stats:
+ stats = get_elastic_container_stats(
+ entity.ident,
+ es_client=es_client,
+ es_index=self.elasticsearch_release_index,
+ merge_shadows=True,
+ )
+ doc_dict = container_to_elasticsearch(entity, stats=stats)
+ else:
+ doc_dict = self.transform_func(entity)
+
# TODO: handle deletions from index
bulk_actions.append(json.dumps({
"index": { "_id": key, },
}))
- bulk_actions.append(json.dumps(
- self.transform_func(entity)))
- print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type))
+ bulk_actions.append(json.dumps(doc_dict))
+
+ print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__))
elasticsearch_endpoint = "{}/{}/_bulk".format(
self.elasticsearch_backend,
self.elasticsearch_index)
@@ -132,6 +156,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ query_stats=False, elasticsearch_release_index="fatcat_release",
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
batch_size=200):
super().__init__(kafka_hosts=kafka_hosts,
@@ -140,6 +165,8 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
offset=offset,
elasticsearch_backend=elasticsearch_backend,
elasticsearch_index=elasticsearch_index,
+ elasticsearch_release_index=elasticsearch_release_index,
+ query_stats=query_stats,
batch_size=batch_size)
# previous group got corrupted (by pykafka library?)
self.consumer_group = "elasticsearch-updates3"