From 6f382a4c07ccac68896f75d55835a8876981edbd Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 25 Jun 2020 18:18:19 -0700 Subject: pdfextract support in ingest worker --- python/sandcrawler/ingest.py | 36 +++++++++++++++++++++++++++++++++++- python/sandcrawler/pdfextract.py | 24 ++++++++++++++++++++++++ python/tests/test_ingest.py | 7 +++++++ 3 files changed, 66 insertions(+), 1 deletion(-) diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index f4e78e4..d63baff 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -9,6 +9,7 @@ from collections import namedtuple from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult from sandcrawler.grobid import GrobidClient +from sandcrawler.pdfextract import process_pdf, PdfExtractResult from sandcrawler.misc import gen_file_metadata, clean_url from sandcrawler.html import extract_fulltext_url from sandcrawler.workers import SandcrawlerWorker @@ -59,9 +60,12 @@ class IngestFileWorker(SandcrawlerWorker): if not self.pgrest_client: self.pgrest_client = SandcrawlerPostgrestClient() self.grobid_sink = kwargs.get('grobid_sink') + self.thumbnail_sink = kwargs.get('thumbnail_sink') + self.pdftext_sink = kwargs.get('pdftext_sink') self.try_existing_ingest = kwargs.get('try_existing_ingest', False) self.try_existing_grobid = kwargs.get('try_existing_grobid', True) + self.try_existing_pdfextract = kwargs.get('try_existing_pdfextract', True) self.try_wayback = kwargs.get('try_wayback', True) self.try_spn2 = kwargs.get('try_spn2', True) @@ -196,6 +200,7 @@ class IngestFileWorker(SandcrawlerWorker): """ return { 'grobid': self.process_grobid(resource, file_meta), + 'pdf_meta': self.process_pdfextract(resource, file_meta), } def process_grobid(self, resource, file_meta): @@ -229,6 +234,34 @@ class IngestFileWorker(SandcrawlerWorker): result.pop('key', None) return result + def process_pdfextract(self, resource, file_meta): + """ + Extracts thumbnail and pdf_meta info from PDF. + + By default checks sandcrawler-db for an existing row first, then decide + if we should re-process. + + TODO: difference between Kafka schema and SQL/postgrest schema + """ + if self.try_existing_pdfextract: + existing = self.pgrest_client.get_pdf_meta(file_meta['sha1hex']) + if existing: + print("found existing pdf_meta result", file=sys.stderr) + result = PdfExtractResult.from_pdf_meta_dict(existing) + return result.to_pdftext_dict() + + # Need to actually processes + result = process_pdf(resource.body) + assert result.file_meta['sha1hex'] == file_meta['sha1hex'] + if self.thumbnail_sink and result.page0_thumbnail is not None: + self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) + if self.pdftext_sink: + self.pdftext_sink.push_record(result.to_pdftext_dict()) + result.page0_thumbnail = None + result.text = None + result.file_meta = None + return result.to_pdftext_dict() + def timeout_response(self, task): print("[TIMEOUT]", file=sys.stderr) return dict( @@ -421,9 +454,10 @@ class IngestFileWorker(SandcrawlerWorker): result['status'] = "success" result['hit'] = True - print("[SUCCESS\t] sha1:{} grobid:{}".format( + print("[SUCCESS\t] sha1:{} grobid:{} pdfextract:{}".format( result.get('file_meta', {}).get('sha1hex'), result.get('grobid', {}).get('status_code'), + result.get('pdfextract', {}).get('status'), ), file=sys.stderr) return result diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py index 6a78a0a..97c2f3b 100644 --- a/python/sandcrawler/pdfextract.py +++ b/python/sandcrawler/pdfextract.py @@ -69,6 +69,30 @@ class PdfExtractResult: pdf_extra=record.get('pdf_extra'), ) + @classmethod + def from_pdf_meta_dict(cls, record): + """ + Parses what would be returned from postgrest + """ + if record['status'] != 'success': + return PdfExtractResult( + sha1hex=record['sha1hex'], + status=record['status'], + error_msg=record.get('metadata', {}).get('error_msg'), + ) + else: + pdf_extra = dict() + for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id', 'pdf_version'): + if record.get(k): + pdf_extra[k] = record[k] + return PdfExtractResult( + sha1hex=record['sha1hex'], + status=record['status'], + has_page0_thumbnail=bool(record.get('has_page0_thumbnail', False)), + pdf_info=record.get('metadata'), + pdf_extra=pdf_extra, + ) + def to_sql_tuple(self) -> tuple: # pdf_meta (sha1hex, updated, status, page0_thumbnail, page_count, # word_count, page0_height, page0_width, permanent_id, pdf_created, diff --git a/python/tests/test_ingest.py b/python/tests/test_ingest.py index 33de35d..c2d6266 100644 --- a/python/tests/test_ingest.py +++ b/python/tests/test_ingest.py @@ -73,6 +73,10 @@ def test_ingest_success(ingest_worker_pdf): 'http://dummy-postgrest/grobid?sha1hex=eq.{}'.format("90ffd2359008d82298821d16b21778c5c39aec36"), status=200, body=json.dumps([])) + responses.add(responses.GET, + 'http://dummy-postgrest/pdf_meta?sha1hex=eq.{}'.format("90ffd2359008d82298821d16b21778c5c39aec36"), + status=200, + body=json.dumps([])) responses.add(responses.POST, 'http://dummy-grobid/api/processFulltextDocument', status=200, body=REAL_TEI_XML, content_type='text/xml') @@ -99,6 +103,9 @@ def test_ingest_success(ingest_worker_pdf): assert 'grobid_version' not in resp['grobid']['metadata'] assert 'fatcat_release' not in resp['grobid']['metadata'] assert not 'tei_xml' in resp['grobid'] + assert resp['pdf_meta']['status'] == "success" + assert resp['pdf_meta']['pdf_extra']['page_count'] == 1 + assert resp['pdf_meta'].get('text') is None @responses.activate def test_ingest_landing(ingest_worker): -- cgit v1.2.3