-rw-r--r--  python/sandcrawler/db.py          | 47
-rw-r--r--  python/sandcrawler/minio.py       |  2
-rw-r--r--  python/sandcrawler/pdfextract.py  | 67
-rw-r--r--  python/sandcrawler/persist.py     | 29
-rw-r--r--  python/sandcrawler/workers.py     |  4
-rwxr-xr-x  python/sandcrawler_worker.py      |  3
6 files changed, 134 insertions(+), 18 deletions(-)
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 03cc15f..793f1c4 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -34,6 +34,15 @@ class SandcrawlerPostgrestClient:
else:
return None
+ def get_pdf_meta(self, sha1):
+ resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.'+sha1))
+ resp.raise_for_status()
+ resp = resp.json()
+ if resp:
+ return resp[0]
+ else:
+ return None
+
def get_file_meta(self, sha1):
resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
resp.raise_for_status()
@@ -185,6 +194,44 @@ class SandcrawlerPostgresClient:
resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
+ def insert_pdf_meta(self, cur, batch, on_conflict="nothing"):
+ """
+ batch elements are expected to have .to_sql_tuple() method
+ """
+ sql = """
+ INSERT INTO
+ pdf_meta (sha1hex, updated, status, has_page0_thumbnail, page_count, word_count, page0_height, page0_width, permanent_id, pdf_created, pdf_version, metadata)
+ VALUES %s
+ ON CONFLICT (sha1hex) DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ updated=EXCLUDED.updated,
+ status=EXCLUDED.status,
+ has_page0_thumbnail=EXCLUDED.has_page0_thumbnail,
+ page_count=EXCLUDED.page_count,
+ word_count=EXCLUDED.word_count,
+ page0_height=EXCLUDED.page0_height,
+ page0_width=EXCLUDED.page0_width,
+ permanent_id=EXCLUDED.permanent_id,
+ pdf_created=EXCLUDED.pdf_created,
+ pdf_version=EXCLUDED.pdf_version,
+ metadata=EXCLUDED.metadata
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ batch = [d.to_sql_tuple() for d in batch]
+ # filter out duplicate rows by key (sha1hex)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[b[0]] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
def insert_pdftrio(self, cur, batch, on_conflict="nothing"):
sql = """
INSERT INTO
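A minimal usage sketch of the two new pdf_meta helpers. The method names, the .to_sql_tuple() contract, and the (inserts, updates) return pair all come from this diff; the client constructor arguments and connection handling are assumptions:

    from sandcrawler.db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient

    # Read path (PostgREST): returns the first matching row, or None
    api = SandcrawlerPostgrestClient(api_url="http://localhost:3030")  # assumed kwarg
    row = api.get_pdf_meta("da39a3ee5e6b4b0d3255bfef95601890afd80709")

    # Write path (psycopg2): batch elements must expose .to_sql_tuple();
    # duplicate sha1hex keys are collapsed before execute_values, and
    # RETURNING xmax distinguishes inserts (xmax == 0) from updates.
    db = SandcrawlerPostgresClient(db_url="postgres:///sandcrawler")  # assumed kwarg
    with db.conn.cursor() as cur:  # assumed connection attribute
        results = []  # list of PdfExtractResult, e.g. from process_pdf()
        inserts, updates = db.insert_pdf_meta(cur, results, on_conflict="update")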
diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py
index 41dd29a..4126b4b 100644
--- a/python/sandcrawler/minio.py
+++ b/python/sandcrawler/minio.py
@@ -28,7 +28,7 @@ class SandcrawlerMinioClient(object):
)
self.default_bucket = default_bucket
- def _blob_path(self, folder, sha1hex, extension, prefix):
+ def _blob_path(self, folder, sha1hex: str, extension: str, prefix):
if not extension:
extension = ""
if not prefix:
diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py
index 4e55f3f..5ef5dfd 100644
--- a/python/sandcrawler/pdfextract.py
+++ b/python/sandcrawler/pdfextract.py
@@ -1,5 +1,6 @@
import sys
+import json
import datetime
from io import BytesIO
from dataclasses import dataclass
@@ -21,6 +22,7 @@ class PdfExtractResult:
file_meta: Optional[Dict[str,Any]] = None
text: Optional[str] = None
page0_thumbnail: Optional[bytes] = None
+ has_page0_thumbnail: bool = False
meta_xml: Optional[str] = None
pdf_info: Optional[Dict[str,Any]] = None
pdf_extra: Optional[Dict[str,Any]] = None
@@ -31,18 +33,75 @@ class PdfExtractResult:
Returns a dict, as would be serialized to JSON and published to the Kafka text/info topic.
"""
return {
+ 'key': self.sha1hex,
'sha1hex': self.sha1hex,
'status': self.status,
'file_meta': self.file_meta,
'error_msg': self.error_msg,
'text': self.text,
- 'page0_thumbnail': self.page0_thumbnail is not None,
+ 'has_page0_thumbnail': self.has_page0_thumbnail,
'meta_xml': self.meta_xml,
'pdf_info': self.pdf_info,
'pdf_extra': self.pdf_extra,
'source': self.source,
}
+ @classmethod
+ def from_pdftext_dict(cls, record):
+ """
+ Parses a dict, as consumed from the Kafka text/info topic, back into a PdfExtractResult.
+ """
+ if record['status'] != 'success':
+ return PdfExtractResult(
+ sha1hex=record['sha1hex'],
+ status=record['status'],
+ error_msg=record.get('error_msg'),
+ )
+ else:
+ return PdfExtractResult(
+ sha1hex=record['sha1hex'],
+ status=record['status'],
+ file_meta=record.get('file_meta'),
+ text=record.get('text'),
+ has_page0_thumbnail=bool(record.get('has_page0_thumbnail', False)),
+ meta_xml=record.get('meta_xml'),
+ pdf_info=record.get('pdf_info'),
+ pdf_extra=record.get('pdf_extra'),
+ )
+
+ def to_sql_tuple(self) -> tuple:
+ # pdf_meta (sha1hex, updated, status, has_page0_thumbnail, page_count,
+ # word_count, page0_height, page0_width, permanent_id, pdf_created,
+ # pdf_version, metadata)
+ word_count: Optional[int] = None
+ if self.text:
+ word_count = len(self.text.split())
+ metadata: Optional[Dict] = None
+ pdf_extra = self.pdf_extra or dict()
+ pdf_created = None
+ # TODO: form, encrypted
+ if self.pdf_info:
+ metadata = dict()
+ for k in ('Title', 'Subject', 'Author', 'Creator', 'Producer', 'doi'):
+ if k in self.pdf_info:
+ metadata[k.lower()] = self.pdf_info[k]
+ if 'CreationDate' in self.pdf_info:
+ pdf_created = self.pdf_info['CreationDate']
+ return (
+ self.sha1hex,
+ datetime.datetime.now(), # updated
+ self.status,
+ self.has_page0_thumbnail,
+ pdf_extra.get('page_count'),
+ word_count,
+ pdf_extra.get('page0_height'),
+ pdf_extra.get('page0_width'),
+ pdf_extra.get('permanent_id'),
+ pdf_created,
+ pdf_extra.get('pdf_version'),
+ metadata and json.dumps(metadata, sort_keys=True),
+ )
+
def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtractResult:
"""
@@ -70,6 +129,7 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr
sha1hex=sha1hex,
status='empty-pdf',
file_meta=file_meta,
+ has_page0_thumbnail=False,
)
page0 = pdf.create_page(0)
if page0 is None:
@@ -131,6 +191,7 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr
status='success',
error_msg=None,
text=full_text or None,
+ has_page0_thumbnail=page0_thumbnail is not None,
page0_thumbnail=page0_thumbnail,
meta_xml=pdf.metadata or None,
pdf_info=pdf_info,
@@ -172,7 +233,7 @@ class PdfExtractWorker(SandcrawlerFetchWorker):
result = process_pdf(blob)
result.source = record
if self.thumbnail_sink and result.page0_thumbnail is not None:
- self.thumbnail_sink.push_record(result.page0_thumbnail)
+ self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
return result.to_pdftext_dict()
class PdfExtractBlobWorker(SandcrawlerWorker):
@@ -193,7 +254,7 @@ class PdfExtractBlobWorker(SandcrawlerWorker):
result = process_pdf(blob)
if self.thumbnail_sink and result.page0_thumbnail is not None:
- self.thumbnail_sink.push_record(result.page0_thumbnail)
+ self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
return result
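Taken together, the pdfextract changes round-trip cleanly between the extract worker and the persist side. A sketch, assuming only that "paper.pdf" is a local test file; every name used here appears in the diff:

    from sandcrawler.pdfextract import process_pdf, PdfExtractResult

    with open("paper.pdf", "rb") as f:  # hypothetical input file
        result = process_pdf(f.read())

    # Kafka message: raw thumbnail bytes are replaced by a boolean flag, and
    # 'key' duplicates sha1hex so consumers can key on the message directly.
    msg = result.to_pdftext_dict()
    assert msg['has_page0_thumbnail'] == (result.page0_thumbnail is not None)

    # Persist side: re-hydrate the dataclass, then flatten to the
    # pdf_meta column order used by insert_pdf_meta() in db.py.
    parsed = PdfExtractResult.from_pdftext_dict(msg)
    row = parsed.to_sql_tuple()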
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 8d421ad..6d9298e 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -20,12 +20,14 @@ grobid
"""
import os
+from typing import Optional
import xml.etree.ElementTree
from sandcrawler.workers import SandcrawlerWorker
from sandcrawler.db import SandcrawlerPostgresClient
from sandcrawler.minio import SandcrawlerMinioClient
from sandcrawler.grobid import GrobidClient
+from sandcrawler.pdfextract import PdfExtractResult
class PersistCdxWorker(SandcrawlerWorker):
@@ -404,29 +406,33 @@ class PersistPdfTextWorker(SandcrawlerWorker):
def push_batch(self, batch):
self.counts['total'] += len(batch)
+ parsed_batch = []
for r in batch:
- if r['status'] != 'success' or not r.get('text'):
+ parsed_batch.append(PdfExtractResult.from_pdftext_dict(r))
+
+ for r in parsed_batch:
+ if r.status != 'success' or not r.text:
self.counts['s3-skip-status'] += 1
- if r.get('error_msg'):
- r['metadata'] = {'error_msg': r['error_msg'][:500]}
+ if r.error_msg:
+ r.metadata = {'error_msg': r.error_msg[:500]}
continue
- assert len(r['sha1hex']) == 40
+ assert len(r.sha1hex) == 40
if not self.db_only:
resp = self.s3.put_blob(
folder="text",
- blob=r['text'],
- sha1hex=r['sha1hex'],
+ blob=r.text,
+ sha1hex=r.sha1hex,
extension=".txt",
)
self.counts['s3-put'] += 1
if not self.s3_only:
- resp = self.db.insert_pdf_meta(self.cur, batch, on_conflict="update")
+ resp = self.db.insert_pdf_meta(self.cur, parsed_batch, on_conflict="update")
self.counts['insert-pdf-meta'] += resp[0]
self.counts['update-pdf-meta'] += resp[1]
- file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+ file_meta_batch = [r.file_meta for r in parsed_batch if r.file_meta]
resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
self.counts['insert-file-meta'] += resp[0]
self.counts['update-file-meta'] += resp[1]
@@ -454,12 +460,14 @@ class PersistThumbnailWorker(SandcrawlerWorker):
self.s3_extension = kwargs.get('s3_extension', ".jpg")
self.s3_folder = kwargs.get('s3_folder', "pdf")
- def process(self, blob, key=None):
+ def process(self, blob: bytes, key: Optional[str] = None):
"""
Processing raw messages, not decoded JSON objects
"""
- assert key is not None and len(key) == 40
+ if isinstance(key, bytes):
+ key = key.decode('utf-8')
+ assert key is not None and len(key) == 40 and isinstance(key, str)
assert isinstance(blob, bytes)
assert len(blob) >= 50
@@ -470,5 +478,4 @@ class PersistThumbnailWorker(SandcrawlerWorker):
extension=self.s3_extension,
)
self.counts['s3-put'] += 1
- return True
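The key handling added to PersistThumbnailWorker.process() is worth spelling out: Kafka delivers message keys as raw bytes, which previously failed the length/type assert. The normalization reduces to this runnable fragment:

    key = b"da39a3ee5e6b4b0d3255bfef95601890afd80709"  # key as delivered by Kafka
    if isinstance(key, bytes):
        key = key.decode('utf-8')
    assert key is not None and len(key) == 40 and isinstance(key, str)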
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 8115ee3..4a1d7a4 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -233,7 +233,7 @@ class BlackholeSink(SandcrawlerWorker):
Useful for tests.
"""
- def push_record(self, task):
+ def push_record(self, task, key=None):
return
def push_batch(self, tasks):
@@ -528,7 +528,7 @@ class KafkaJsonPusher(RecordPusher):
# without decoding as JSON. Eg, for thumbnails (where
# message bytes are JPEG, and we need the sha1hex key
# from the message)
- record = msg
+ record = msg.value()
else:
record = json.loads(msg.value().decode('utf-8'))
# This complex bit of code implements backoff/backpressure
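The small fix above matters because confluent_kafka's Message is a wrapper, not the payload: both the value and the key sit behind accessor methods. With raw records the sink now receives the actual JPEG bytes; a sketch, where msg stands for a consumed confluent_kafka Message:

    record = msg.value()  # payload bytes, e.g. a JPEG thumbnail
    key = msg.key()       # sha1hex as bytes; decoded downstream in the persist worker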
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index e18d883..d85a995 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -135,7 +135,8 @@ def run_persist_thumbnail(args):
kafka_hosts=args.kafka_hosts,
consume_topic=consume_topic,
group="persist-pdf-thumbnail",
- raw_record=True,
+ push_batches=False,
+ raw_records=True,
batch_size=25,
)
pusher.run()
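For reference, the corrected run_persist_thumbnail() wiring implies roughly the following. The keyword arguments are taken from the diff; the worker construction, the positional argument order, and the topic name are assumptions:

    from sandcrawler.persist import PersistThumbnailWorker
    from sandcrawler.workers import KafkaJsonPusher

    worker = PersistThumbnailWorker()  # real code presumably passes s3/db config
    pusher = KafkaJsonPusher(
        worker,                          # assumed first positional argument
        kafka_hosts="localhost:9092",
        consume_topic="pdf-thumbnails",  # hypothetical topic name
        group="persist-pdf-thumbnail",
        push_batches=False,  # presumably: deliver records one at a time
        raw_records=True,    # payloads are JPEG bytes, so skip JSON decoding
        batch_size=25,
    )
    pusher.run()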