From b4a40d99b23a83eabeed490c0dce52dba31dc7b8 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 17 Jun 2020 21:23:08 -0700
Subject: fixes and tweaks from testing locally

---
 python/sandcrawler/db.py         | 47 ++++++++++++++++++++++++++++
 python/sandcrawler/minio.py      |  2 +-
 python/sandcrawler/pdfextract.py | 67 ++++++++++++++++++++++++++++++++++++++--
 python/sandcrawler/persist.py    | 29 ++++++++++-------
 python/sandcrawler/workers.py    |  4 ++--
 python/sandcrawler_worker.py     |  3 ++-
 6 files changed, 134 insertions(+), 18 deletions(-)

diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 03cc15f..793f1c4 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -34,6 +34,15 @@ class SandcrawlerPostgrestClient:
         else:
             return None
 
+    def get_pdf_meta(self, sha1):
+        resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.'+sha1))
+        resp.raise_for_status()
+        resp = resp.json()
+        if resp:
+            return resp[0]
+        else:
+            return None
+
     def get_file_meta(self, sha1):
         resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
         resp.raise_for_status()
@@ -185,6 +194,44 @@ class SandcrawlerPostgresClient:
         resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
+    def insert_pdf_meta(self, cur, batch, on_conflict="nothing"):
+        """
+        batch elements are expected to have .to_sql_tuple() method
+        """
+        sql = """
+            INSERT INTO
+            pdf_meta (sha1hex, updated, status, has_page0_thumbnail, page_count, word_count, page0_height, page0_width, permanent_id, pdf_created, pdf_version, metadata)
+            VALUES %s
+            ON CONFLICT (sha1hex) DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                updated=EXCLUDED.updated,
+                status=EXCLUDED.status,
+                has_page0_thumbnail=EXCLUDED.has_page0_thumbnail,
+                page_count=EXCLUDED.page_count,
+                word_count=EXCLUDED.word_count,
+                page0_height=EXCLUDED.page0_height,
+                page0_width=EXCLUDED.page0_width,
+                permanent_id=EXCLUDED.permanent_id,
+                pdf_created=EXCLUDED.pdf_created,
+                pdf_version=EXCLUDED.pdf_version,
+                metadata=EXCLUDED.metadata
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        batch = [d.to_sql_tuple() for d in batch]
+        # filter out duplicate rows by key (sha1hex)
+        batch_dict = dict()
+        for b in batch:
+            batch_dict[b[0]] = b
+        batch = list(batch_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
     def insert_pdftrio(self, cur, batch, on_conflict="nothing"):
         sql = """
             INSERT INTO
diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py
index 41dd29a..4126b4b 100644
--- a/python/sandcrawler/minio.py
+++ b/python/sandcrawler/minio.py
@@ -28,7 +28,7 @@ class SandcrawlerMinioClient(object):
         )
         self.default_bucket = default_bucket
 
-    def _blob_path(self, folder, sha1hex, extension, prefix):
+    def _blob_path(self, folder, sha1hex: str, extension: str, prefix):
         if not extension:
             extension = ""
         if not prefix:
diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py
index 4e55f3f..5ef5dfd 100644
--- a/python/sandcrawler/pdfextract.py
+++ b/python/sandcrawler/pdfextract.py
@@ -1,5 +1,6 @@
 
 import sys
+import json
 import datetime
 from io import BytesIO
 from dataclasses import dataclass
@@ -21,6 +22,7 @@ class PdfExtractResult:
     file_meta: Optional[Dict[str,Any]] = None
     text: Optional[str] = None
     page0_thumbnail: Optional[bytes] = None
+    has_page0_thumbnail: bool = False
     meta_xml: Optional[str] = None
     pdf_info: Optional[Dict[str,Any]] = None
     pdf_extra: Optional[Dict[str,Any]] = None
@@ -31,18 +33,75 @@
         Outputs a JSON string as would be published to Kafka text/info topic.
         """
         return {
+            'key': self.sha1hex,
             'sha1hex': self.sha1hex,
             'status': self.status,
             'file_meta': self.file_meta,
             'error_msg': self.error_msg,
             'text': self.text,
-            'page0_thumbnail': self.page0_thumbnail is not None,
+            'has_page0_thumbnail': self.has_page0_thumbnail,
             'meta_xml': self.meta_xml,
             'pdf_info': self.pdf_info,
             'pdf_extra': self.pdf_extra,
             'source': self.source,
         }
 
+    @classmethod
+    def from_pdftext_dict(cls, record):
+        """
+        Parses a dict (as consumed from the Kafka text/info topic) back into a PdfExtractResult.
+        """
+        if record['status'] != 'success':
+            return PdfExtractResult(
+                sha1hex=record['sha1hex'],
+                status=record['status'],
+                error_msg=record.get('error_msg'),
+            )
+        else:
+            return PdfExtractResult(
+                sha1hex=record['sha1hex'],
+                status=record['status'],
+                file_meta=record.get('file_meta'),
+                text=record.get('text'),
+                has_page0_thumbnail=bool(record.get('has_page0_thumbnail', False)),
+                meta_xml=record.get('meta_xml'),
+                pdf_info=record.get('pdf_info'),
+                pdf_extra=record.get('pdf_extra'),
+            )
+
+    def to_sql_tuple(self) -> tuple:
+        # pdf_meta (sha1hex, updated, status, has_page0_thumbnail, page_count,
+        #   word_count, page0_height, page0_width, permanent_id, pdf_created,
+        #   pdf_version, metadata)
+        word_count: Optional[int] = None
+        if self.text:
+            word_count = len(self.text.split())
+        metadata: Optional[Dict] = None
+        pdf_extra = self.pdf_extra or dict()
+        pdf_created = None
+        # TODO: form, encrypted
+        if self.pdf_info:
+            metadata = dict()
+            for k in ('Title', 'Subject', 'Author', 'Creator', 'Producer', 'doi'):
+                if k in self.pdf_info:
+                    metadata[k.lower()] = self.pdf_info[k]
+            if 'CreationDate' in self.pdf_info:
+                pdf_created = self.pdf_info['CreationDate']
+        return (
+            self.sha1hex,
+            datetime.datetime.now(),  # updated
+            self.status,
+            self.has_page0_thumbnail,
+            pdf_extra.get('page_count'),
+            word_count,
+            pdf_extra.get('page0_height'),
+            pdf_extra.get('page0_width'),
+            pdf_extra.get('permanent_id'),
+            pdf_created,
+            pdf_extra.get('pdf_version'),
+            metadata and json.dumps(metadata, sort_keys=True),
+        )
+
 
 def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtractResult:
     """
@@ -70,6 +129,7 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr
             sha1hex=sha1hex,
             status='empty-pdf',
             file_meta=file_meta,
+            has_page0_thumbnail=False,
         )
     page0 = pdf.create_page(0)
     if page0 is None:
@@ -131,6 +191,7 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr
         status='success',
         error_msg=None,
         text=full_text or None,
+        has_page0_thumbnail=page0_thumbnail is not None,
         page0_thumbnail=page0_thumbnail,
         meta_xml=pdf.metadata or None,
         pdf_info=pdf_info,
@@ -172,7 +233,7 @@ class PdfExtractWorker(SandcrawlerFetchWorker):
         result = process_pdf(blob)
         result.source = record
         if self.thumbnail_sink and result.page0_thumbnail is not None:
-            self.thumbnail_sink.push_record(result.page0_thumbnail)
+            self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
         return result.to_pdftext_dict()
 
 class PdfExtractBlobWorker(SandcrawlerWorker):
@@ -193,7 +254,7 @@ class PdfExtractBlobWorker(SandcrawlerWorker):
 
         result = process_pdf(blob)
         if self.thumbnail_sink and result.page0_thumbnail is not None:
-            self.thumbnail_sink.push_record(result.page0_thumbnail)
+            self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
 
         return result
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 8d421ad..6d9298e 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -20,12 +20,14 @@ grobid
 """
 
 import os
+from typing import Optional
 import xml.etree.ElementTree
 
 from sandcrawler.workers import SandcrawlerWorker
 from sandcrawler.db import SandcrawlerPostgresClient
 from sandcrawler.minio import SandcrawlerMinioClient
 from sandcrawler.grobid import GrobidClient
+from sandcrawler.pdfextract import PdfExtractResult
 
 
 class PersistCdxWorker(SandcrawlerWorker):
@@ -404,29 +406,33 @@ class PersistPdfTextWorker(SandcrawlerWorker):
     def push_batch(self, batch):
         self.counts['total'] += len(batch)
 
+        parsed_batch = []
         for r in batch:
-            if r['status'] != 'success' or not r.get('text'):
+            parsed_batch.append(PdfExtractResult.from_pdftext_dict(r))
+
+        for r in parsed_batch:
+            if r.status != 'success' or not r.text:
                 self.counts['s3-skip-status'] += 1
-                if r.get('error_msg'):
-                    r['metadata'] = {'error_msg': r['error_msg'][:500]}
+                if r.error_msg:
+                    r.metadata = {'error_msg': r.error_msg[:500]}
                 continue
 
-            assert len(r['sha1hex']) == 40
+            assert len(r.sha1hex) == 40
             if not self.db_only:
                 resp = self.s3.put_blob(
                     folder="text",
-                    blob=r['text'],
-                    sha1hex=r['sha1hex'],
+                    blob=r.text,
+                    sha1hex=r.sha1hex,
                     extension=".txt",
                 )
                 self.counts['s3-put'] += 1
 
         if not self.s3_only:
-            resp = self.db.insert_pdf_meta(self.cur, batch, on_conflict="update")
+            resp = self.db.insert_pdf_meta(self.cur, parsed_batch, on_conflict="update")
             self.counts['insert-pdf-meta'] += resp[0]
             self.counts['update-pdf-meta'] += resp[1]
 
-            file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+            file_meta_batch = [r.file_meta for r in parsed_batch if r.file_meta]
             resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
             self.counts['insert-file-meta'] += resp[0]
             self.counts['update-file-meta'] += resp[1]
@@ -454,12 +460,14 @@ class PersistThumbnailWorker(SandcrawlerWorker):
         self.s3_extension = kwargs.get('s3_extension', ".jpg")
         self.s3_folder = kwargs.get('s3_folder', "pdf")
 
-    def process(self, blob, key=None):
+    def process(self, blob: bytes, key: Optional[str] = None):
         """
         Processing raw messages, not decoded JSON objects
         """
-        assert key is not None and len(key) == 40
+        if isinstance(key, bytes):
+            key = key.decode('utf-8')
+        assert key is not None and len(key) == 40 and isinstance(key, str)
         assert isinstance(blob, bytes)
         assert len(blob) >= 50
 
@@ -470,5 +478,4 @@ class PersistThumbnailWorker(SandcrawlerWorker):
             extension=self.s3_extension,
         )
         self.counts['s3-put'] += 1
-        return True
 
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 8115ee3..4a1d7a4 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -233,7 +233,7 @@ class BlackholeSink(SandcrawlerWorker):
     Useful for tests.
     """
 
-    def push_record(self, task):
+    def push_record(self, task, key=None):
         return
 
     def push_batch(self, tasks):
@@ -528,7 +528,7 @@ class KafkaJsonPusher(RecordPusher):
                 # without decoding as JSON. Eg, for thumbnails (where
                 # message bytes are JPEG, and we need the sha1hex key
                 # from the message)
-                record = msg
+                record = msg.value()
             else:
                 record = json.loads(msg.value().decode('utf-8'))
             # This complex bit of code implements backoff/backpressure
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index e18d883..d85a995 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -135,7 +135,8 @@ def run_persist_thumbnail(args):
         kafka_hosts=args.kafka_hosts,
         consume_topic=consume_topic,
         group="persist-pdf-thumbnail",
-        raw_record=True,
+        push_batches=False,
+        raw_records=True,
        batch_size=25,
     )
     pusher.run()
--
cgit v1.2.3
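
A note on the upsert bookkeeping in insert_pdf_meta() above: in PostgreSQL,
a row returned by INSERT ... ON CONFLICT ... RETURNING xmax carries xmax=0
when it was freshly inserted, and a non-zero xmax when the DO UPDATE branch
rewrote an existing row; the pre-existing _inserts_and_updates() helper
evidently keys off that to split the insert vs. update counters.

A minimal read-path smoke test for the new /pdf_meta PostgREST endpoint
could look like the sketch below. This is illustrative only: the api_url
and the sha1 are placeholders, and it assumes the client constructor
accepts an api_url argument (consistent with the self.api_url usage in
get_pdf_meta() above).

    from sandcrawler.db import SandcrawlerPostgrestClient

    # placeholder URL; point at whatever PostgREST instance fronts the
    # sandcrawler database in your environment
    client = SandcrawlerPostgrestClient(api_url="http://localhost:3030")

    # placeholder sha1hex (40 hex chars); use the hash of a PDF that has
    # already flowed through the pdf-extract and persist workers
    row = client.get_pdf_meta("0" * 40)
    if row is None:
        print("no pdf_meta row for that sha1")
    else:
        # one row per sha1hex, as upserted by PersistPdfTextWorker
        print(row["status"], row.get("page_count"), row.get("word_count"))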