From cc536eaf01c3a58df292b5917d2f11b9cd8a3cf3 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 14 Jan 2020 17:03:16 -0800 Subject: ingest: check existing GROBID; also push results to sink --- python/sandcrawler/ingest.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index bcb6608..bb5f3fc 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -11,6 +11,7 @@ from sandcrawler.grobid import GrobidClient from sandcrawler.misc import gen_file_metadata from sandcrawler.html import extract_fulltext_url from sandcrawler.workers import SandcrawlerWorker +from sandcrawler.db import SandcrawlerPostgrestClient class IngestFileWorker(SandcrawlerWorker): @@ -53,8 +54,13 @@ class IngestFileWorker(SandcrawlerWorker): self.grobid_client = kwargs.get('grobid_client') if not self.grobid_client: self.grobid_client = GrobidClient() + self.pgrest_client = kwargs.get('pgrest_client') + if not self.pgrest_client: + self.pgrest_client = SandcrawlerPostgrestClient() + self.grobid_sink = kwargs.get('grobid_sink') self.try_existing_ingest = False + self.try_existing_grobid = True self.try_wayback = True self.try_spn2 = True @@ -128,24 +134,36 @@ class IngestFileWorker(SandcrawlerWorker): Run all the necessary processing for a new/fresh ingest hit. """ return { - 'grobid': self.process_grobid(resource), + 'grobid': self.process_grobid(resource, file_meta), } - def process_grobid(self, resource): + def process_grobid(self, resource, file_meta): """ Submits to resource body to GROBID for processing. TODO: By default checks sandcrawler-db for an existing row first, then decide if we should re-process - - TODO: Code to push to Kafka might also go here? """ + if self.try_existing_grobid: + existing = self.pgrest_client.get_grobid(file_meta['sha1hex']) + if existing: + print("found existing GROBID result", file=sys.stderr) + return existing + + # Need to actually processes result = self.grobid_client.process_fulltext(resource.body) + if self.grobid_sink: + # extra fields for GROBID kafka messages + result['file_meta'] = file_meta + result['key'] = result['file_meta']['sha1hex'] + self.grobid_sink.push_record(result.copy()) if result['status'] == "success": metadata = self.grobid_client.metadata(result) if metadata: result.update(metadata) result.pop('tei_xml', None) + result.pop('file_meta', None) + result.pop('key', None) return result def process(self, request): -- cgit v1.2.3