diff options
-rw-r--r-- | fatcat_scholar/kafka.py | 219 | ||||
-rw-r--r-- | fatcat_scholar/schema.py | 2 | ||||
-rw-r--r-- | fatcat_scholar/sim_pipeline.py | 55 | ||||
-rw-r--r-- | fatcat_scholar/transform.py | 7 | ||||
-rw-r--r-- | fatcat_scholar/worker.py | 238 | ||||
-rw-r--r-- | proposals/kafka_update_pipeline.md | 62 | ||||
-rw-r--r-- | settings.toml | 9 |
7 files changed, 567 insertions, 25 deletions
diff --git a/fatcat_scholar/kafka.py b/fatcat_scholar/kafka.py new file mode 100644 index 0000000..5faff9a --- /dev/null +++ b/fatcat_scholar/kafka.py @@ -0,0 +1,219 @@ +import sys +import json +import signal +from collections import Counter +from typing import List, Any + +from confluent_kafka import Consumer, Producer, KafkaException + + +class KafkaWorker(object): + """ + Base class for Scholar workers which consume from Kafka topics. + + Configuration (passed to __init__): + + kafka_brokers (List[str]): brokers to connect to + + consume_topics (List[str]): topics to consume from + + consumer_group (str): kafka consumer group + + batch_size (int): number of records to consume and process at a time + + batch_timeout_sec (int): max seconds for each batch to process. set to 0 to disable + + API: + + __init__() + + run() + starts consuming, calling process_batch() for each message batch + + process_batch(batch: List[dict]) -> None + implemented by sub-class + + process_msg(msg: dict) -> None + implemented by sub-class + + Example of producing (in a worker): + + producer = self.create_kafka_producer(...) + + producer.produce( + topic, + some_obj.json(exclude_none=True).encode('UTF-8'), + key=key, + on_delivery=self._fail_fast_produce) + + # check for errors etc + producer.poll(0) + """ + + def __init__( + self, + kafka_brokers: List[str], + consume_topics: List[str], + consumer_group: str, + **kwargs: Any, + ): + self.counts: Counter = Counter() + self.kafka_brokers = kafka_brokers + self.batch_size = kwargs.get("batch_size", 1) + self.batch_timeout_sec = kwargs.get("batch_timeout_sec", 30) + self.poll_interval_sec = kwargs.get("poll_interval_sec", 5.0) + self.consumer = self.create_kafka_consumer( + kafka_brokers, consume_topics, consumer_group + ) + + @staticmethod + def _fail_fast_produce(err: Any, msg: Any) -> None: + if err is not None: + print("Kafka producer delivery error: {}".format(err), file=sys.stderr) + raise KafkaException(err) + + @staticmethod + def _timeout_handler(signum: Any, frame: Any) -> None: + raise TimeoutError("timeout processing record") + + @staticmethod + def create_kafka_consumer( + kafka_brokers: List[str], consume_topics: List[str], consumer_group: str + ) -> Consumer: + """ + NOTE: it is important that consume_topics be str, *not* bytes + """ + + def _on_rebalance(consumer: Any, partitions: Any) -> None: + + for p in partitions: + if p.error: + raise KafkaException(p.error) + + print( + f"Kafka partitions rebalanced: {consumer} / {partitions}", + file=sys.stderr, + ) + + def _fail_fast_consume(err: Any, partitions: Any) -> None: + if err is not None: + print("Kafka consumer commit error: {}".format(err), file=sys.stderr) + raise KafkaException(err) + for p in partitions: + # check for partition-specific commit errors + if p.error: + print( + "Kafka consumer commit error: {}".format(p.error), + file=sys.stderr, + ) + raise KafkaException(p.error) + + config = { + "bootstrap.servers": ",".join(kafka_brokers), + "group.id": consumer_group, + "on_commit": _fail_fast_consume, + # messages don't have offset marked as stored until processed, + # but we do auto-commit stored offsets to broker + "enable.auto.offset.store": False, + "enable.auto.commit": True, + # user code timeout; if no poll after this long, assume user code + # hung and rebalance (default: 6min) + "max.poll.interval.ms": 360000, + "default.topic.config": {"auto.offset.reset": "latest",}, + } + + consumer = Consumer(config) + consumer.subscribe( + consume_topics, on_assign=_on_rebalance, on_revoke=_on_rebalance, + ) + print( + f"Consuming from kafka topics {consume_topics}, group {consumer_group}", + file=sys.stderr, + ) + return consumer + + @staticmethod + def create_kafka_producer(kafka_brokers: List[str]) -> Producer: + """ + This configuration is for large compressed messages. + """ + + config = { + "bootstrap.servers": ",".join(kafka_brokers), + "message.max.bytes": 30000000, # ~30 MBytes; broker is ~50 MBytes + "api.version.request": True, + "api.version.fallback.ms": 0, + "compression.codec": "gzip", + "retry.backoff.ms": 250, + "linger.ms": 1000, + "batch.num.messages": 50, + "delivery.report.only.error": True, + "default.topic.config": { + "message.timeout.ms": 30000, + "request.required.acks": -1, # all brokers must confirm + }, + } + return Producer(config) + + def run(self) -> Counter: + + if self.batch_timeout_sec: + signal.signal(signal.SIGALRM, self._timeout_handler) + + while True: + batch = self.consumer.consume( + num_messages=self.batch_size, timeout=self.poll_interval_sec, + ) + + print( + f"... got {len(batch)} kafka messages ({self.poll_interval_sec}sec poll interval). stats: {self.counts}", + file=sys.stderr, + ) + + if not batch: + continue + + # first check errors on entire batch... + for msg in batch: + if msg.error(): + raise KafkaException(msg.error()) + + # ... then process, with optional timeout + self.counts["total"] += len(batch) + records = [json.loads(msg.value().decode("utf-8")) for msg in batch] + + if self.batch_timeout_sec: + signal.alarm(int(self.batch_timeout_sec)) + try: + self.process_batch(records) + except TimeoutError as te: + raise te + finally: + signal.alarm(0) + else: + self.process_batch(records) + + self.counts["processed"] += len(batch) + + # ... then record progress + for msg in batch: + # will be auto-commited by librdkafka from this "stored" value + self.consumer.store_offsets(message=msg) + + # Note: never actually get here, but including as documentation on how to clean up + self.consumer.close() + return self.counts + + def process_batch(self, batch: List[dict]) -> None: + """ + Workers can override this method for batch processing. By default it + calls process_msg() for each message in the batch. + """ + for msg in batch: + self.process_msg(msg) + + def process_msg(self, msg: dict) -> None: + """ + Workers can override this method for individual record processing. + """ + raise NotImplementedError("implementation required") diff --git a/fatcat_scholar/schema.py b/fatcat_scholar/schema.py index db3b668..5637e0a 100644 --- a/fatcat_scholar/schema.py +++ b/fatcat_scholar/schema.py @@ -34,11 +34,13 @@ class IntermediateBundle(BaseModel): pdftotext_fulltext: Optional[Dict[str, Any]] pdf_meta: Optional[Dict[str, Any]] sim_fulltext: Optional[Dict[str, Any]] + fetched: Optional[datetime.datetime] class Config: arbitrary_types_allowed = True json_encoders = { ReleaseEntity: lambda re: entity_to_dict(re), + datetime.datetime: lambda dt: dt.isoformat(), } diff --git a/fatcat_scholar/sim_pipeline.py b/fatcat_scholar/sim_pipeline.py index 95e5cad..47a3e22 100644 --- a/fatcat_scholar/sim_pipeline.py +++ b/fatcat_scholar/sim_pipeline.py @@ -46,7 +46,7 @@ class SimPipeline: self.issue_db: IssueDB = issue_db self.ia_client = internetarchive.get_session() - def fetch_sim_issue(self, issue_db_row: Any) -> Optional[Any]: + def fetch_sim_issue(self, issue_item: str, pub_collection: str) -> Optional[Any]: """ issue_item pages: str @@ -59,8 +59,8 @@ class SimPipeline: issue_item_metadata """ # fetch full metadata from API - issue_meta = self.ia_client.get_metadata(issue_db_row["issue_item"]) - pub_meta = self.ia_client.get_metadata(issue_db_row["pub_collection"]) + issue_meta = self.ia_client.get_metadata(issue_item) + pub_meta = self.ia_client.get_metadata(pub_collection) leaf_index = dict() leaf_list = [] @@ -79,8 +79,8 @@ class SimPipeline: return None page_texts: List[Dict[str, Any]] = [] - issue_item = self.ia_client.get_item(issue_db_row["issue_item"]) - issue_item_djvu = issue_item.get_file(issue_db_row["issue_item"] + "_djvu.xml") + issue_item_obj = self.ia_client.get_item(issue_item) + issue_item_djvu = issue_item_obj.get_file(issue_item + "_djvu.xml") # override 'close()' method so we can still read out contents djvu_bytes = io.BytesIO() @@ -102,7 +102,7 @@ class SimPipeline: ) return dict( - issue_item=issue_db_row["issue_item"], + issue_item=issue_item, pages=None, page_texts=page_texts, release_ident=None, @@ -110,6 +110,27 @@ class SimPipeline: issue_item_metadata=truncate_issue_meta(issue_meta), ) + def full_issue_to_pages(self, full_issue: dict) -> List[IntermediateBundle]: + pages = [] + for leaf in full_issue["page_texts"]: + bundle = IntermediateBundle( + doc_type=DocType.sim_page, + releases=[], + biblio_release_ident=None, + grobid_fulltext=None, + pdftotext_fulltext=None, + sim_fulltext=dict( + issue_item=full_issue["issue_item"], + pages=str(leaf["page_num"]), + page_texts=[leaf], + release_ident=None, + pub_item_metadata=full_issue["pub_item_metadata"], + issue_item_metadata=full_issue["issue_item_metadata"], + ), + ) + pages.append(bundle) + return pages + def run_issue_db(self, limit: int = None) -> None: count = 0 self.issue_db.db.row_factory = sqlite3.Row @@ -124,7 +145,9 @@ class SimPipeline: ): continue try: - full_issue = self.fetch_sim_issue(row) + full_issue = self.fetch_sim_issue( + row["issue_item"], row["pub_collection"] + ) except requests.exceptions.ConnectionError as e: print(str(e), file=sys.stderr) continue @@ -133,22 +156,8 @@ class SimPipeline: continue if not full_issue: continue - for leaf in full_issue["page_texts"]: - bundle = IntermediateBundle( - doc_type=DocType.sim_page, - releases=[], - biblio_release_ident=None, - grobid_fulltext=None, - pdftotext_fulltext=None, - sim_fulltext=dict( - issue_item=full_issue["issue_item"], - pages=str(leaf["page_num"]), - page_texts=[leaf], - release_ident=None, - pub_item_metadata=full_issue["pub_item_metadata"], - issue_item_metadata=full_issue["issue_item_metadata"], - ), - ) + pages = self.full_issue_to_pages(full_issue) + for bundle in pages: print(bundle.json(exclude_none=True, sort_keys=True)) count += 1 if limit is not None and count >= limit: diff --git a/fatcat_scholar/transform.py b/fatcat_scholar/transform.py index c08be7b..590d44a 100644 --- a/fatcat_scholar/transform.py +++ b/fatcat_scholar/transform.py @@ -50,9 +50,12 @@ SIM_RELEASE_TYPE_MAP = { } SIM_LANG_MAP = { "English": "en", - "Spanish": "es", "German": "de", - # TODO: + "Italian": "it", + "French": "fr", + "Afrikaans": "af", + "Spanish": "es", + # TODO: more } SIM_COUNTRY_MAP = { "United States": "us", diff --git a/fatcat_scholar/worker.py b/fatcat_scholar/worker.py new file mode 100644 index 0000000..af84dd1 --- /dev/null +++ b/fatcat_scholar/worker.py @@ -0,0 +1,238 @@ +import os +import sys +import json +import argparse +import datetime +from typing import List, Any + +import requests +import elasticsearch +import fatcat_openapi_client +from fatcat_openapi_client import ReleaseEntity + +from fatcat_scholar.config import settings +from fatcat_scholar.issue_db import IssueDB +from fatcat_scholar.sandcrawler import ( + SandcrawlerPostgrestClient, + SandcrawlerMinioClient, +) +from fatcat_scholar.schema import ( + DocType, + IntermediateBundle, +) +from fatcat_scholar.transform import transform_heavy +from fatcat_scholar.api_entities import entity_from_json +from fatcat_scholar.work_pipeline import WorkPipeline +from fatcat_scholar.sim_pipeline import SimPipeline +from fatcat_scholar.kafka import KafkaWorker + + +class FetchDocsWorker(KafkaWorker): + def __init__( + self, + work_pipeline: WorkPipeline, + sim_pipeline: SimPipeline, + produce_docs_topic: str, + **kwargs: Any, + ): + super().__init__(**kwargs) + self.sim_pipeline = sim_pipeline + self.work_pipeline = work_pipeline + api_conf = fatcat_openapi_client.Configuration() + api_conf.host = kwargs.get("fatcat_api_host", "https://api.fatcat.wiki/v0") + self.fatcat_api = fatcat_openapi_client.DefaultApi( + fatcat_openapi_client.ApiClient(api_conf) + ) + self.producer = self.create_kafka_producer(self.kafka_brokers) + self.produce_topic = produce_docs_topic + print(f"Will produce bundles to: {self.produce_topic}", file=sys.stderr) + + def process_msg(self, msg: dict) -> None: + key = msg["key"] + if key.startswith("work_") and msg.get("work_ident"): + stubs = self.fatcat_api.get_work_releases( + ident=msg["work_ident"], hide="abstracts,references", + ) + full_releases = [] + for r in stubs: + full = self.fatcat_api.get_release( + r.ident, + expand="container,files,filesets,webcaptures", + hide="references", + ) + full_releases.append(full) + if not full_releases: + return + bundle = self.work_pipeline.process_release_list(full_releases) + bundle.fetched = datetime.datetime.utcnow() + self.producer.produce( + self.produce_topic, + bundle.json(exclude_none=True).encode("UTF-8"), + key=key, + on_delivery=self._fail_fast_produce, + ) + self.counts["works-produced"] += 1 + + # check for errors etc + self.producer.poll(0) + elif key.startswith("sim_"): + # filter out "contents" and "index" items (again) + if msg["issue_item"].endswith("_contents") or msg["issue_item"].endswith( + "_index" + ): + return + try: + full_issue = self.sim_pipeline.fetch_sim_issue( + msg["issue_item"], msg["pub_collection"] + ) + except requests.exceptions.ConnectionError as e: + print(str(e), file=sys.stderr) + raise e + except requests.exceptions.ReadTimeout as e: + print(str(e), file=sys.stderr) + raise e + if not full_issue: + return + pages = self.sim_pipeline.full_issue_to_pages(full_issue) + for bundle in pages: + bundle.fetched = datetime.datetime.utcnow() + self.producer.produce( + self.produce_topic, + bundle.json(exclude_none=True).encode("UTF-8"), + # NOTE: this isn't going to be the document key, but it will sort by issue + key=key, + on_delivery=self._fail_fast_produce, + ) + self.counts["pages-produced"] += 1 + + # check for errors etc + self.producer.poll(0) + else: + raise NotImplementedError(f"can't process updated for key={key}") + + +class IndexDocsWorker(KafkaWorker): + def __init__(self, es_client: Any, es_index: str, **kwargs: Any): + super().__init__(**kwargs) + self.es_client = es_client + self.es_index = es_index + + def process_batch(self, batch: List[dict]) -> None: + + bulk_actions = [] + for obj in batch: + bundle = IntermediateBundle( + doc_type=DocType(obj["doc_type"]), + releases=[ + entity_from_json(json.dumps(re), ReleaseEntity) + for re in obj["releases"] + ], + biblio_release_ident=obj.get("biblio_release_ident"), + grobid_fulltext=obj.get("grobid_fulltext"), + pdftotext_fulltext=obj.get("pdftotext_fulltext"), + pdf_meta=obj.get("pdf_meta"), + sim_fulltext=obj.get("sim_fulltext"), + ) + es_doc = transform_heavy(bundle) + if not es_doc: + continue + else: + bulk_actions.append(json.dumps({"index": {"_id": es_doc.key,},})) + bulk_actions.append(es_doc.json(exclude_none=True, sort_keys=True)) + self.counts["docs-indexed"] += 1 + + if not bulk_actions: + return + + self.es_client.bulk( + "\n".join(bulk_actions), self.es_index, timeout="30s", + ) + self.counts["batches-indexed"] += 1 + + +def main() -> None: + """ + Run this command like: + + python -m fatcat_scholar.work_pipeline + """ + + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + subparsers = parser.add_subparsers() + + parser.add_argument( + "--issue-db-file", + help="sqlite3 database file to open", + default=settings.SCHOLAR_ISSUEDB_PATH, + type=str, + ) + parser.add_argument( + "--sandcrawler-db-api", + help="Sandcrawler Postgrest API endpoint", + default=settings.SANDCRAWLER_DB_API, + type=str, + ) + parser.add_argument( + "--sandcrawler-s3-api", + help="Sandcrawler S3 (minio/seaweedfs) API endpoint", + default=settings.SANDCRAWLER_S3_API, + type=str, + ) + + sub = subparsers.add_parser("fetch-docs-worker",) + sub.set_defaults(worker="fetch-docs-worker") + + sub = subparsers.add_parser("index-docs-worker",) + sub.set_defaults(worker="index-docs-worker") + + args = parser.parse_args() + if not args.__dict__.get("worker"): + parser.print_help(file=sys.stderr) + sys.exit(-1) + + if args.worker == "fetch-docs-worker": + issue_db = IssueDB(args.issue_db_file) + wp = WorkPipeline( + issue_db=issue_db, + sandcrawler_db_client=SandcrawlerPostgrestClient( + api_url=args.sandcrawler_db_api + ), + sandcrawler_s3_client=SandcrawlerMinioClient( + host_url=args.sandcrawler_s3_api, + access_key=os.environ.get("MINIO_ACCESS_KEY"), + secret_key=os.environ.get("MINIO_SECRET_KEY"), + ), + ) + sp = SimPipeline(issue_db=issue_db) + fdw = FetchDocsWorker( + kafka_brokers=settings.KAFKA_BROKERS, + consume_topics=[ + f"fatcat-{settings.SCHOLAR_ENV}.work-ident-updates", + # TODO: f"scholar-{settings.SCHOLAR_ENV}.sim-updates", + ], + consumer_group=f"scholar-{settings.SCHOLAR_ENV}-fetch-workers", + work_pipeline=wp, + sim_pipeline=sp, + produce_docs_topic=f"scholar-{settings.SCHOLAR_ENV}.update-docs", + fatcat_api_host=settings.FATCAT_API_HOST, + ) + fdw.run() + elif args.worker == "index-docs-worker": + es_client = elasticsearch.Elasticsearch( + settings.ELASTICSEARCH_BACKEND, timeout=25.0 + ) + idw = IndexDocsWorker( + kafka_brokers=settings.KAFKA_BROKERS, + batch_size=settings.INDEX_WORKER_BATCH_SIZE, + consume_topics=[f"scholar-{settings.SCHOLAR_ENV}.update-docs"], + consumer_group=f"scholar-{settings.SCHOLAR_ENV}-index-workers", + es_client=es_client, + es_index=settings.ELASTICSEARCH_FULLTEXT_INDEX, + ) + idw.run() + + +if __name__ == "__main__": + main() diff --git a/proposals/kafka_update_pipeline.md b/proposals/kafka_update_pipeline.md new file mode 100644 index 0000000..a953d9c --- /dev/null +++ b/proposals/kafka_update_pipeline.md @@ -0,0 +1,62 @@ + +Want to receive a continual stream of updates from both fatcat and SIM +scanning; index the updated content; and push into elasticsearch. + + +## Filtering and Affordances + +The `updated` and `fetched` timestamps are not immediately necessary or +implemented, but they can be used to filter updates. For example, after +re-loading from a build entity dump, could "roll back" update pipeline to only +fatcat (work) updates after the changelog index that the bulk dump is stamped +with. + +At least in theory, the `fetched` timestamp could be used to prevent re-updates +of existing documents in the ES index. + +The `doc_index_ts` timestamp in the ES index could be used in a future +fetch-and-reindex worker to select documents for re-indexing, or to delete +old/stale documents (eg, after SIM issue re-indexing if there were spurious +"page" type documents remaining). + +## Message Types + +Scholar Update Request JSON +- `key`: str - `type`: str + - `fatcat_work` + - `sim_issue` +- `updated`: datetime, UTC, of event resulting in this request +- `work_ident`: str (works) +- `fatcat_changelog`: int (works) +- `sim_item`: str (items) + +"Heavy Intermediate" JSON (existing schema) +- key +- `fetched`: Optional[datetime], UTC, when this doc was collected + +Scholar Fulltext ES JSON (existing schema) + + +## Kafka Topics + +fatcat-ENV.work-ident-updates + 6x, long retention, key compaction + key: doc ident +scholar-ENV.sim-updates + 6x, long retention, key compaction + key: doc ident +scholar-ENV.update-docs + 12x, short retention (2 months?) + key: doc ident + +## Workers + +scholar-fetch-docs-worker + consumes fatcat and/or sim update requests, individually + constructs heavy intermediate + publishes to update-docs topic + +scholar-index-docs-worker + consumes updated "heavy intermediate" documents, in batches + transforms to elasticsearch schema + updates elasticsearch diff --git a/settings.toml b/settings.toml index a1c39bc..093acc4 100644 --- a/settings.toml +++ b/settings.toml @@ -2,27 +2,36 @@ [default] SCHOLAR_ENV = "default" SCHOLAR_DOMAIN = "fatcat.wiki" +SCHOLAR_ISSUEDB_PATH = "data/issue_db.sqlite" I18N_LANG_DEFAULT = "en" ELASTICSEARCH_BACKEND = "https://search.fatcat.wiki" ELASTICSEARCH_FULLTEXT_INDEX = "scholar_fulltext" THUMBNAIL_URL_PREFIX = "https://blobs.fatcat.wiki/thumbnail/pdf/" SANDCRAWLER_DB_API = "http://aitio.us.archive.org:3030" SANDCRAWLER_S3_API = "wbgrp-svc169.us.archive.org:8333" +KAFKA_BROKERS = [] +FATCAT_API_HOST = "https://api.fatcat.wiki/v0" +INDEX_WORKER_BATCH_SIZE = 50 [test] SCHOLAR_ENV = "test" +FATCAT_API_HOST = "http://localhost:9411/v0" [dev] SCHOLAR_ENV = "dev" ELASTICSEARCH_FULLTEXT_INDEX = "dev_scholar_fulltext" ELASTICSEARCH_BACKEND = "http://localhost:9200" +KAFKA_BROKERS = ["localhost"] +FATCAT_API_HOST = "http://localhost:9411/v0" [qa] SCHOLAR_ENV = "qa" ELASTICSEARCH_FULLTEXT_INDEX = "qa_scholar_fulltext" ELASTICSEARCH_BACKEND = "https://search.fatcat.wiki" +KAFKA_BROKERS = ["wbgrp-svc263.us.archive.org"] [prod] SCHOLAR_ENV = "prod" ELASTICSEARCH_FULLTEXT_INDEX = "scholar_fulltext" ELASTICSEARCH_BACKEND = "https://search.fatcat.wiki" +KAFKA_BROKERS = ["wbgrp-svc263.us.archive.org"] |