aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-01-28 18:55:36 -0800
committerBryan Newbold <bnewbold@archive.org>2020-01-28 18:55:39 -0800
commit08377ca3fdb7103ce0e0a98f7ae9e2baa39febf8 (patch)
tree7518d73712d36bda2e4c7fda9a3afe923e9c91de
parente0c2cc4b1a41b5de40c9e3adc9cba36d4dc93ed1 (diff)
downloadsandcrawler-08377ca3fdb7103ce0e0a98f7ae9e2baa39febf8.tar.gz
sandcrawler-08377ca3fdb7103ce0e0a98f7ae9e2baa39febf8.zip
grobid worker: always set a key in response
We have key-based compaction enabled for the GROBID output topic. This means it is an error to public to that topic without a key set. Hopefully this change will end these errors, which look like: KafkaError{code=INVALID_MSG,val=2,str="Broker: Invalid message"}
-rw-r--r--python/sandcrawler/grobid.py29
1 files changed, 25 insertions, 4 deletions
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index 8c3aec1..bc886c2 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -88,6 +88,7 @@ class GrobidWorker(SandcrawlerWorker):
self.consolidate_mode = 2
def process(self, record):
+ default_key = record['sha1hex']
if record.get('warc_path') and record.get('warc_offset'):
# it's a full CDX dict. fetch using WaybackClient
if not self.wayback_client:
@@ -99,7 +100,12 @@ class GrobidWorker(SandcrawlerWorker):
warc_path=record['warc_path'],
)
except WaybackError as we:
- return dict(status="error-wayback", error_msg=str(we), source=record)
+ return dict(
+ status="error-wayback",
+ error_msg=str(we),
+ source=record,
+ key=default_key,
+ )
elif record.get('url') and record.get('datetime'):
# it's a partial CDX dict or something? fetch using WaybackClient
if not self.wayback_client:
@@ -110,7 +116,12 @@ class GrobidWorker(SandcrawlerWorker):
datetime=record['datetime'],
)
except WaybackError as we:
- return dict(status="error-wayback", error_msg=str(we), source=record)
+ return dict(
+ status="error-wayback",
+ error_msg=str(we),
+ source=record,
+ key=default_key,
+ )
elif record.get('item') and record.get('path'):
# it's petabox link; fetch via HTTP
resp = requests.get("https://archive.org/serve/{}/{}".format(
@@ -118,12 +129,22 @@ class GrobidWorker(SandcrawlerWorker):
try:
resp.raise_for_status()
except Exception as e:
- return dict(status="error-petabox", error_msg=str(e), source=record)
+ return dict(
+ status="error-petabox",
+ error_msg=str(e),
+ source=record,
+ key=default_key,
+ )
blob = resp.body
else:
raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
if not blob:
- return dict(status="error", error_msg="empty blob", source=record)
+ return dict(
+ status="error",
+ error_msg="empty blob",
+ source=record,
+ key=default_key,
+ )
result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
result['file_meta'] = gen_file_metadata(blob)
result['source'] = record