about summary refs log tree commit diff stats
path: root/python/sandcrawler/persist.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-11-03 19:28:21 -0800
committerBryan Newbold <bnewbold@archive.org>2020-11-03 19:28:21 -0800
commit7dc382519187e32856f494af76248fa93aef7770 (patch)
tree3cff23b1ebf689239993731b7d0546f94bac8414 /python/sandcrawler/persist.py
parent2885b34ab3e4c862f9e895a237108d42793efb1d (diff)
downloadsandcrawler-7dc382519187e32856f494af76248fa93aef7770.tar.gz
sandcrawler-7dc382519187e32856f494af76248fa93aef7770.zip
persist: XML and HTML persist workers
Diffstat (limited to 'python/sandcrawler/persist.py')
-rw-r--r--python/sandcrawler/persist.py77
1 file changed, 74 insertions(+), 3 deletions(-)
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index aa05195..81cf664 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -95,7 +95,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if not k in raw:
self.counts['skip-request-fields'] += 1
return None
- if raw['ingest_type'] not in ('pdf', 'xml'):
+ if raw['ingest_type'] not in ('pdf', 'xml', 'html'):
print(raw['ingest_type'])
self.counts['skip-ingest-type'] += 1
return None
@@ -121,7 +121,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
return request
- def file_result_to_row(self, raw):
+ def file_result_to_row(self, raw: dict) -> Optional[dict]:
"""
Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema
@@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
ingest_type = raw['request'].get('ingest_type')
if ingest_type == 'file':
ingest_type = 'pdf'
- if ingest_type not in ('pdf', 'xml'):
+ if ingest_type not in ('pdf', 'xml', 'html'):
self.counts['skip-ingest-type'] += 1
return None
if raw['status'] in ("existing", ):
@@ -159,6 +159,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
return result
+ def result_to_html_meta(self, record: dict) -> Optional[dict]:
+ raise NotImplementedError()
+
def push_batch(self, batch):
self.counts['total'] += len(batch)
@@ -197,6 +200,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.counts['insert-file_meta'] += resp[0]
self.counts['update-file_meta'] += resp[1]
+ html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_meta')]
+ if html_meta_batch:
+ resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="nothing")
+ self.counts['insert-html_meta'] += resp[0]
+ self.counts['update-html_meta'] += resp[1]
+
self.db.commit()
return []
@@ -489,3 +498,65 @@ class PersistThumbnailWorker(SandcrawlerWorker):
)
self.counts['s3-put'] += 1
+
+class GenericPersistDocWorker(SandcrawlerWorker):
+ """
+ Pushes blobs from Kafka to S3.
+
+ Objects are assumed to be JSON-wrapped strings.
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_extension = kwargs.get('s3_extension', ".unknown")
+ self.s3_folder = kwargs.get('s3_folder', "unknown")
+ self.doc_key = "unknown"
+
+ def process(self, record: dict, key: Optional[str] = None) -> None:
+
+ if record.get('status') != 'success' or not record.get(self.doc_key):
+ return
+
+ assert key is not None and len(key) == 40 and isinstance(key, str)
+ if 'sha1hex' in record:
+ assert key == record['sha1hex']
+
+ resp = self.s3.put_blob(
+ folder=self.s3_folder,
+ blob=record[self.doc_key].encode('utf-8'),
+ sha1hex=key,
+ extension=self.s3_extension,
+ )
+ self.counts['s3-put'] += 1
+
+
+class PersistXmlDocWorker(GenericPersistDocWorker):
+ """
+ Pushes JATS XML file to blob store (S3/seaweed/minio). Does not talk to
+ sandcrawler database (SQL).
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self.s3_extension = kwargs.get('s3_extension', ".jats.xml")
+ self.s3_folder = kwargs.get('s3_folder', "xml_doc")
+ self.doc_key = "jats_xml"
+
+
+class PersistHtmlTeiXmlWorker(GenericPersistDocWorker):
+ """
+ Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
+ sandcrawler database (SQL).
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self.s3_extension = kwargs.get('s3_extension', ".tei.xml")
+ self.s3_folder = kwargs.get('s3_folder', "html_body")
+ self.doc_key = "tei_xml"