aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/ingest.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/ingest.py')
-rw-r--r--python/sandcrawler/ingest.py36
1 files changed, 35 insertions, 1 deletions
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index f4e78e4..d63baff 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -9,6 +9,7 @@ from collections import namedtuple
from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult
from sandcrawler.grobid import GrobidClient
+from sandcrawler.pdfextract import process_pdf, PdfExtractResult
from sandcrawler.misc import gen_file_metadata, clean_url
from sandcrawler.html import extract_fulltext_url
from sandcrawler.workers import SandcrawlerWorker
@@ -59,9 +60,12 @@ class IngestFileWorker(SandcrawlerWorker):
if not self.pgrest_client:
self.pgrest_client = SandcrawlerPostgrestClient()
self.grobid_sink = kwargs.get('grobid_sink')
+ self.thumbnail_sink = kwargs.get('thumbnail_sink')
+ self.pdftext_sink = kwargs.get('pdftext_sink')
self.try_existing_ingest = kwargs.get('try_existing_ingest', False)
self.try_existing_grobid = kwargs.get('try_existing_grobid', True)
+ self.try_existing_pdfextract = kwargs.get('try_existing_pdfextract', True)
self.try_wayback = kwargs.get('try_wayback', True)
self.try_spn2 = kwargs.get('try_spn2', True)
@@ -196,6 +200,7 @@ class IngestFileWorker(SandcrawlerWorker):
"""
return {
'grobid': self.process_grobid(resource, file_meta),
+ 'pdf_meta': self.process_pdfextract(resource, file_meta),
}
def process_grobid(self, resource, file_meta):
@@ -229,6 +234,34 @@ class IngestFileWorker(SandcrawlerWorker):
result.pop('key', None)
return result
+ def process_pdfextract(self, resource, file_meta):
+ """
+ Extracts thumbnail and pdf_meta info from PDF.
+
+ By default checks sandcrawler-db for an existing row first, then decide
+ if we should re-process.
+
+ TODO: difference between Kafka schema and SQL/postgrest schema
+ """
+ if self.try_existing_pdfextract:
+ existing = self.pgrest_client.get_pdf_meta(file_meta['sha1hex'])
+ if existing:
+ print("found existing pdf_meta result", file=sys.stderr)
+ result = PdfExtractResult.from_pdf_meta_dict(existing)
+ return result.to_pdftext_dict()
+
+ # Need to actually processes
+ result = process_pdf(resource.body)
+ assert result.file_meta['sha1hex'] == file_meta['sha1hex']
+ if self.thumbnail_sink and result.page0_thumbnail is not None:
+ self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
+ if self.pdftext_sink:
+ self.pdftext_sink.push_record(result.to_pdftext_dict())
+ result.page0_thumbnail = None
+ result.text = None
+ result.file_meta = None
+ return result.to_pdftext_dict()
+
def timeout_response(self, task):
print("[TIMEOUT]", file=sys.stderr)
return dict(
@@ -421,9 +454,10 @@ class IngestFileWorker(SandcrawlerWorker):
result['status'] = "success"
result['hit'] = True
- print("[SUCCESS\t] sha1:{} grobid:{}".format(
+ print("[SUCCESS\t] sha1:{} grobid:{} pdfextract:{}".format(
result.get('file_meta', {}).get('sha1hex'),
result.get('grobid', {}).get('status_code'),
+ result.get('pdfextract', {}).get('status'),
),
file=sys.stderr)
return result