diff options
author | Bryan Newbold <bnewbold@archive.org> | 2020-01-28 18:55:36 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-01-28 18:55:39 -0800 |
commit | 08377ca3fdb7103ce0e0a98f7ae9e2baa39febf8 (patch) | |
tree | 7518d73712d36bda2e4c7fda9a3afe923e9c91de /python | |
parent | e0c2cc4b1a41b5de40c9e3adc9cba36d4dc93ed1 (diff) | |
download | sandcrawler-08377ca3fdb7103ce0e0a98f7ae9e2baa39febf8.tar.gz sandcrawler-08377ca3fdb7103ce0e0a98f7ae9e2baa39febf8.zip |
grobid worker: always set a key in response
We have key-based compaction enabled for the GROBID output topic. This
means it is an error to public to that topic without a key set.
Hopefully this change will end these errors, which look like:
KafkaError{code=INVALID_MSG,val=2,str="Broker: Invalid message"}
Diffstat (limited to 'python')
-rw-r--r-- | python/sandcrawler/grobid.py | 29 |
1 files changed, 25 insertions, 4 deletions
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py index 8c3aec1..bc886c2 100644 --- a/python/sandcrawler/grobid.py +++ b/python/sandcrawler/grobid.py @@ -88,6 +88,7 @@ class GrobidWorker(SandcrawlerWorker): self.consolidate_mode = 2 def process(self, record): + default_key = record['sha1hex'] if record.get('warc_path') and record.get('warc_offset'): # it's a full CDX dict. fetch using WaybackClient if not self.wayback_client: @@ -99,7 +100,12 @@ class GrobidWorker(SandcrawlerWorker): warc_path=record['warc_path'], ) except WaybackError as we: - return dict(status="error-wayback", error_msg=str(we), source=record) + return dict( + status="error-wayback", + error_msg=str(we), + source=record, + key=default_key, + ) elif record.get('url') and record.get('datetime'): # it's a partial CDX dict or something? fetch using WaybackClient if not self.wayback_client: @@ -110,7 +116,12 @@ class GrobidWorker(SandcrawlerWorker): datetime=record['datetime'], ) except WaybackError as we: - return dict(status="error-wayback", error_msg=str(we), source=record) + return dict( + status="error-wayback", + error_msg=str(we), + source=record, + key=default_key, + ) elif record.get('item') and record.get('path'): # it's petabox link; fetch via HTTP resp = requests.get("https://archive.org/serve/{}/{}".format( @@ -118,12 +129,22 @@ class GrobidWorker(SandcrawlerWorker): try: resp.raise_for_status() except Exception as e: - return dict(status="error-petabox", error_msg=str(e), source=record) + return dict( + status="error-petabox", + error_msg=str(e), + source=record, + key=default_key, + ) blob = resp.body else: raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") if not blob: - return dict(status="error", error_msg="empty blob", source=record) + return dict( + status="error", + error_msg="empty blob", + source=record, + key=default_key, + ) result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode) result['file_meta'] = gen_file_metadata(blob) result['source'] = record |