Diffstat (limited to 'fatcat_scholar')
-rw-r--r--  fatcat_scholar/kafka.py           2
-rw-r--r--  fatcat_scholar/transform.py      16
-rw-r--r--  fatcat_scholar/web.py            11
-rw-r--r--  fatcat_scholar/work_pipeline.py   8
-rw-r--r--  fatcat_scholar/worker.py          4
5 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/fatcat_scholar/kafka.py b/fatcat_scholar/kafka.py
index 5faff9a..e71bc3d 100644
--- a/fatcat_scholar/kafka.py
+++ b/fatcat_scholar/kafka.py
@@ -60,7 +60,7 @@ class KafkaWorker(object):
self.counts: Counter = Counter()
self.kafka_brokers = kafka_brokers
self.batch_size = kwargs.get("batch_size", 1)
- self.batch_timeout_sec = kwargs.get("batch_timeout_sec", 30)
+ self.batch_timeout_sec = kwargs.get("batch_timeout_sec", 60)
self.poll_interval_sec = kwargs.get("poll_interval_sec", 5.0)
self.consumer = self.create_kafka_consumer(
kafka_brokers, consume_topics, consumer_group
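
For context, the batch timeout is still read from keyword arguments with a fallback default, so individual workers can override it; a minimal sketch of that pattern (the class name and broker string below are illustrative, not part of this change):

from collections import Counter
from typing import Any

class ExampleWorker:
    def __init__(self, kafka_brokers: str, **kwargs: Any):
        self.counts: Counter = Counter()
        self.kafka_brokers = kafka_brokers
        # callers may pass batch_timeout_sec explicitly; otherwise the 60 second default applies
        self.batch_timeout_sec = kwargs.get("batch_timeout_sec", 60)

default_worker = ExampleWorker("localhost:9092")                      # uses 60
tuned_worker = ExampleWorker("localhost:9092", batch_timeout_sec=30)  # explicit override
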
diff --git a/fatcat_scholar/transform.py b/fatcat_scholar/transform.py
index cbf0a89..bf24da9 100644
--- a/fatcat_scholar/transform.py
+++ b/fatcat_scholar/transform.py
@@ -13,15 +13,20 @@ from fatcat_scholar.schema import *
from fatcat_scholar.config import settings, GIT_REVISION
from fatcat_scholar.grobid2json import teixml2json
+MAX_BODY_CHARS = 1024 * 1024
+
def es_fulltext_from_sim(sim: Dict[str, Any]) -> Optional[ScholarFulltext]:
if not sim["page_texts"]:
return None
first_page = sim["page_texts"][0]["page_num"]
issue_item = sim["issue_item"]
+    body = "\n".join([p["raw_text"] for p in sim["page_texts"]])
+    if body and len(body) > MAX_BODY_CHARS:
+        body = body[:MAX_BODY_CHARS]
return ScholarFulltext(
lang_code=None, # TODO: pub/issue metadata? or langdetect?
- body="\n".join([p["raw_text"] for p in sim["page_texts"]]),
+ body=body,
# acknowledgement=None,
# annex=None,
release_ident=sim.get("release_ident"),
@@ -221,9 +226,12 @@ def es_fulltext_from_grobid(
) -> Optional[ScholarFulltext]:
if not tei_dict.get("body"):
return None
+    body = tei_dict.get("body")
+    if body and len(body) > MAX_BODY_CHARS:
+        body = body[:MAX_BODY_CHARS]
ret = ScholarFulltext(
lang_code=tei_dict.get("lang"),
- body=tei_dict.get("body"),
+ body=body,
acknowledgement=tei_dict.get("acknowledgement"),
annex=tei_dict.get("annex"),
)
@@ -234,6 +242,8 @@ def es_fulltext_from_pdftotext(
raw_text: str, pdf_meta: Optional[dict], re: ReleaseEntity, fe: FileEntity
) -> Optional[ScholarFulltext]:
+    if raw_text and len(raw_text) > MAX_BODY_CHARS:
+        raw_text = raw_text[:MAX_BODY_CHARS]
ret = ScholarFulltext(
lang_code=re.language, body=raw_text, acknowledgement=None, annex=None,
)
@@ -252,6 +262,8 @@ def es_fulltext_from_html(
body = tree.find(".//tei:body", ns)
if body:
raw_text = " ".join(body.itertext())
+        if raw_text and len(raw_text) > MAX_BODY_CHARS:
+            raw_text = raw_text[:MAX_BODY_CHARS]
else:
return None
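
The same truncation guard now appears in each transform path; a minimal helper capturing the pattern (the helper name is illustrative and does not exist in the diff):

from typing import Optional

MAX_BODY_CHARS = 1024 * 1024  # cap fulltext body at roughly 1 MiB of text

def truncate_body(text: Optional[str]) -> Optional[str]:
    # keep only the leading MAX_BODY_CHARS characters of oversized bodies
    if text and len(text) > MAX_BODY_CHARS:
        return text[:MAX_BODY_CHARS]
    return text

assert truncate_body(None) is None
assert truncate_body("short") == "short"
assert len(truncate_body("x" * (MAX_BODY_CHARS + 100)) or "") == MAX_BODY_CHARS
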
diff --git a/fatcat_scholar/web.py b/fatcat_scholar/web.py
index 124e269..dce732b 100644
--- a/fatcat_scholar/web.py
+++ b/fatcat_scholar/web.py
@@ -13,6 +13,7 @@ import babel.support
from fastapi import FastAPI, APIRouter, Request, Depends, Response, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import PlainTextResponse, JSONResponse, FileResponse
+from fastapi.middleware.cors import CORSMiddleware
import sentry_sdk
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
from starlette_prometheus import metrics, PrometheusMiddleware
@@ -97,6 +98,8 @@ class HitsModel(BaseModel):
@api.get("/search", operation_id="get_search", response_model=HitsModel)
async def search(query: FulltextQuery = Depends(FulltextQuery)) -> FulltextHits:
hits: Optional[FulltextHits] = None
+ if query.q is None:
+ raise HTTPException(status_code=400, detail="Expected a 'q' query parameter")
try:
hits = process_query(query)
except ValueError as e:
@@ -320,6 +323,14 @@ async def http_exception_handler(request: Request, exc: StarletteHTTPException)
# configure middleware
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["*"],
+ allow_credentials=False,
+ allow_methods=["GET"],
+ allow_headers=[], # some defaults always enabled
+)
+
if settings.SENTRY_DSN:
logger.info("Sentry integration enabled")
sentry_sdk.init(
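
The policy added above allows simple, credential-free GET requests from any origin; a standalone sketch of the same middleware configuration on a minimal FastAPI app (the route is a placeholder):

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],       # any origin may read responses
    allow_credentials=False,   # no cookies or auth headers across origins
    allow_methods=["GET"],     # read-only API surface
    allow_headers=[],          # CORS-safelisted request headers remain allowed
)

@app.get("/search")
async def search(q: str) -> dict:
    return {"query": q, "results": []}

With this in place, a browser request carrying an Origin header should receive an Access-Control-Allow-Origin: * response header on GET endpoints.
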
diff --git a/fatcat_scholar/work_pipeline.py b/fatcat_scholar/work_pipeline.py
index cb96274..10b701b 100644
--- a/fatcat_scholar/work_pipeline.py
+++ b/fatcat_scholar/work_pipeline.py
@@ -171,6 +171,10 @@ class WorkPipeline:
# print(raw_text)
except minio.error.NoSuchKey:
return None
+ except urllib3.exceptions.MaxRetryError:
+ # HACK: work around broken seaweedfs keys
+ print(f"seaweedfs failure: sha1hex={fe.sha1}", file=sys.stderr)
+ return None
return dict(
raw_text=raw_text, release_ident=release_ident, file_ident=fe.ident,
)
@@ -202,6 +206,10 @@ class WorkPipeline:
# print(grobid_xml)
except minio.error.NoSuchKey:
return None
+ except urllib3.exceptions.MaxRetryError:
+ # HACK: work around broken seaweedfs keys
+ print(f"seaweedfs failure: sha1hex={sha1hex}", file=sys.stderr)
+ return None
return dict(
html_meta=html_meta,
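
The new except clauses treat a seaweedfs connection failure like a missing key and skip the file; a sketch of that fallback around a blob fetch (the bucket, path, and function name are placeholders, and urllib3 must be imported wherever the exception is caught):

import sys
from typing import Optional

import minio
import urllib3.exceptions

def fetch_blob_text(client: minio.Minio, bucket: str, path: str, sha1hex: str) -> Optional[str]:
    # return object contents, or None when the key is missing or the backend is unreachable
    try:
        resp = client.get_object(bucket, path)
        return resp.read().decode("utf-8")
    except minio.error.NoSuchKey:
        return None
    except urllib3.exceptions.MaxRetryError:
        # HACK: work around broken seaweedfs keys, matching the pipeline behavior
        print(f"seaweedfs failure: sha1hex={sha1hex}", file=sys.stderr)
        return None
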
diff --git a/fatcat_scholar/worker.py b/fatcat_scholar/worker.py
index 854c1a2..823f1bd 100644
--- a/fatcat_scholar/worker.py
+++ b/fatcat_scholar/worker.py
@@ -153,7 +153,7 @@ class IndexDocsWorker(KafkaWorker):
if not bulk_actions:
return
- elasticsearch.helpers.bulk(self.es_client, bulk_actions, timeout="30s")
+ elasticsearch.helpers.bulk(self.es_client, bulk_actions, timeout="50s")
self.counts["batches-indexed"] += 1
@@ -236,7 +236,7 @@ def main() -> None:
fdw.run()
elif args.worker == "index-docs-worker":
es_client = elasticsearch.Elasticsearch(
- settings.ELASTICSEARCH_WRITE_BASE, timeout=25.0
+ settings.ELASTICSEARCH_WRITE_BASE, timeout=50.0
)
idw = IndexDocsWorker(
kafka_brokers=settings.KAFKA_BROKERS,
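
Both values move to 50 seconds so the per-request bulk timeout no longer exceeds the client-level transport timeout; a sketch of the indexing call with the matched values (the URL, index name, and document are placeholders):

import elasticsearch
import elasticsearch.helpers

# keep the transport timeout and the bulk request timeout in step
es_client = elasticsearch.Elasticsearch("http://localhost:9200", timeout=50.0)

bulk_actions = [
    {"_index": "scholar_fulltext", "_id": "doc-1", "_source": {"title": "example"}},
]

if bulk_actions:
    elasticsearch.helpers.bulk(es_client, bulk_actions, timeout="50s")
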