From 7dc382519187e32856f494af76248fa93aef7770 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 3 Nov 2020 19:28:21 -0800 Subject: persist: XML and HTML persist workers --- python/sandcrawler/persist.py | 77 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 74 insertions(+), 3 deletions(-) (limited to 'python/sandcrawler/persist.py') diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index aa05195..81cf664 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -95,7 +95,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if not k in raw: self.counts['skip-request-fields'] += 1 return None - if raw['ingest_type'] not in ('pdf', 'xml'): + if raw['ingest_type'] not in ('pdf', 'xml', 'html'): print(raw['ingest_type']) self.counts['skip-ingest-type'] += 1 return None @@ -121,7 +121,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): return request - def file_result_to_row(self, raw): + def file_result_to_row(self, raw: dict) -> Optional[dict]: """ Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema @@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): ingest_type = raw['request'].get('ingest_type') if ingest_type == 'file': ingest_type = 'pdf' - if ingest_type not in ('pdf', 'xml'): + if ingest_type not in ('pdf', 'xml', 'html'): self.counts['skip-ingest-type'] += 1 return None if raw['status'] in ("existing", ): @@ -159,6 +159,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): result['terminal_sha1hex'] = terminal.get('terminal_sha1hex') return result + def result_to_html_meta(self, record: dict) -> Optional[dict]: + raise NotImplementedError() + def push_batch(self, batch): self.counts['total'] += len(batch) @@ -197,6 +200,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): self.counts['insert-file_meta'] += resp[0] self.counts['update-file_meta'] += resp[1] + html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_meta')] + if html_meta_batch: + resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="nothing") + self.counts['insert-html_meta'] += resp[0] + self.counts['update-html_meta'] += resp[1] + self.db.commit() return [] @@ -489,3 +498,65 @@ class PersistThumbnailWorker(SandcrawlerWorker): ) self.counts['s3-put'] += 1 + +class GenericPersistDocWorker(SandcrawlerWorker): + """ + Pushes blobs from Kafka to S3. + + Objects are assumed to be JSON-wrapped strings. + """ + + def __init__(self, **kwargs): + super().__init__() + self.s3 = SandcrawlerMinioClient( + host_url=kwargs.get('s3_url', 'localhost:9000'), + access_key=kwargs['s3_access_key'], + secret_key=kwargs['s3_secret_key'], + default_bucket=kwargs['s3_bucket'], + ) + self.s3_extension = kwargs.get('s3_extension', ".unknown") + self.s3_folder = kwargs.get('s3_folder', "unknown") + self.doc_key = "unknown" + + def process(self, record: dict, key: Optional[str] = None) -> None: + + if record.get('status') != 'success' or not record.get(self.doc_key): + return + + assert key is not None and len(key) == 40 and isinstance(key, str) + if 'sha1hex' in record: + assert key == record['sha1hex'] + + resp = self.s3.put_blob( + folder=self.s3_folder, + blob=record[self.doc_key].encode('utf-8'), + sha1hex=key, + extension=self.s3_extension, + ) + self.counts['s3-put'] += 1 + + +class PersistXmlDocWorker(GenericPersistDocWorker): + """ + Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".jats.xml") + self.s3_folder = kwargs.get('s3_folder', "xml_doc") + self.doc_key = "jats_xml" + + +class PersistHtmlTeiXmlWorker(GenericPersistDocWorker): + """ + Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".tei.xml") + self.s3_folder = kwargs.get('s3_folder', "html_body") + self.doc_key = "tei_xml" -- cgit v1.2.3