author    Bryan Newbold <bnewbold@archive.org>  2020-11-03 19:28:21 -0800
committer Bryan Newbold <bnewbold@archive.org>  2020-11-03 19:28:21 -0800
commit    7dc382519187e32856f494af76248fa93aef7770 (patch)
tree      3cff23b1ebf689239993731b7d0546f94bac8414 /python
parent    2885b34ab3e4c862f9e895a237108d42793efb1d (diff)
persist: XML and HTML persist workers
Diffstat (limited to 'python')
 python/sandcrawler/persist.py  (-rw-r--r--) | 77
 python/sandcrawler_worker.py   (-rwxr-xr-x) | 47
 2 files changed, 121 insertions(+), 3 deletions(-)
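
Both new persist workers consume JSON records from Kafka and push one document
body per record to the blob store, keyed by SHA-1. A minimal sketch of the
record shape implied by GenericPersistDocWorker.process() in the diff below;
the field values here are hypothetical:

    # Hypothetical xml-doc record, inferred from the process() logic below;
    # the Kafka message key is expected to be the 40-character sha1hex of
    # the document.
    record = {
        "status": "success",
        "sha1hex": "f572d396fae9206628714fb2ce00f72e94f2258f",  # example digest
        "jats_xml": "<article>...</article>",  # document body as a JSON-wrapped string
    }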
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index aa05195..81cf664 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -95,7 +95,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             if not k in raw:
                 self.counts['skip-request-fields'] += 1
                 return None
-        if raw['ingest_type'] not in ('pdf', 'xml'):
+        if raw['ingest_type'] not in ('pdf', 'xml', 'html'):
             print(raw['ingest_type'])
             self.counts['skip-ingest-type'] += 1
             return None
@@ -121,7 +121,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
return request
- def file_result_to_row(self, raw):
+ def file_result_to_row(self, raw: dict) -> Optional[dict]:
"""
Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema
@@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
         ingest_type = raw['request'].get('ingest_type')
         if ingest_type == 'file':
             ingest_type = 'pdf'
-        if ingest_type not in ('pdf', 'xml'):
+        if ingest_type not in ('pdf', 'xml', 'html'):
             self.counts['skip-ingest-type'] += 1
             return None
         if raw['status'] in ("existing", ):
@@ -159,6 +159,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
         return result
 
+    def result_to_html_meta(self, record: dict) -> Optional[dict]:
+        raise NotImplementedError()
+
     def push_batch(self, batch):
         self.counts['total'] += len(batch)
@@ -197,6 +200,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             self.counts['insert-file_meta'] += resp[0]
             self.counts['update-file_meta'] += resp[1]
 
+        html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_meta')]
+        if html_meta_batch:
+            resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="nothing")
+            self.counts['insert-html_meta'] += resp[0]
+            self.counts['update-html_meta'] += resp[1]
+
         self.db.commit()
         return []
@@ -489,3 +498,65 @@ class PersistThumbnailWorker(SandcrawlerWorker):
         )
         self.counts['s3-put'] += 1
+
+
+class GenericPersistDocWorker(SandcrawlerWorker):
+    """
+    Pushes blobs from Kafka to S3.
+
+    Objects are assumed to be JSON-wrapped strings.
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__()
+        self.s3 = SandcrawlerMinioClient(
+            host_url=kwargs.get('s3_url', 'localhost:9000'),
+            access_key=kwargs['s3_access_key'],
+            secret_key=kwargs['s3_secret_key'],
+            default_bucket=kwargs['s3_bucket'],
+        )
+        self.s3_extension = kwargs.get('s3_extension', ".unknown")
+        self.s3_folder = kwargs.get('s3_folder', "unknown")
+        self.doc_key = "unknown"
+
+    def process(self, record: dict, key: Optional[str] = None) -> None:
+
+        if record.get('status') != 'success' or not record.get(self.doc_key):
+            return
+
+        assert key is not None and len(key) == 40 and isinstance(key, str)
+        if 'sha1hex' in record:
+            assert key == record['sha1hex']
+
+        resp = self.s3.put_blob(
+            folder=self.s3_folder,
+            blob=record[self.doc_key].encode('utf-8'),
+            sha1hex=key,
+            extension=self.s3_extension,
+        )
+        self.counts['s3-put'] += 1
+
+
+class PersistXmlDocWorker(GenericPersistDocWorker):
+    """
+    Pushes JATS XML documents to blob store (S3/seaweed/minio). Does not talk
+    to sandcrawler database (SQL).
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.s3_extension = kwargs.get('s3_extension', ".jats.xml")
+        self.s3_folder = kwargs.get('s3_folder', "xml_doc")
+        self.doc_key = "jats_xml"
+
+
+class PersistHtmlTeiXmlWorker(GenericPersistDocWorker):
+    """
+    Pushes TEI-XML (extracted from HTML body) to blob store (S3/seaweed/minio).
+    Does not talk to sandcrawler database (SQL).
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.s3_extension = kwargs.get('s3_extension', ".tei.xml")
+        self.s3_folder = kwargs.get('s3_folder', "html_body")
+        self.doc_key = "tei_xml"
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
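
The two subclasses above only override the S3 folder, file extension, and the
record key holding the document body; everything else (status check, sha1hex
key check, S3 put) lives in GenericPersistDocWorker.process(). A minimal
direct-usage sketch, with hypothetical credentials and bucket name (in
production these workers are driven by KafkaJsonPusher, as wired up in
sandcrawler_worker.py below):

    # Hypothetical direct invocation of the new worker; s3_url, credentials,
    # and bucket are placeholder values, not the deployed configuration.
    worker = PersistXmlDocWorker(
        s3_url="localhost:9000",
        s3_access_key="minioadmin",
        s3_secret_key="minioadmin",
        s3_bucket="sandcrawler-dev",
    )
    # key must be the 40-character sha1hex of the document body
    worker.process(record, key=record["sha1hex"])
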
index b62fa80..24dbdd0 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -13,6 +13,7 @@ import datetime
 import raven
 
 from sandcrawler import *
+from sandcrawler.persist import PersistXmlDocWorker, PersistHtmlTeiXmlWorker
 
 # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
 try:
@@ -148,6 +149,44 @@ def run_persist_thumbnail(args):
     )
     pusher.run()
 
+def run_persist_xml_doc(args: argparse.Namespace) -> None:
+    consume_topic = f"sandcrawler-{args.env}.xml-doc"
+    worker = PersistXmlDocWorker(
+        s3_url=args.s3_url,
+        s3_bucket=args.s3_bucket,
+        s3_access_key=args.s3_access_key,
+        s3_secret_key=args.s3_secret_key,
+    )
+    pusher = KafkaJsonPusher(
+        worker=worker,
+        kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic,
+        group="persist-xml-doc",
+        push_batches=False,
+        raw_records=True,
+        batch_size=25,
+    )
+    pusher.run()
+
+def run_persist_html_teixml(args: argparse.Namespace) -> None:
+    consume_topic = f"sandcrawler-{args.env}.html-teixml"
+    worker = PersistHtmlTeiXmlWorker(
+        s3_url=args.s3_url,
+        s3_bucket=args.s3_bucket,
+        s3_access_key=args.s3_access_key,
+        s3_secret_key=args.s3_secret_key,
+    )
+    pusher = KafkaJsonPusher(
+        worker=worker,
+        kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic,
+        group="persist-html-teixml",
+        push_batches=False,
+        raw_records=True,
+        batch_size=25,
+    )
+    pusher.run()
+
 def run_persist_pdftrio(args):
     consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env)
     worker = PersistPdfTrioWorker(
@@ -293,6 +332,14 @@ def main():
         help="daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres")
     sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail)
 
+    sub_persist_xml_doc = subparsers.add_parser('persist-xml-doc',
+        help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket")
+    sub_persist_xml_doc.set_defaults(func=run_persist_xml_doc)
+
+    sub_persist_html_teixml = subparsers.add_parser('persist-html-teixml',
+        help="daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket")
+    sub_persist_html_teixml.set_defaults(func=run_persist_html_teixml)
+
     sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio',
         help="daemon that consumes pdftrio output from Kafka and pushes to postgres")
     sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio)
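
With the subcommands registered, each new daemon would be launched roughly as
follows; the exact flag spellings (--env, --kafka-hosts, --s3-*) are assumed
from the args attributes referenced above, since the argument parser
definitions are outside this diff:

    # Hypothetical invocations of the new persist daemons
    ./sandcrawler_worker.py --env prod persist-xml-doc
    ./sandcrawler_worker.py --env prod persist-html-teixml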