aboutsummaryrefslogtreecommitdiffstats
path: root/fatcat_scholar
diff options
context:
space:
mode:
Diffstat (limited to 'fatcat_scholar')
-rw-r--r--fatcat_scholar/kafka.py219
-rw-r--r--fatcat_scholar/schema.py2
-rw-r--r--fatcat_scholar/sim_pipeline.py55
-rw-r--r--fatcat_scholar/transform.py7
-rw-r--r--fatcat_scholar/worker.py238
5 files changed, 496 insertions, 25 deletions
diff --git a/fatcat_scholar/kafka.py b/fatcat_scholar/kafka.py
new file mode 100644
index 0000000..5faff9a
--- /dev/null
+++ b/fatcat_scholar/kafka.py
@@ -0,0 +1,219 @@
+import sys
+import json
+import signal
+from collections import Counter
+from typing import List, Any
+
+from confluent_kafka import Consumer, Producer, KafkaException
+
+
+class KafkaWorker(object):
+ """
+ Base class for Scholar workers which consume from Kafka topics.
+
+ Configuration (passed to __init__):
+
+ kafka_brokers (List[str]): brokers to connect to
+
+ consume_topics (List[str]): topics to consume from
+
+ consumer_group (str): kafka consumer group
+
+ batch_size (int): number of records to consume and process at a time
+
+ batch_timeout_sec (int): max seconds for each batch to process. set to 0 to disable
+
+ API:
+
+ __init__()
+
+ run()
+ starts consuming, calling process_batch() for each message batch
+
+ process_batch(batch: List[dict]) -> None
+ implemented by sub-class
+
+ process_msg(msg: dict) -> None
+ implemented by sub-class
+
+ Example of producing (in a worker):
+
+ producer = self.create_kafka_producer(...)
+
+ producer.produce(
+ topic,
+ some_obj.json(exclude_none=True).encode('UTF-8'),
+ key=key,
+ on_delivery=self._fail_fast_produce)
+
+ # check for errors etc
+ producer.poll(0)
+ """
+
+ def __init__(
+ self,
+ kafka_brokers: List[str],
+ consume_topics: List[str],
+ consumer_group: str,
+ **kwargs: Any,
+ ):
+ self.counts: Counter = Counter()
+ self.kafka_brokers = kafka_brokers
+ self.batch_size = kwargs.get("batch_size", 1)
+ self.batch_timeout_sec = kwargs.get("batch_timeout_sec", 30)
+ self.poll_interval_sec = kwargs.get("poll_interval_sec", 5.0)
+ self.consumer = self.create_kafka_consumer(
+ kafka_brokers, consume_topics, consumer_group
+ )
+
+ @staticmethod
+ def _fail_fast_produce(err: Any, msg: Any) -> None:
+ if err is not None:
+ print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+ raise KafkaException(err)
+
+ @staticmethod
+ def _timeout_handler(signum: Any, frame: Any) -> None:
+ raise TimeoutError("timeout processing record")
+
+ @staticmethod
+ def create_kafka_consumer(
+ kafka_brokers: List[str], consume_topics: List[str], consumer_group: str
+ ) -> Consumer:
+ """
+ NOTE: it is important that consume_topics be str, *not* bytes
+ """
+
+ def _on_rebalance(consumer: Any, partitions: Any) -> None:
+
+ for p in partitions:
+ if p.error:
+ raise KafkaException(p.error)
+
+ print(
+ f"Kafka partitions rebalanced: {consumer} / {partitions}",
+ file=sys.stderr,
+ )
+
+ def _fail_fast_consume(err: Any, partitions: Any) -> None:
+ if err is not None:
+ print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
+ raise KafkaException(err)
+ for p in partitions:
+ # check for partition-specific commit errors
+ if p.error:
+ print(
+ "Kafka consumer commit error: {}".format(p.error),
+ file=sys.stderr,
+ )
+ raise KafkaException(p.error)
+
+ config = {
+ "bootstrap.servers": ",".join(kafka_brokers),
+ "group.id": consumer_group,
+ "on_commit": _fail_fast_consume,
+ # messages don't have offset marked as stored until processed,
+ # but we do auto-commit stored offsets to broker
+ "enable.auto.offset.store": False,
+ "enable.auto.commit": True,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 6min)
+ "max.poll.interval.ms": 360000,
+ "default.topic.config": {"auto.offset.reset": "latest",},
+ }
+
+ consumer = Consumer(config)
+ consumer.subscribe(
+ consume_topics, on_assign=_on_rebalance, on_revoke=_on_rebalance,
+ )
+ print(
+ f"Consuming from kafka topics {consume_topics}, group {consumer_group}",
+ file=sys.stderr,
+ )
+ return consumer
+
+ @staticmethod
+ def create_kafka_producer(kafka_brokers: List[str]) -> Producer:
+ """
+ This configuration is for large compressed messages.
+ """
+
+ config = {
+ "bootstrap.servers": ",".join(kafka_brokers),
+ "message.max.bytes": 30000000, # ~30 MBytes; broker is ~50 MBytes
+ "api.version.request": True,
+ "api.version.fallback.ms": 0,
+ "compression.codec": "gzip",
+ "retry.backoff.ms": 250,
+ "linger.ms": 1000,
+ "batch.num.messages": 50,
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "message.timeout.ms": 30000,
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ return Producer(config)
+
+ def run(self) -> Counter:
+
+ if self.batch_timeout_sec:
+ signal.signal(signal.SIGALRM, self._timeout_handler)
+
+ while True:
+ batch = self.consumer.consume(
+ num_messages=self.batch_size, timeout=self.poll_interval_sec,
+ )
+
+ print(
+ f"... got {len(batch)} kafka messages ({self.poll_interval_sec}sec poll interval). stats: {self.counts}",
+ file=sys.stderr,
+ )
+
+ if not batch:
+ continue
+
+ # first check errors on entire batch...
+ for msg in batch:
+ if msg.error():
+ raise KafkaException(msg.error())
+
+ # ... then process, with optional timeout
+ self.counts["total"] += len(batch)
+ records = [json.loads(msg.value().decode("utf-8")) for msg in batch]
+
+ if self.batch_timeout_sec:
+ signal.alarm(int(self.batch_timeout_sec))
+ try:
+ self.process_batch(records)
+ except TimeoutError as te:
+ raise te
+ finally:
+ signal.alarm(0)
+ else:
+ self.process_batch(records)
+
+ self.counts["processed"] += len(batch)
+
+ # ... then record progress
+ for msg in batch:
+ # will be auto-commited by librdkafka from this "stored" value
+ self.consumer.store_offsets(message=msg)
+
+ # Note: never actually get here, but including as documentation on how to clean up
+ self.consumer.close()
+ return self.counts
+
+ def process_batch(self, batch: List[dict]) -> None:
+ """
+ Workers can override this method for batch processing. By default it
+ calls process_msg() for each message in the batch.
+ """
+ for msg in batch:
+ self.process_msg(msg)
+
+ def process_msg(self, msg: dict) -> None:
+ """
+ Workers can override this method for individual record processing.
+ """
+ raise NotImplementedError("implementation required")
diff --git a/fatcat_scholar/schema.py b/fatcat_scholar/schema.py
index db3b668..5637e0a 100644
--- a/fatcat_scholar/schema.py
+++ b/fatcat_scholar/schema.py
@@ -34,11 +34,13 @@ class IntermediateBundle(BaseModel):
pdftotext_fulltext: Optional[Dict[str, Any]]
pdf_meta: Optional[Dict[str, Any]]
sim_fulltext: Optional[Dict[str, Any]]
+ fetched: Optional[datetime.datetime]
class Config:
arbitrary_types_allowed = True
json_encoders = {
ReleaseEntity: lambda re: entity_to_dict(re),
+ datetime.datetime: lambda dt: dt.isoformat(),
}
diff --git a/fatcat_scholar/sim_pipeline.py b/fatcat_scholar/sim_pipeline.py
index 95e5cad..47a3e22 100644
--- a/fatcat_scholar/sim_pipeline.py
+++ b/fatcat_scholar/sim_pipeline.py
@@ -46,7 +46,7 @@ class SimPipeline:
self.issue_db: IssueDB = issue_db
self.ia_client = internetarchive.get_session()
- def fetch_sim_issue(self, issue_db_row: Any) -> Optional[Any]:
+ def fetch_sim_issue(self, issue_item: str, pub_collection: str) -> Optional[Any]:
"""
issue_item
pages: str
@@ -59,8 +59,8 @@ class SimPipeline:
issue_item_metadata
"""
# fetch full metadata from API
- issue_meta = self.ia_client.get_metadata(issue_db_row["issue_item"])
- pub_meta = self.ia_client.get_metadata(issue_db_row["pub_collection"])
+ issue_meta = self.ia_client.get_metadata(issue_item)
+ pub_meta = self.ia_client.get_metadata(pub_collection)
leaf_index = dict()
leaf_list = []
@@ -79,8 +79,8 @@ class SimPipeline:
return None
page_texts: List[Dict[str, Any]] = []
- issue_item = self.ia_client.get_item(issue_db_row["issue_item"])
- issue_item_djvu = issue_item.get_file(issue_db_row["issue_item"] + "_djvu.xml")
+ issue_item_obj = self.ia_client.get_item(issue_item)
+ issue_item_djvu = issue_item_obj.get_file(issue_item + "_djvu.xml")
# override 'close()' method so we can still read out contents
djvu_bytes = io.BytesIO()
@@ -102,7 +102,7 @@ class SimPipeline:
)
return dict(
- issue_item=issue_db_row["issue_item"],
+ issue_item=issue_item,
pages=None,
page_texts=page_texts,
release_ident=None,
@@ -110,6 +110,27 @@ class SimPipeline:
issue_item_metadata=truncate_issue_meta(issue_meta),
)
+ def full_issue_to_pages(self, full_issue: dict) -> List[IntermediateBundle]:
+ pages = []
+ for leaf in full_issue["page_texts"]:
+ bundle = IntermediateBundle(
+ doc_type=DocType.sim_page,
+ releases=[],
+ biblio_release_ident=None,
+ grobid_fulltext=None,
+ pdftotext_fulltext=None,
+ sim_fulltext=dict(
+ issue_item=full_issue["issue_item"],
+ pages=str(leaf["page_num"]),
+ page_texts=[leaf],
+ release_ident=None,
+ pub_item_metadata=full_issue["pub_item_metadata"],
+ issue_item_metadata=full_issue["issue_item_metadata"],
+ ),
+ )
+ pages.append(bundle)
+ return pages
+
def run_issue_db(self, limit: int = None) -> None:
count = 0
self.issue_db.db.row_factory = sqlite3.Row
@@ -124,7 +145,9 @@ class SimPipeline:
):
continue
try:
- full_issue = self.fetch_sim_issue(row)
+ full_issue = self.fetch_sim_issue(
+ row["issue_item"], row["pub_collection"]
+ )
except requests.exceptions.ConnectionError as e:
print(str(e), file=sys.stderr)
continue
@@ -133,22 +156,8 @@ class SimPipeline:
continue
if not full_issue:
continue
- for leaf in full_issue["page_texts"]:
- bundle = IntermediateBundle(
- doc_type=DocType.sim_page,
- releases=[],
- biblio_release_ident=None,
- grobid_fulltext=None,
- pdftotext_fulltext=None,
- sim_fulltext=dict(
- issue_item=full_issue["issue_item"],
- pages=str(leaf["page_num"]),
- page_texts=[leaf],
- release_ident=None,
- pub_item_metadata=full_issue["pub_item_metadata"],
- issue_item_metadata=full_issue["issue_item_metadata"],
- ),
- )
+ pages = self.full_issue_to_pages(full_issue)
+ for bundle in pages:
print(bundle.json(exclude_none=True, sort_keys=True))
count += 1
if limit is not None and count >= limit:
diff --git a/fatcat_scholar/transform.py b/fatcat_scholar/transform.py
index c08be7b..590d44a 100644
--- a/fatcat_scholar/transform.py
+++ b/fatcat_scholar/transform.py
@@ -50,9 +50,12 @@ SIM_RELEASE_TYPE_MAP = {
}
SIM_LANG_MAP = {
"English": "en",
- "Spanish": "es",
"German": "de",
- # TODO:
+ "Italian": "it",
+ "French": "fr",
+ "Afrikaans": "af",
+ "Spanish": "es",
+ # TODO: more
}
SIM_COUNTRY_MAP = {
"United States": "us",
diff --git a/fatcat_scholar/worker.py b/fatcat_scholar/worker.py
new file mode 100644
index 0000000..af84dd1
--- /dev/null
+++ b/fatcat_scholar/worker.py
@@ -0,0 +1,238 @@
+import os
+import sys
+import json
+import argparse
+import datetime
+from typing import List, Any
+
+import requests
+import elasticsearch
+import fatcat_openapi_client
+from fatcat_openapi_client import ReleaseEntity
+
+from fatcat_scholar.config import settings
+from fatcat_scholar.issue_db import IssueDB
+from fatcat_scholar.sandcrawler import (
+ SandcrawlerPostgrestClient,
+ SandcrawlerMinioClient,
+)
+from fatcat_scholar.schema import (
+ DocType,
+ IntermediateBundle,
+)
+from fatcat_scholar.transform import transform_heavy
+from fatcat_scholar.api_entities import entity_from_json
+from fatcat_scholar.work_pipeline import WorkPipeline
+from fatcat_scholar.sim_pipeline import SimPipeline
+from fatcat_scholar.kafka import KafkaWorker
+
+
+class FetchDocsWorker(KafkaWorker):
+ def __init__(
+ self,
+ work_pipeline: WorkPipeline,
+ sim_pipeline: SimPipeline,
+ produce_docs_topic: str,
+ **kwargs: Any,
+ ):
+ super().__init__(**kwargs)
+ self.sim_pipeline = sim_pipeline
+ self.work_pipeline = work_pipeline
+ api_conf = fatcat_openapi_client.Configuration()
+ api_conf.host = kwargs.get("fatcat_api_host", "https://api.fatcat.wiki/v0")
+ self.fatcat_api = fatcat_openapi_client.DefaultApi(
+ fatcat_openapi_client.ApiClient(api_conf)
+ )
+ self.producer = self.create_kafka_producer(self.kafka_brokers)
+ self.produce_topic = produce_docs_topic
+ print(f"Will produce bundles to: {self.produce_topic}", file=sys.stderr)
+
+ def process_msg(self, msg: dict) -> None:
+ key = msg["key"]
+ if key.startswith("work_") and msg.get("work_ident"):
+ stubs = self.fatcat_api.get_work_releases(
+ ident=msg["work_ident"], hide="abstracts,references",
+ )
+ full_releases = []
+ for r in stubs:
+ full = self.fatcat_api.get_release(
+ r.ident,
+ expand="container,files,filesets,webcaptures",
+ hide="references",
+ )
+ full_releases.append(full)
+ if not full_releases:
+ return
+ bundle = self.work_pipeline.process_release_list(full_releases)
+ bundle.fetched = datetime.datetime.utcnow()
+ self.producer.produce(
+ self.produce_topic,
+ bundle.json(exclude_none=True).encode("UTF-8"),
+ key=key,
+ on_delivery=self._fail_fast_produce,
+ )
+ self.counts["works-produced"] += 1
+
+ # check for errors etc
+ self.producer.poll(0)
+ elif key.startswith("sim_"):
+ # filter out "contents" and "index" items (again)
+ if msg["issue_item"].endswith("_contents") or msg["issue_item"].endswith(
+ "_index"
+ ):
+ return
+ try:
+ full_issue = self.sim_pipeline.fetch_sim_issue(
+ msg["issue_item"], msg["pub_collection"]
+ )
+ except requests.exceptions.ConnectionError as e:
+ print(str(e), file=sys.stderr)
+ raise e
+ except requests.exceptions.ReadTimeout as e:
+ print(str(e), file=sys.stderr)
+ raise e
+ if not full_issue:
+ return
+ pages = self.sim_pipeline.full_issue_to_pages(full_issue)
+ for bundle in pages:
+ bundle.fetched = datetime.datetime.utcnow()
+ self.producer.produce(
+ self.produce_topic,
+ bundle.json(exclude_none=True).encode("UTF-8"),
+ # NOTE: this isn't going to be the document key, but it will sort by issue
+ key=key,
+ on_delivery=self._fail_fast_produce,
+ )
+ self.counts["pages-produced"] += 1
+
+ # check for errors etc
+ self.producer.poll(0)
+ else:
+ raise NotImplementedError(f"can't process updated for key={key}")
+
+
+class IndexDocsWorker(KafkaWorker):
+ def __init__(self, es_client: Any, es_index: str, **kwargs: Any):
+ super().__init__(**kwargs)
+ self.es_client = es_client
+ self.es_index = es_index
+
+ def process_batch(self, batch: List[dict]) -> None:
+
+ bulk_actions = []
+ for obj in batch:
+ bundle = IntermediateBundle(
+ doc_type=DocType(obj["doc_type"]),
+ releases=[
+ entity_from_json(json.dumps(re), ReleaseEntity)
+ for re in obj["releases"]
+ ],
+ biblio_release_ident=obj.get("biblio_release_ident"),
+ grobid_fulltext=obj.get("grobid_fulltext"),
+ pdftotext_fulltext=obj.get("pdftotext_fulltext"),
+ pdf_meta=obj.get("pdf_meta"),
+ sim_fulltext=obj.get("sim_fulltext"),
+ )
+ es_doc = transform_heavy(bundle)
+ if not es_doc:
+ continue
+ else:
+ bulk_actions.append(json.dumps({"index": {"_id": es_doc.key,},}))
+ bulk_actions.append(es_doc.json(exclude_none=True, sort_keys=True))
+ self.counts["docs-indexed"] += 1
+
+ if not bulk_actions:
+ return
+
+ self.es_client.bulk(
+ "\n".join(bulk_actions), self.es_index, timeout="30s",
+ )
+ self.counts["batches-indexed"] += 1
+
+
+def main() -> None:
+ """
+ Run this command like:
+
+ python -m fatcat_scholar.work_pipeline
+ """
+
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter
+ )
+ subparsers = parser.add_subparsers()
+
+ parser.add_argument(
+ "--issue-db-file",
+ help="sqlite3 database file to open",
+ default=settings.SCHOLAR_ISSUEDB_PATH,
+ type=str,
+ )
+ parser.add_argument(
+ "--sandcrawler-db-api",
+ help="Sandcrawler Postgrest API endpoint",
+ default=settings.SANDCRAWLER_DB_API,
+ type=str,
+ )
+ parser.add_argument(
+ "--sandcrawler-s3-api",
+ help="Sandcrawler S3 (minio/seaweedfs) API endpoint",
+ default=settings.SANDCRAWLER_S3_API,
+ type=str,
+ )
+
+ sub = subparsers.add_parser("fetch-docs-worker",)
+ sub.set_defaults(worker="fetch-docs-worker")
+
+ sub = subparsers.add_parser("index-docs-worker",)
+ sub.set_defaults(worker="index-docs-worker")
+
+ args = parser.parse_args()
+ if not args.__dict__.get("worker"):
+ parser.print_help(file=sys.stderr)
+ sys.exit(-1)
+
+ if args.worker == "fetch-docs-worker":
+ issue_db = IssueDB(args.issue_db_file)
+ wp = WorkPipeline(
+ issue_db=issue_db,
+ sandcrawler_db_client=SandcrawlerPostgrestClient(
+ api_url=args.sandcrawler_db_api
+ ),
+ sandcrawler_s3_client=SandcrawlerMinioClient(
+ host_url=args.sandcrawler_s3_api,
+ access_key=os.environ.get("MINIO_ACCESS_KEY"),
+ secret_key=os.environ.get("MINIO_SECRET_KEY"),
+ ),
+ )
+ sp = SimPipeline(issue_db=issue_db)
+ fdw = FetchDocsWorker(
+ kafka_brokers=settings.KAFKA_BROKERS,
+ consume_topics=[
+ f"fatcat-{settings.SCHOLAR_ENV}.work-ident-updates",
+ # TODO: f"scholar-{settings.SCHOLAR_ENV}.sim-updates",
+ ],
+ consumer_group=f"scholar-{settings.SCHOLAR_ENV}-fetch-workers",
+ work_pipeline=wp,
+ sim_pipeline=sp,
+ produce_docs_topic=f"scholar-{settings.SCHOLAR_ENV}.update-docs",
+ fatcat_api_host=settings.FATCAT_API_HOST,
+ )
+ fdw.run()
+ elif args.worker == "index-docs-worker":
+ es_client = elasticsearch.Elasticsearch(
+ settings.ELASTICSEARCH_BACKEND, timeout=25.0
+ )
+ idw = IndexDocsWorker(
+ kafka_brokers=settings.KAFKA_BROKERS,
+ batch_size=settings.INDEX_WORKER_BATCH_SIZE,
+ consume_topics=[f"scholar-{settings.SCHOLAR_ENV}.update-docs"],
+ consumer_group=f"scholar-{settings.SCHOLAR_ENV}-index-workers",
+ es_client=es_client,
+ es_index=settings.ELASTICSEARCH_FULLTEXT_INDEX,
+ )
+ idw.run()
+
+
+if __name__ == "__main__":
+ main()