Diffstat (limited to 'fatcat_scholar')
 fatcat_scholar/kafka.py         |  2 +-
 fatcat_scholar/transform.py     | 16 ++++++++++++++--
 fatcat_scholar/web.py           | 11 +++++++++++
 fatcat_scholar/work_pipeline.py |  8 ++++++++
 fatcat_scholar/worker.py        |  4 ++--
 5 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/fatcat_scholar/kafka.py b/fatcat_scholar/kafka.py
index 5faff9a..e71bc3d 100644
--- a/fatcat_scholar/kafka.py
+++ b/fatcat_scholar/kafka.py
@@ -60,7 +60,7 @@ class KafkaWorker(object):
         self.counts: Counter = Counter()
         self.kafka_brokers = kafka_brokers
         self.batch_size = kwargs.get("batch_size", 1)
-        self.batch_timeout_sec = kwargs.get("batch_timeout_sec", 30)
+        self.batch_timeout_sec = kwargs.get("batch_timeout_sec", 60)
         self.poll_interval_sec = kwargs.get("poll_interval_sec", 5.0)
         self.consumer = self.create_kafka_consumer(
             kafka_brokers, consume_topics, consumer_group
diff --git a/fatcat_scholar/transform.py b/fatcat_scholar/transform.py
index cbf0a89..bf24da9 100644
--- a/fatcat_scholar/transform.py
+++ b/fatcat_scholar/transform.py
@@ -13,15 +13,20 @@ from fatcat_scholar.schema import *
 from fatcat_scholar.config import settings, GIT_REVISION
 from fatcat_scholar.grobid2json import teixml2json
 
+MAX_BODY_CHARS = 1024 * 1024
+
 
 def es_fulltext_from_sim(sim: Dict[str, Any]) -> Optional[ScholarFulltext]:
     if not sim["page_texts"]:
         return None
     first_page = sim["page_texts"][0]["page_num"]
     issue_item = sim["issue_item"]
+    body = "\n".join([p["raw_text"] for p in sim["page_texts"]])
+    if body and len(body) > MAX_BODY_CHARS:
+        body = body[:MAX_BODY_CHARS]
     return ScholarFulltext(
         lang_code=None,  # TODO: pub/issue metadata? or langdetect?
-        body="\n".join([p["raw_text"] for p in sim["page_texts"]]),
+        body=body,
         # acknowledgement=None,
         # annex=None,
         release_ident=sim.get("release_ident"),
@@ -221,9 +226,12 @@ def es_fulltext_from_grobid(
 ) -> Optional[ScholarFulltext]:
     if not tei_dict.get("body"):
         return None
+    body = tei_dict.get("body")
+    if body and len(body) > MAX_BODY_CHARS:
+        body = body[:MAX_BODY_CHARS]
     ret = ScholarFulltext(
         lang_code=tei_dict.get("lang"),
-        body=tei_dict.get("body"),
+        body=body,
         acknowledgement=tei_dict.get("acknowledgement"),
         annex=tei_dict.get("annex"),
     )
@@ -234,6 +242,8 @@
 def es_fulltext_from_pdftotext(
     raw_text: str, pdf_meta: Optional[dict], re: ReleaseEntity, fe: FileEntity
 ) -> Optional[ScholarFulltext]:
+    if raw_text and len(raw_text) > MAX_BODY_CHARS:
+        raw_text = raw_text[:MAX_BODY_CHARS]
     ret = ScholarFulltext(
         lang_code=re.language, body=raw_text, acknowledgement=None, annex=None,
     )
@@ -252,6 +262,8 @@ def es_fulltext_from_html(
     body = tree.find(".//tei:body", ns)
     if body:
         raw_text = " ".join(body.itertext())
+        if raw_text and len(raw_text) > MAX_BODY_CHARS:
+            raw_text = raw_text[:MAX_BODY_CHARS]
     else:
         return None
 
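The truncation pattern above repeats four times in transform.py. A minimal standalone sketch of the intended behavior (the truncate_body helper name is hypothetical, not part of the module):

    from typing import Optional

    MAX_BODY_CHARS = 1024 * 1024  # same 1 MiB cap as transform.py

    def truncate_body(body: Optional[str]) -> Optional[str]:
        # Keep the head of the document: body[:MAX_BODY_CHARS], not
        # body[MAX_BODY_CHARS:], which would keep only the tail.
        if body and len(body) > MAX_BODY_CHARS:
            return body[:MAX_BODY_CHARS]
        return body

    assert truncate_body(None) is None
    assert truncate_body("short") == "short"
    assert len(truncate_body("x" * (MAX_BODY_CHARS + 1)) or "") == MAX_BODY_CHARS

Slicing from the front keeps the opening of the document; the reversed slice would silently drop the first megabyte and index only the remainder.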
diff --git a/fatcat_scholar/web.py b/fatcat_scholar/web.py
index 124e269..dce732b 100644
--- a/fatcat_scholar/web.py
+++ b/fatcat_scholar/web.py
@@ -13,6 +13,7 @@ import babel.support
 from fastapi import FastAPI, APIRouter, Request, Depends, Response, HTTPException
 from fastapi.staticfiles import StaticFiles
 from fastapi.responses import PlainTextResponse, JSONResponse, FileResponse
+from fastapi.middleware.cors import CORSMiddleware
 import sentry_sdk
 from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
 from starlette_prometheus import metrics, PrometheusMiddleware
@@ -97,6 +98,8 @@ class HitsModel(BaseModel):
 @api.get("/search", operation_id="get_search", response_model=HitsModel)
 async def search(query: FulltextQuery = Depends(FulltextQuery)) -> FulltextHits:
     hits: Optional[FulltextHits] = None
+    if query.q is None:
+        raise HTTPException(status_code=400, detail="Expected a 'q' query parameter")
     try:
         hits = process_query(query)
     except ValueError as e:
@@ -320,6 +323,14 @@ async def http_exception_handler(request: Request, exc: StarletteHTTPException)
 
 # configure middleware
 
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=False,
+    allow_methods=["GET"],
+    allow_headers=[],  # some defaults always enabled
+)
+
 if settings.SENTRY_DSN:
     logger.info("Sentry integration enabled")
     sentry_sdk.init(
diff --git a/fatcat_scholar/work_pipeline.py b/fatcat_scholar/work_pipeline.py
index cb96274..10b701b 100644
--- a/fatcat_scholar/work_pipeline.py
+++ b/fatcat_scholar/work_pipeline.py
@@ -171,6 +171,10 @@ class WorkPipeline:
             # print(raw_text)
         except minio.error.NoSuchKey:
             return None
+        except urllib3.exceptions.MaxRetryError:
+            # HACK: work around broken seaweedfs keys
+            print(f"seaweedfs failure: sha1hex={fe.sha1}", file=sys.stderr)
+            return None
         return dict(
             raw_text=raw_text, release_ident=release_ident, file_ident=fe.ident,
         )
@@ -202,6 +206,10 @@ class WorkPipeline:
             # print(grobid_xml)
         except minio.error.NoSuchKey:
             return None
+        except urllib3.exceptions.MaxRetryError:
+            # HACK: work around broken seaweedfs keys
+            print(f"seaweedfs failure: sha1hex={sha1hex}", file=sys.stderr)
+            return None
 
         return dict(
             html_meta=html_meta,
diff --git a/fatcat_scholar/worker.py b/fatcat_scholar/worker.py
index 854c1a2..823f1bd 100644
--- a/fatcat_scholar/worker.py
+++ b/fatcat_scholar/worker.py
@@ -153,7 +153,7 @@ class IndexDocsWorker(KafkaWorker):
 
         if not bulk_actions:
             return
-        elasticsearch.helpers.bulk(self.es_client, bulk_actions, timeout="30s")
+        elasticsearch.helpers.bulk(self.es_client, bulk_actions, timeout="50s")
         self.counts["batches-indexed"] += 1
 
 
@@ -236,7 +236,7 @@ def main() -> None:
         fdw.run()
     elif args.worker == "index-docs-worker":
         es_client = elasticsearch.Elasticsearch(
-            settings.ELASTICSEARCH_WRITE_BASE, timeout=25.0
+            settings.ELASTICSEARCH_WRITE_BASE, timeout=50.0
         )
         idw = IndexDocsWorker(
             kafka_brokers=settings.KAFKA_BROKERS,
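web.py now allows cross-origin, credential-less GET requests and rejects searches with no query string. A rough usage sketch against a hypothetical deployment (the base URL is a placeholder, and the /search route may sit under an API prefix):

    import requests

    # Hypothetical deployment URL; the /search path, the 'q' parameter,
    # and the 400 behavior come from web.py above.
    BASE = "https://scholar.example.org"

    # Missing 'q' now fails fast with HTTP 400:
    resp = requests.get(f"{BASE}/search")
    assert resp.status_code == 400

    # A cross-origin query; with allow_origins=["*"] the response
    # carries an Access-Control-Allow-Origin: * header.
    resp = requests.get(
        f"{BASE}/search",
        params={"q": "dark matter"},
        headers={"Origin": "https://example.com"},
    )
    print(resp.status_code, resp.headers.get("access-control-allow-origin"))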
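The new except clauses in work_pipeline.py assume the pre-7.x minio client surfaces exhausted connection retries as urllib3.exceptions.MaxRetryError. A self-contained sketch of the same guard around a blob fetch (the function name and bucket/key arguments are illustrative):

    import sys
    from typing import Optional

    import minio
    import urllib3.exceptions

    def fetch_blob(mc: minio.Minio, bucket: str, key: str, sha1hex: str) -> Optional[bytes]:
        # Mirrors the guard added above: missing keys return None quietly;
        # retry exhaustion (seen with broken seaweedfs keys) is logged to
        # stderr and also treated as a miss rather than crashing the worker.
        try:
            resp = mc.get_object(bucket, key)
            return resp.read()
        except minio.error.NoSuchKey:
            return None
        except urllib3.exceptions.MaxRetryError:
            print(f"seaweedfs failure: sha1hex={sha1hex}", file=sys.stderr)
            return None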
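One plausible reading of the timeout bumps: the Elasticsearch client and bulk-request timeouts (both now 50s) stay just below the Kafka consumer's new 60s batch timeout, so a slow indexing round-trip fails before the consumer gives up on the batch. A sketch under that assumption (the URL stands in for settings.ELASTICSEARCH_WRITE_BASE):

    import elasticsearch
    import elasticsearch.helpers

    # Client-side socket timeout, as in main() above.
    es_client = elasticsearch.Elasticsearch("http://localhost:9200", timeout=50.0)

    def index_batch(client: elasticsearch.Elasticsearch, bulk_actions: list) -> None:
        if not bulk_actions:
            return
        # The string "50s" is forwarded to the bulk API as a per-request
        # timeout; the float above bounds the HTTP connection itself.
        elasticsearch.helpers.bulk(client, bulk_actions, timeout="50s")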