diff options
| author | Bryan Newbold <bnewbold@archive.org> | 2020-01-28 18:55:36 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@archive.org> | 2020-01-28 18:55:39 -0800 | 
| commit | 08377ca3fdb7103ce0e0a98f7ae9e2baa39febf8 (patch) | |
| tree | 7518d73712d36bda2e4c7fda9a3afe923e9c91de /python | |
| parent | e0c2cc4b1a41b5de40c9e3adc9cba36d4dc93ed1 (diff) | |
| download | sandcrawler-08377ca3fdb7103ce0e0a98f7ae9e2baa39febf8.tar.gz sandcrawler-08377ca3fdb7103ce0e0a98f7ae9e2baa39febf8.zip | |
grobid worker: always set a key in response
We have key-based compaction enabled for the GROBID output topic. This
means it is an error to publish to that topic without a key set.
Hopefully this change will end these errors, which look like:
  KafkaError{code=INVALID_MSG,val=2,str="Broker: Invalid message"}
Diffstat (limited to 'python')
| -rw-r--r-- | python/sandcrawler/grobid.py | 29 | 
1 files changed, 25 insertions, 4 deletions
| diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py index 8c3aec1..bc886c2 100644 --- a/python/sandcrawler/grobid.py +++ b/python/sandcrawler/grobid.py @@ -88,6 +88,7 @@ class GrobidWorker(SandcrawlerWorker):          self.consolidate_mode = 2      def process(self, record): +        default_key = record['sha1hex']          if record.get('warc_path') and record.get('warc_offset'):              # it's a full CDX dict. fetch using WaybackClient              if not self.wayback_client: @@ -99,7 +100,12 @@ class GrobidWorker(SandcrawlerWorker):                      warc_path=record['warc_path'],                  )              except WaybackError as we: -                return dict(status="error-wayback", error_msg=str(we), source=record) +                return dict( +                    status="error-wayback", +                    error_msg=str(we), +                    source=record, +                    key=default_key, +                )          elif record.get('url') and record.get('datetime'):              # it's a partial CDX dict or something? 
fetch using WaybackClient              if not self.wayback_client: @@ -110,7 +116,12 @@ class GrobidWorker(SandcrawlerWorker):                      datetime=record['datetime'],                  )              except WaybackError as we: -                return dict(status="error-wayback", error_msg=str(we), source=record) +                return dict( +                    status="error-wayback", +                    error_msg=str(we), +                    source=record, +                    key=default_key, +                )          elif record.get('item') and record.get('path'):              # it's petabox link; fetch via HTTP              resp = requests.get("https://archive.org/serve/{}/{}".format( @@ -118,12 +129,22 @@ class GrobidWorker(SandcrawlerWorker):              try:                  resp.raise_for_status()              except Exception as e: -                return dict(status="error-petabox", error_msg=str(e), source=record) +                return dict( +                    status="error-petabox", +                    error_msg=str(e), +                    source=record, +                    key=default_key, +                )              blob = resp.body          else:              raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")          if not blob: -            return dict(status="error", error_msg="empty blob", source=record) +            return dict( +                status="error", +                error_msg="empty blob", +                source=record, +                key=default_key, +            )          result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)          result['file_meta'] = gen_file_metadata(blob)          result['source'] = record | 
