aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-01-14 17:03:16 -0800
committerBryan Newbold <bnewbold@archive.org>2020-01-14 17:03:16 -0800
commitcc536eaf01c3a58df292b5917d2f11b9cd8a3cf3 (patch)
treedc4ecfb7c7228d64d0ef56d93657f433abfd821c
parentab01652b9d4b5542a973f591031b54cdcfd4701f (diff)
downloadsandcrawler-cc536eaf01c3a58df292b5917d2f11b9cd8a3cf3.tar.gz
sandcrawler-cc536eaf01c3a58df292b5917d2f11b9cd8a3cf3.zip
ingest: check existing GROBID; also push results to sink
-rw-r--r--python/sandcrawler/ingest.py26
1 files changed, 22 insertions, 4 deletions
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index bcb6608..bb5f3fc 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -11,6 +11,7 @@ from sandcrawler.grobid import GrobidClient
from sandcrawler.misc import gen_file_metadata
from sandcrawler.html import extract_fulltext_url
from sandcrawler.workers import SandcrawlerWorker
+from sandcrawler.db import SandcrawlerPostgrestClient
class IngestFileWorker(SandcrawlerWorker):
@@ -53,8 +54,13 @@ class IngestFileWorker(SandcrawlerWorker):
self.grobid_client = kwargs.get('grobid_client')
if not self.grobid_client:
self.grobid_client = GrobidClient()
+ self.pgrest_client = kwargs.get('pgrest_client')
+ if not self.pgrest_client:
+ self.pgrest_client = SandcrawlerPostgrestClient()
+ self.grobid_sink = kwargs.get('grobid_sink')
self.try_existing_ingest = False
+ self.try_existing_grobid = True
self.try_wayback = True
self.try_spn2 = True
@@ -128,24 +134,36 @@ class IngestFileWorker(SandcrawlerWorker):
Run all the necessary processing for a new/fresh ingest hit.
"""
return {
- 'grobid': self.process_grobid(resource),
+ 'grobid': self.process_grobid(resource, file_meta),
}
- def process_grobid(self, resource):
+ def process_grobid(self, resource, file_meta):
"""
Submits to resource body to GROBID for processing.
TODO: By default checks sandcrawler-db for an existing row first, then
decide if we should re-process
-
- TODO: Code to push to Kafka might also go here?
"""
+ if self.try_existing_grobid:
+ existing = self.pgrest_client.get_grobid(file_meta['sha1hex'])
+ if existing:
+ print("found existing GROBID result", file=sys.stderr)
+ return existing
+
+ # Need to actually processes
result = self.grobid_client.process_fulltext(resource.body)
+ if self.grobid_sink:
+ # extra fields for GROBID kafka messages
+ result['file_meta'] = file_meta
+ result['key'] = result['file_meta']['sha1hex']
+ self.grobid_sink.push_record(result.copy())
if result['status'] == "success":
metadata = self.grobid_client.metadata(result)
if metadata:
result.update(metadata)
result.pop('tei_xml', None)
+ result.pop('file_meta', None)
+ result.pop('key', None)
return result
def process(self, request):