author | Bryan Newbold <bnewbold@archive.org> | 2020-01-14 17:03:16 -0800
committer | Bryan Newbold <bnewbold@archive.org> | 2020-01-14 17:03:16 -0800
commit | cc536eaf01c3a58df292b5917d2f11b9cd8a3cf3 (patch)
tree | dc4ecfb7c7228d64d0ef56d93657f433abfd821c /python
parent | ab01652b9d4b5542a973f591031b54cdcfd4701f (diff)
ingest: check existing GROBID; also push results to sink
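This commit has the ingest worker consult sandcrawler-db (via PostgREST) for an existing GROBID result before re-processing a file, and pushes fresh results to a Kafka sink. The `SandcrawlerPostgrestClient` implementation itself is not part of this diff; below is a minimal sketch of what its `get_grobid()` lookup might look like, assuming sandcrawler-db exposes a `grobid` table keyed by `sha1hex` (the endpoint URL and table name are assumptions, not confirmed by this commit):

```python
import requests

class SandcrawlerPostgrestClient:
    """Hypothetical sketch; the real client lives in sandcrawler/db.py."""

    def __init__(self, api_url="http://localhost:3030"):
        # assumed default endpoint; configure to match your PostgREST deploy
        self.api_url = api_url

    def get_grobid(self, sha1hex):
        # PostgREST row filter syntax: GET /grobid?sha1hex=eq.<hash>
        resp = requests.get(
            self.api_url + "/grobid",
            params={'sha1hex': 'eq.' + sha1hex},
        )
        resp.raise_for_status()
        rows = resp.json()
        # at most one row per file hash; None means "not processed yet"
        return rows[0] if rows else None
```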
Diffstat (limited to 'python')
-rw-r--r-- | python/sandcrawler/ingest.py | 26
1 file changed, 22 insertions, 4 deletions
```diff
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index bcb6608..bb5f3fc 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -11,6 +11,7 @@ from sandcrawler.grobid import GrobidClient
 from sandcrawler.misc import gen_file_metadata
 from sandcrawler.html import extract_fulltext_url
 from sandcrawler.workers import SandcrawlerWorker
+from sandcrawler.db import SandcrawlerPostgrestClient
 
 
 class IngestFileWorker(SandcrawlerWorker):
@@ -53,8 +54,13 @@ class IngestFileWorker(SandcrawlerWorker):
         self.grobid_client = kwargs.get('grobid_client')
         if not self.grobid_client:
             self.grobid_client = GrobidClient()
+        self.pgrest_client = kwargs.get('pgrest_client')
+        if not self.pgrest_client:
+            self.pgrest_client = SandcrawlerPostgrestClient()
+        self.grobid_sink = kwargs.get('grobid_sink')
 
         self.try_existing_ingest = False
+        self.try_existing_grobid = True
         self.try_wayback = True
         self.try_spn2 = True
 
@@ -128,24 +134,36 @@ class IngestFileWorker(SandcrawlerWorker):
         Run all the necessary processing for a new/fresh ingest hit.
         """
         return {
-            'grobid': self.process_grobid(resource),
+            'grobid': self.process_grobid(resource, file_meta),
         }
 
-    def process_grobid(self, resource):
+    def process_grobid(self, resource, file_meta):
         """
         Submits the resource body to GROBID for processing.
 
         TODO: By default checks sandcrawler-db for an existing row first,
         then decide if we should re-process
-
-        TODO: Code to push to Kafka might also go here?
         """
+        if self.try_existing_grobid:
+            existing = self.pgrest_client.get_grobid(file_meta['sha1hex'])
+            if existing:
+                print("found existing GROBID result", file=sys.stderr)
+                return existing
+
+        # Need to actually process
         result = self.grobid_client.process_fulltext(resource.body)
+        if self.grobid_sink:
+            # extra fields for GROBID kafka messages
+            result['file_meta'] = file_meta
+            result['key'] = result['file_meta']['sha1hex']
+            self.grobid_sink.push_record(result.copy())
         if result['status'] == "success":
             metadata = self.grobid_client.metadata(result)
             if metadata:
                 result.update(metadata)
         result.pop('tei_xml', None)
+        result.pop('file_meta', None)
+        result.pop('key', None)
         return result
 
     def process(self, request):
```
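For reference, a minimal usage sketch based only on the kwargs visible in this diff (`pgrest_client`, `grobid_sink`); the stdout sink below is a hypothetical stand-in for the Kafka producer that would normally back `push_record()`:

```python
from sandcrawler.ingest import IngestFileWorker

class StdoutSink:
    """Hypothetical stand-in for a Kafka sink; the worker only
    calls push_record() on whatever sink it is given."""
    def push_record(self, record):
        print("would publish:", record.get('key'))

# pgrest_client defaults to SandcrawlerPostgrestClient(), so existing
# GROBID rows in sandcrawler-db short-circuit re-processing; fresh
# results (with file_meta and key attached) go to grobid_sink.
worker = IngestFileWorker(grobid_sink=StdoutSink())
```

Note that the worker pushes `result.copy()` before stripping `tei_xml`, `file_meta`, and `key`, so the Kafka message keeps the full payload while the returned ingest result stays compact.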