diff options
author | Bryan Newbold <bnewbold@archive.org> | 2020-11-06 18:32:35 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-11-06 18:32:35 -0800 |
commit | 175019c96fced3e21d0f60ea1a4a37da6b8872ac (patch) | |
tree | f42fbbe9c8ac06ae9eb06373ab9eec96d2b3a177 /python/sandcrawler/persist.py | |
parent | b0b66c20c6ffb9d8acc626068964d7dfd5d3bcdc (diff) | |
parent | 47ca1a273912c8836630b0930b71a4e66fd2c85b (diff) | |
download | sandcrawler-175019c96fced3e21d0f60ea1a4a37da6b8872ac.tar.gz sandcrawler-175019c96fced3e21d0f60ea1a4a37da6b8872ac.zip |
Merge branch 'bnewbold-html-ingest'
Diffstat (limited to 'python/sandcrawler/persist.py')
-rw-r--r-- | python/sandcrawler/persist.py | 105 |
1 files changed, 98 insertions, 7 deletions
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index fbc5273..f13b1f3 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -20,7 +20,7 @@ grobid """ import os -from typing import Optional +from typing import Optional, AnyStr import xml.etree.ElementTree from sandcrawler.workers import SandcrawlerWorker @@ -28,6 +28,7 @@ from sandcrawler.db import SandcrawlerPostgresClient from sandcrawler.minio import SandcrawlerMinioClient from sandcrawler.grobid import GrobidClient from sandcrawler.pdfextract import PdfExtractResult +from sandcrawler.html_ingest import HtmlMetaRow class PersistCdxWorker(SandcrawlerWorker): @@ -95,8 +96,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if not k in raw: self.counts['skip-request-fields'] += 1 return None - if raw['ingest_type'] not in ('pdf', 'xml'): - print(raw['ingest_type']) + if raw['ingest_type'] not in ('pdf', 'xml', 'html'): self.counts['skip-ingest-type'] += 1 return None request = { @@ -121,7 +121,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): return request - def file_result_to_row(self, raw): + def file_result_to_row(self, raw: dict) -> Optional[dict]: """ Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema @@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): ingest_type = raw['request'].get('ingest_type') if ingest_type == 'file': ingest_type = 'pdf' - if ingest_type not in ('pdf', 'xml'): + if ingest_type not in ('pdf', 'xml', 'html'): self.counts['skip-ingest-type'] += 1 return None if raw['status'] in ("existing", ): @@ -159,6 +159,22 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): result['terminal_sha1hex'] = terminal.get('terminal_sha1hex') return result + def result_to_html_meta(self, record: dict) -> Optional[HtmlMetaRow]: + html_body = record.get('html_body') + file_meta = record.get('file_meta') + if not (file_meta and html_body): + return None + return HtmlMetaRow( + sha1hex=file_meta["sha1hex"], + status=record.get('status'), + scope=record.get('scope'), + has_teixml=bool(html_body and html_body['status'] == 'success' and html_body.get('tei_xml')), + has_thumbnail=False, # TODO + word_count=(html_body and html_body.get('word_count')) or None, + biblio=record.get('html_biblio'), + resources=record.get('html_resources'), + ) + def push_batch(self, batch): self.counts['total'] += len(batch) @@ -197,6 +213,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): self.counts['insert-file_meta'] += resp[0] self.counts['update-file_meta'] += resp[1] + html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')] + if html_meta_batch: + resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="nothing") + self.counts['insert-html_meta'] += resp[0] + self.counts['update-html_meta'] += resp[1] + self.db.commit() return [] @@ -452,9 +474,11 @@ class PersistPdfTextWorker(SandcrawlerWorker): class PersistThumbnailWorker(SandcrawlerWorker): """ - Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table. + Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL + table. - This worker *must* be used with raw kakfa mode. + This worker *must* be used with raw kakfa mode; thumbnails are *not* + wrapped in JSON like most sandcrawler kafka messages. """ def __init__(self, **kwargs): @@ -487,3 +511,70 @@ class PersistThumbnailWorker(SandcrawlerWorker): ) self.counts['s3-put'] += 1 + +class GenericPersistDocWorker(SandcrawlerWorker): + """ + Pushes blobs from Kafka to S3. + + Objects are assumed to be JSON-wrapped strings. + """ + + def __init__(self, **kwargs): + super().__init__() + self.s3 = SandcrawlerMinioClient( + host_url=kwargs.get('s3_url', 'localhost:9000'), + access_key=kwargs['s3_access_key'], + secret_key=kwargs['s3_secret_key'], + default_bucket=kwargs['s3_bucket'], + ) + self.s3_extension = kwargs.get('s3_extension', ".unknown") + self.s3_folder = kwargs.get('s3_folder', "unknown") + self.doc_key = "unknown" + + def process(self, record: dict, key: Optional[AnyStr] = None) -> None: + + if record.get('status') != 'success' or not record.get(self.doc_key): + return + + assert key is not None + if isinstance(key, bytes): + key_str = key.decode('utf-8') + elif isinstance(key, str): + key_str = key + assert len(key_str) == 40 + if 'sha1hex' in record: + assert key_str == record['sha1hex'] + + resp = self.s3.put_blob( + folder=self.s3_folder, + blob=record[self.doc_key].encode('utf-8'), + sha1hex=key_str, + extension=self.s3_extension, + ) + self.counts['s3-put'] += 1 + + +class PersistXmlDocWorker(GenericPersistDocWorker): + """ + Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".jats.xml") + self.s3_folder = kwargs.get('s3_folder', "xml_doc") + self.doc_key = "jats_xml" + + +class PersistHtmlTeiXmlWorker(GenericPersistDocWorker): + """ + Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".tei.xml") + self.s3_folder = kwargs.get('s3_folder', "html_body") + self.doc_key = "tei_xml" |