author     Bryan Newbold <bnewbold@archive.org>  2020-11-06 18:32:35 -0800
committer  Bryan Newbold <bnewbold@archive.org>  2020-11-06 18:32:35 -0800
commit     175019c96fced3e21d0f60ea1a4a37da6b8872ac (patch)
tree       f42fbbe9c8ac06ae9eb06373ab9eec96d2b3a177  /python/sandcrawler/persist.py
parent     b0b66c20c6ffb9d8acc626068964d7dfd5d3bcdc (diff)
parent     47ca1a273912c8836630b0930b71a4e66fd2c85b (diff)
Merge branch 'bnewbold-html-ingest'
Diffstat (limited to 'python/sandcrawler/persist.py')
-rw-r--r--  python/sandcrawler/persist.py  105
1 file changed, 98 insertions(+), 7 deletions(-)
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index fbc5273..f13b1f3 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -20,7 +20,7 @@ grobid
"""
import os
-from typing import Optional
+from typing import Optional, AnyStr
import xml.etree.ElementTree
from sandcrawler.workers import SandcrawlerWorker
@@ -28,6 +28,7 @@ from sandcrawler.db import SandcrawlerPostgresClient
from sandcrawler.minio import SandcrawlerMinioClient
from sandcrawler.grobid import GrobidClient
from sandcrawler.pdfextract import PdfExtractResult
+from sandcrawler.html_ingest import HtmlMetaRow
class PersistCdxWorker(SandcrawlerWorker):
@@ -95,8 +96,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if not k in raw:
self.counts['skip-request-fields'] += 1
return None
- if raw['ingest_type'] not in ('pdf', 'xml'):
- print(raw['ingest_type'])
+ if raw['ingest_type'] not in ('pdf', 'xml', 'html'):
self.counts['skip-ingest-type'] += 1
return None
request = {
@@ -121,7 +121,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
return request
- def file_result_to_row(self, raw):
+ def file_result_to_row(self, raw: dict) -> Optional[dict]:
"""
Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema
@@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
ingest_type = raw['request'].get('ingest_type')
if ingest_type == 'file':
ingest_type = 'pdf'
- if ingest_type not in ('pdf', 'xml'):
+ if ingest_type not in ('pdf', 'xml', 'html'):
self.counts['skip-ingest-type'] += 1
return None
if raw['status'] in ("existing", ):
@@ -159,6 +159,22 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
return result
+ def result_to_html_meta(self, record: dict) -> Optional[HtmlMetaRow]:
+ html_body = record.get('html_body')
+ file_meta = record.get('file_meta')
+ if not (file_meta and html_body):
+ return None
+ return HtmlMetaRow(
+ sha1hex=file_meta["sha1hex"],
+ status=record.get('status'),
+ scope=record.get('scope'),
+ has_teixml=bool(html_body and html_body['status'] == 'success' and html_body.get('tei_xml')),
+ has_thumbnail=False, # TODO
+ word_count=(html_body and html_body.get('word_count')) or None,
+ biblio=record.get('html_biblio'),
+ resources=record.get('html_resources'),
+ )
+
def push_batch(self, batch):
self.counts['total'] += len(batch)
@@ -197,6 +213,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.counts['insert-file_meta'] += resp[0]
self.counts['update-file_meta'] += resp[1]
+ html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')]
+ if html_meta_batch:
+ resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="nothing")
+ self.counts['insert-html_meta'] += resp[0]
+ self.counts['update-html_meta'] += resp[1]
+
self.db.commit()
return []
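
As a sketch of the new mapping: a hit record carrying an html_body section becomes an HtmlMetaRow with the fields shown in the constructor call above. The record values below are invented for illustration; only the field names come from the diff.

    from sandcrawler.html_ingest import HtmlMetaRow

    # Hypothetical ingest-result record, limited to the fields that
    # result_to_html_meta() actually reads:
    record = {
        "hit": True,
        "status": "success",
        "scope": "article-fulltext",
        "file_meta": {"sha1hex": "0123456789abcdef0123456789abcdef01234567"},
        "html_body": {"status": "success", "tei_xml": "<TEI>...</TEI>", "word_count": 1234},
        "html_biblio": {"title": "An Example Article"},
        "html_resources": [],
    }

    # Given that record, result_to_html_meta(record) returns roughly:
    row = HtmlMetaRow(
        sha1hex="0123456789abcdef0123456789abcdef01234567",
        status="success",
        scope="article-fulltext",
        has_teixml=True,        # html_body succeeded and carried tei_xml
        has_thumbnail=False,    # hard-coded TODO in the diff
        word_count=1234,
        biblio={"title": "An Example Article"},
        resources=[],
    )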
@@ -452,9 +474,11 @@ class PersistPdfTextWorker(SandcrawlerWorker):
class PersistThumbnailWorker(SandcrawlerWorker):
"""
- Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table.
+    Pushes thumbnail image to blob store (S3/seaweed/minio).
- This worker *must* be used with raw kakfa mode.
+    This worker *must* be used with raw kafka mode; thumbnails are *not*
+    wrapped in JSON like most sandcrawler kafka messages.
"""
def __init__(self, **kwargs):
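
"Raw kafka mode" here means the message value arrives verbatim rather than as a JSON document. A sketch of the two message shapes (byte values invented for illustration):

    import json

    # Raw mode: the Kafka message value is the image bytes themselves, and
    # the key is the source PDF's sha1hex; there is no JSON envelope.
    raw_key = b"0123456789abcdef0123456789abcdef01234567"
    raw_value = b"\x89PNG\r\n\x1a\n..."  # truncated PNG bytes, illustration only

    # By contrast, most sandcrawler topics carry JSON-encoded dicts:
    json_value = json.dumps(
        {"sha1hex": raw_key.decode("utf-8"), "status": "success"}
    ).encode("utf-8")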
@@ -487,3 +511,70 @@ class PersistThumbnailWorker(SandcrawlerWorker):
)
self.counts['s3-put'] += 1
+
+class GenericPersistDocWorker(SandcrawlerWorker):
+ """
+ Pushes blobs from Kafka to S3.
+
+ Objects are assumed to be JSON-wrapped strings.
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_extension = kwargs.get('s3_extension', ".unknown")
+ self.s3_folder = kwargs.get('s3_folder', "unknown")
+ self.doc_key = "unknown"
+
+ def process(self, record: dict, key: Optional[AnyStr] = None) -> None:
+
+ if record.get('status') != 'success' or not record.get(self.doc_key):
+ return
+
+ assert key is not None
+ if isinstance(key, bytes):
+ key_str = key.decode('utf-8')
+ elif isinstance(key, str):
+ key_str = key
+ assert len(key_str) == 40
+ if 'sha1hex' in record:
+ assert key_str == record['sha1hex']
+
+ resp = self.s3.put_blob(
+ folder=self.s3_folder,
+ blob=record[self.doc_key].encode('utf-8'),
+ sha1hex=key_str,
+ extension=self.s3_extension,
+ )
+ self.counts['s3-put'] += 1
+
+
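process() normalizes the Kafka key (bytes or str) to a 40-character sha1hex string, cross-checks it against the record, then writes record[self.doc_key] to the blob store. A minimal driver sketch; endpoint and credentials are placeholders, and actually running it requires a reachable S3/minio instance:

    worker = GenericPersistDocWorker(
        s3_url="localhost:9000",        # placeholder; matches the default above
        s3_access_key="minioadmin",     # placeholder credentials
        s3_secret_key="minioadmin",
        s3_bucket="sandcrawler-dev",    # placeholder bucket name
    )
    worker.doc_key = "tei_xml"          # normally set by a subclass, see below

    record = {
        "status": "success",
        "sha1hex": "0123456789abcdef0123456789abcdef01234567",
        "tei_xml": "<TEI>...</TEI>",
    }
    # A bytes key (as a raw Kafka consumer would deliver it) is decoded and
    # checked against record["sha1hex"] before the S3 put:
    worker.process(record, key=b"0123456789abcdef0123456789abcdef01234567")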
+class PersistXmlDocWorker(GenericPersistDocWorker):
+ """
+ Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
+ sandcrawler database (SQL).
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self.s3_extension = kwargs.get('s3_extension', ".jats.xml")
+ self.s3_folder = kwargs.get('s3_folder', "xml_doc")
+ self.doc_key = "jats_xml"
+
+
+class PersistHtmlTeiXmlWorker(GenericPersistDocWorker):
+ """
+ Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
+ sandcrawler database (SQL).
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self.s3_extension = kwargs.get('s3_extension', ".tei.xml")
+ self.s3_folder = kwargs.get('s3_folder', "html_body")
+ self.doc_key = "tei_xml"
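
The two subclasses differ only in these defaults; usage is otherwise identical. A sketch, with placeholder credentials and bucket:

    xml_worker = PersistXmlDocWorker(
        s3_access_key="minioadmin",
        s3_secret_key="minioadmin",
        s3_bucket="sandcrawler-dev",
    )
    # stores record["jats_xml"] under the "xml_doc" folder with a ".jats.xml" extension

    html_worker = PersistHtmlTeiXmlWorker(
        s3_access_key="minioadmin",
        s3_secret_key="minioadmin",
        s3_bucket="sandcrawler-dev",
    )
    # stores record["tei_xml"] under the "html_body" folder with a ".tei.xml" extension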