From 08377ca3fdb7103ce0e0a98f7ae9e2baa39febf8 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 28 Jan 2020 18:55:36 -0800 Subject: grobid worker: always set a key in response We have key-based compaction enabled for the GROBID output topic. This means it is an error to publish to that topic without a key set. Hopefully this change will end these errors, which look like: KafkaError{code=INVALID_MSG,val=2,str="Broker: Invalid message"} --- python/sandcrawler/grobid.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py index 8c3aec1..bc886c2 100644 --- a/python/sandcrawler/grobid.py +++ b/python/sandcrawler/grobid.py @@ -88,6 +88,7 @@ class GrobidWorker(SandcrawlerWorker): self.consolidate_mode = 2 def process(self, record): + default_key = record['sha1hex'] if record.get('warc_path') and record.get('warc_offset'): # it's a full CDX dict. fetch using WaybackClient if not self.wayback_client: @@ -99,7 +100,12 @@ class GrobidWorker(SandcrawlerWorker): warc_path=record['warc_path'], ) except WaybackError as we: - return dict(status="error-wayback", error_msg=str(we), source=record) + return dict( + status="error-wayback", + error_msg=str(we), + source=record, + key=default_key, + ) elif record.get('url') and record.get('datetime'): # it's a partial CDX dict or something? 
fetch using WaybackClient if not self.wayback_client: @@ -110,7 +116,12 @@ class GrobidWorker(SandcrawlerWorker): datetime=record['datetime'], ) except WaybackError as we: - return dict(status="error-wayback", error_msg=str(we), source=record) + return dict( + status="error-wayback", + error_msg=str(we), + source=record, + key=default_key, + ) elif record.get('item') and record.get('path'): # it's petabox link; fetch via HTTP resp = requests.get("https://archive.org/serve/{}/{}".format( @@ -118,12 +129,22 @@ class GrobidWorker(SandcrawlerWorker): try: resp.raise_for_status() except Exception as e: - return dict(status="error-petabox", error_msg=str(e), source=record) + return dict( + status="error-petabox", + error_msg=str(e), + source=record, + key=default_key, + ) blob = resp.body else: raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") if not blob: - return dict(status="error", error_msg="empty blob", source=record) + return dict( + status="error", + error_msg="empty blob", + source=record, + key=default_key, + ) result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode) result['file_meta'] = gen_file_metadata(blob) result['source'] = record -- cgit v1.2.3