From 7dc382519187e32856f494af76248fa93aef7770 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 3 Nov 2020 19:28:21 -0800 Subject: persist: XML and HTML persist workers --- python/sandcrawler/persist.py | 77 +++++++++++++++++++++++++++++++++++++++++-- python/sandcrawler_worker.py | 47 ++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 3 deletions(-) (limited to 'python') diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index aa05195..81cf664 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -95,7 +95,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if not k in raw: self.counts['skip-request-fields'] += 1 return None - if raw['ingest_type'] not in ('pdf', 'xml'): + if raw['ingest_type'] not in ('pdf', 'xml', 'html'): print(raw['ingest_type']) self.counts['skip-ingest-type'] += 1 return None @@ -121,7 +121,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): return request - def file_result_to_row(self, raw): + def file_result_to_row(self, raw: dict) -> Optional[dict]: """ Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema @@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): ingest_type = raw['request'].get('ingest_type') if ingest_type == 'file': ingest_type = 'pdf' - if ingest_type not in ('pdf', 'xml'): + if ingest_type not in ('pdf', 'xml', 'html'): self.counts['skip-ingest-type'] += 1 return None if raw['status'] in ("existing", ): @@ -159,6 +159,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): result['terminal_sha1hex'] = terminal.get('terminal_sha1hex') return result + def result_to_html_meta(self, record: dict) -> Optional[dict]: + raise NotImplementedError() + def push_batch(self, batch): self.counts['total'] += len(batch) @@ -197,6 +200,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): self.counts['insert-file_meta'] += resp[0] self.counts['update-file_meta'] += resp[1] + html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_meta')] + if html_meta_batch: + resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="nothing") + self.counts['insert-html_meta'] += resp[0] + self.counts['update-html_meta'] += resp[1] + self.db.commit() return [] @@ -489,3 +498,65 @@ class PersistThumbnailWorker(SandcrawlerWorker): ) self.counts['s3-put'] += 1 + +class GenericPersistDocWorker(SandcrawlerWorker): + """ + Pushes blobs from Kafka to S3. + + Objects are assumed to be JSON-wrapped strings. + """ + + def __init__(self, **kwargs): + super().__init__() + self.s3 = SandcrawlerMinioClient( + host_url=kwargs.get('s3_url', 'localhost:9000'), + access_key=kwargs['s3_access_key'], + secret_key=kwargs['s3_secret_key'], + default_bucket=kwargs['s3_bucket'], + ) + self.s3_extension = kwargs.get('s3_extension', ".unknown") + self.s3_folder = kwargs.get('s3_folder', "unknown") + self.doc_key = "unknown" + + def process(self, record: dict, key: Optional[str] = None) -> None: + + if record.get('status') != 'success' or not record.get(self.doc_key): + return + + assert key is not None and len(key) == 40 and isinstance(key, str) + if 'sha1hex' in record: + assert key == record['sha1hex'] + + resp = self.s3.put_blob( + folder=self.s3_folder, + blob=record[self.doc_key].encode('utf-8'), + sha1hex=key, + extension=self.s3_extension, + ) + self.counts['s3-put'] += 1 + + +class PersistXmlDocWorker(GenericPersistDocWorker): + """ + Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".jats.xml") + self.s3_folder = kwargs.get('s3_folder', "xml_doc") + self.doc_key = "jats_xml" + + +class PersistHtmlTeiXmlWorker(GenericPersistDocWorker): + """ + Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".tei.xml") + self.s3_folder = kwargs.get('s3_folder', "html_body") + self.doc_key = "tei_xml" diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index b62fa80..24dbdd0 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -13,6 +13,7 @@ import datetime import raven from sandcrawler import * +from sandcrawler.persist import PersistXmlDocWorker, PersistHtmlTeiXmlWorker # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable try: @@ -148,6 +149,44 @@ def run_persist_thumbnail(args): ) pusher.run() +def run_persist_xml_doc(args: argparse.Namespace) -> None: + consume_topic = f"sandcrawler-{args.env}.xml-doc" + worker = PersistXmlDocWorker( + s3_url=args.s3_url, + s3_bucket=args.s3_bucket, + s3_access_key=args.s3_access_key, + s3_secret_key=args.s3_secret_key, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="persist-xml-doc", + push_batches=False, + raw_records=True, + batch_size=25, + ) + pusher.run() + +def run_persist_html_teixml(args: argparse.Namespace) -> None: + consume_topic = f"sandcrawler-{args.env}.html-teixml" + worker = PersistHtmlTeiXmlWorker( + s3_url=args.s3_url, + s3_bucket=args.s3_bucket, + s3_access_key=args.s3_access_key, + s3_secret_key=args.s3_secret_key, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="persist-html-teixml", + push_batches=False, + raw_records=True, + batch_size=25, + ) + pusher.run() + def run_persist_pdftrio(args): consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env) worker = PersistPdfTrioWorker( @@ -293,6 +332,14 @@ def main(): help="daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres") sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail) + sub_persist_xml_doc = subparsers.add_parser('persist-xml-doc', + help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket") + sub_persist_xml_doc.set_defaults(func=run_persist_xml_doc) + + sub_persist_html_teixml = subparsers.add_parser('persist-html-teixml', + help="daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket") + sub_persist_html_teixml.set_defaults(func=run_persist_html_teixml) + sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio', help="daemon that consumes pdftrio output from Kafka and pushes to postgres") sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio) -- cgit v1.2.3