 fatcat_scholar/kafka.py         |  2 +-
 fatcat_scholar/transform.py     | 16 ++++++++++++++--
 fatcat_scholar/web.py           | 11 +++++++++++
 fatcat_scholar/work_pipeline.py |  8 ++++++++
 fatcat_scholar/worker.py        |  4 ++--
 settings.toml                   |  2 +-
 tests/test_web.py               |  6 +++++-
 7 files changed, 42 insertions(+), 7 deletions(-)
diff --git a/fatcat_scholar/kafka.py b/fatcat_scholar/kafka.py
index 5faff9a..e71bc3d 100644
--- a/fatcat_scholar/kafka.py
+++ b/fatcat_scholar/kafka.py
@@ -60,7 +60,7 @@ class KafkaWorker(object):
         self.counts: Counter = Counter()
         self.kafka_brokers = kafka_brokers
         self.batch_size = kwargs.get("batch_size", 1)
-        self.batch_timeout_sec = kwargs.get("batch_timeout_sec", 30)
+        self.batch_timeout_sec = kwargs.get("batch_timeout_sec", 60)
         self.poll_interval_sec = kwargs.get("poll_interval_sec", 5.0)
         self.consumer = self.create_kafka_consumer(
             kafka_brokers, consume_topics, consumer_group
diff --git a/fatcat_scholar/transform.py b/fatcat_scholar/transform.py
index cbf0a89..bf24da9 100644
--- a/fatcat_scholar/transform.py
+++ b/fatcat_scholar/transform.py
@@ -13,15 +13,20 @@ from fatcat_scholar.schema import *
 from fatcat_scholar.config import settings, GIT_REVISION
 from fatcat_scholar.grobid2json import teixml2json
 
+MAX_BODY_CHARS = 1024 * 1024
+
 
 def es_fulltext_from_sim(sim: Dict[str, Any]) -> Optional[ScholarFulltext]:
     if not sim["page_texts"]:
         return None
     first_page = sim["page_texts"][0]["page_num"]
     issue_item = sim["issue_item"]
+    body = "\n".join([p["raw_text"] for p in sim["page_texts"]])
+    if body and len(body) > MAX_BODY_CHARS:
+        body = body[:MAX_BODY_CHARS]
     return ScholarFulltext(
         lang_code=None,  # TODO: pub/issue metadata? or langdetect?
-        body="\n".join([p["raw_text"] for p in sim["page_texts"]]),
+        body=body,
         # acknowledgement=None,
         # annex=None,
         release_ident=sim.get("release_ident"),
@@ -221,9 +226,12 @@ def es_fulltext_from_grobid(
 ) -> Optional[ScholarFulltext]:
     if not tei_dict.get("body"):
         return None
+    body = tei_dict.get("body")
+    if body and len(body) > MAX_BODY_CHARS:
+        body = body[:MAX_BODY_CHARS]
     ret = ScholarFulltext(
         lang_code=tei_dict.get("lang"),
-        body=tei_dict.get("body"),
+        body=body,
         acknowledgement=tei_dict.get("acknowledgement"),
         annex=tei_dict.get("annex"),
     )
@@ -234,6 +242,8 @@ def es_fulltext_from_pdftotext(
     raw_text: str, pdf_meta: Optional[dict], re: ReleaseEntity, fe: FileEntity
 ) -> Optional[ScholarFulltext]:
 
+    if raw_text and len(raw_text) > MAX_BODY_CHARS:
+        raw_text = raw_text[:MAX_BODY_CHARS]
     ret = ScholarFulltext(
         lang_code=re.language, body=raw_text, acknowledgement=None, annex=None,
     )
@@ -252,6 +262,8 @@ def es_fulltext_from_html(
     body = tree.find(".//tei:body", ns)
     if body:
         raw_text = " ".join(body.itertext())
+        if raw_text and len(raw_text) > MAX_BODY_CHARS:
+            raw_text = raw_text[:MAX_BODY_CHARS]
     else:
         return None
 
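
Note: the same truncation guard now appears in four transform functions. A minimal consolidating sketch follows; the truncate_body() helper name is hypothetical and not part of this change:

    from typing import Optional

    MAX_BODY_CHARS = 1024 * 1024  # 1 MiB cap, matching the constant added above

    def truncate_body(body: Optional[str]) -> Optional[str]:
        # Keep only the first MAX_BODY_CHARS characters; longer fulltext
        # bodies would produce oversized Elasticsearch documents.
        if body and len(body) > MAX_BODY_CHARS:
            return body[:MAX_BODY_CHARS]
        return body

Each call site would then reduce to something like body = truncate_body(tei_dict.get("body")).
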
diff --git a/fatcat_scholar/web.py b/fatcat_scholar/web.py
index 124e269..dce732b 100644
--- a/fatcat_scholar/web.py
+++ b/fatcat_scholar/web.py
@@ -13,6 +13,7 @@ import babel.support
 from fastapi import FastAPI, APIRouter, Request, Depends, Response, HTTPException
 from fastapi.staticfiles import StaticFiles
 from fastapi.responses import PlainTextResponse, JSONResponse, FileResponse
+from fastapi.middleware.cors import CORSMiddleware
 import sentry_sdk
 from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
 from starlette_prometheus import metrics, PrometheusMiddleware
@@ -97,6 +98,8 @@ class HitsModel(BaseModel):
 @api.get("/search", operation_id="get_search", response_model=HitsModel)
 async def search(query: FulltextQuery = Depends(FulltextQuery)) -> FulltextHits:
     hits: Optional[FulltextHits] = None
+    if query.q is None:
+        raise HTTPException(status_code=400, detail="Expected a 'q' query parameter")
     try:
         hits = process_query(query)
     except ValueError as e:
@@ -320,6 +323,14 @@ async def http_exception_handler(request: Request, exc: StarletteHTTPException)
 
 # configure middleware
 
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=False,
+    allow_methods=["GET"],
+    allow_headers=[],  # some defaults always enabled
+)
+
 if settings.SENTRY_DSN:
     logger.info("Sentry integration enabled")
     sentry_sdk.init(
diff --git a/fatcat_scholar/work_pipeline.py b/fatcat_scholar/work_pipeline.py
index cb96274..10b701b 100644
--- a/fatcat_scholar/work_pipeline.py
+++ b/fatcat_scholar/work_pipeline.py
@@ -171,6 +171,10 @@ class WorkPipeline:
             # print(raw_text)
         except minio.error.NoSuchKey:
             return None
+        except urllib3.exceptions.MaxRetryError:
+            # HACK: work around broken seaweedfs keys
+            print(f"seaweedfs failure: sha1hex={fe.sha1}", file=sys.stderr)
+            return None
         return dict(
             raw_text=raw_text, release_ident=release_ident, file_ident=fe.ident,
         )
@@ -202,6 +206,10 @@ class WorkPipeline:
             # print(grobid_xml)
         except minio.error.NoSuchKey:
             return None
+        except urllib3.exceptions.MaxRetryError:
+            # HACK: work around broken seaweedfs keys
+            print(f"seaweedfs failure: sha1hex={sha1hex}", file=sys.stderr)
+            return None
 
         return dict(
             html_meta=html_meta,
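
Note: the new except clauses rely on urllib3 and sys already being imported in work_pipeline.py, since the diff adds no imports. A standalone sketch of the fetch-guard pattern, using illustrative names (fetch_blob, mc, key) not taken from the diff, and assuming the pre-7.x minio client this module already uses:

    import sys
    from typing import Optional

    import minio
    import urllib3

    def fetch_blob(mc: minio.Minio, bucket: str, key: str) -> Optional[bytes]:
        try:
            obj = mc.get_object(bucket, key)
            return obj.read()
        except minio.error.NoSuchKey:
            # object genuinely missing from the blob store
            return None
        except urllib3.exceptions.MaxRetryError:
            # HACK, mirroring the diff: some seaweedfs keys are broken and
            # exhaust client retries; treat them as missing rather than
            # crashing the whole pipeline batch.
            print(f"seaweedfs failure: key={key}", file=sys.stderr)
            return None
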
diff --git a/fatcat_scholar/worker.py b/fatcat_scholar/worker.py
index 854c1a2..823f1bd 100644
--- a/fatcat_scholar/worker.py
+++ b/fatcat_scholar/worker.py
@@ -153,7 +153,7 @@ class IndexDocsWorker(KafkaWorker):
         if not bulk_actions:
             return
 
-        elasticsearch.helpers.bulk(self.es_client, bulk_actions, timeout="30s")
+        elasticsearch.helpers.bulk(self.es_client, bulk_actions, timeout="50s")
         self.counts["batches-indexed"] += 1
 
 
@@ -236,7 +236,7 @@ def main() -> None:
         fdw.run()
     elif args.worker == "index-docs-worker":
         es_client = elasticsearch.Elasticsearch(
-            settings.ELASTICSEARCH_WRITE_BASE, timeout=25.0
+            settings.ELASTICSEARCH_WRITE_BASE, timeout=50.0
         )
         idw = IndexDocsWorker(
             kafka_brokers=settings.KAFKA_BROKERS,
diff --git a/settings.toml b/settings.toml
index 0f10e7f..aececc5 100644
--- a/settings.toml
+++ b/settings.toml
@@ -66,7 +66,7 @@ SCHOLAR_ENV = "prod"
 ELASTICSEARCH_QUERY_BASE = "http://scholar-svc500.fatcat.wiki:9292"
 ELASTICSEARCH_QUERY_FULLTEXT_INDEX = "scholar_fulltext"
 ELASTICSEARCH_WRITE_BASE = "http://localhost:9200"
-ELASTICSEARCH_WRITE_FULLTEXT_INDEX = "scholar_fulltext_v01"
+ELASTICSEARCH_WRITE_FULLTEXT_INDEX = "scholar_fulltext_v01_20210128"
 ELASTICSEARCH_PUBLIC_URL = "http://scholar-svc500.fatcat.wiki:9292"
 KAFKA_BROKERS = ["wbgrp-svc263.us.archive.org"]
 ENABLE_GOATCOUNTER = true
diff --git a/tests/test_web.py b/tests/test_web.py
index 810f8e3..df8b832 100644
--- a/tests/test_web.py
+++ b/tests/test_web.py
@@ -39,6 +39,10 @@ def test_basic_api(client: Any, mocker: Any) -> None:
     assert resp.status_code == 200
     assert resp.json()
 
+    # request with no 'q' parameter is an error
+    resp = client.get("/search", headers=headers)
+    assert resp.status_code == 400
+
     with open("tests/files/elastic_fulltext_search.json") as f:
         elastic_resp = json.loads(f.read())
 
@@ -49,7 +53,7 @@ def test_basic_api(client: Any, mocker: Any) -> None:
         (200, {}, json.dumps(elastic_resp)),
     ]
 
-    resp = client.get("/search", headers=headers)
+    resp = client.get("/search?q=blood", headers=headers)
     assert resp.status_code == 200
     assert resp.json()
 
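
Note: a hypothetical test, not part of the diff, sketching how the new CORS policy could be exercised with the existing client fixture. With allow_origins=["*"] and allow_credentials=False, Starlette's CORSMiddleware attaches a wildcard header to simple cross-origin GETs:

    def test_cors_simple_request(client: Any) -> None:
        # an Origin header marks the request as cross-origin; "/" renders
        # a template and needs no Elasticsearch mock
        resp = client.get("/", headers={"Origin": "https://example.org"})
        assert resp.status_code == 200
        assert resp.headers.get("access-control-allow-origin") == "*"
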
