diff options
author | Bryan Newbold <bnewbold@archive.org> | 2020-11-03 19:27:03 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-11-03 19:27:31 -0800 |
commit | 2885b34ab3e4c862f9e895a237108d42793efb1d (patch) | |
tree | 2a0c197427d39e9809bd120df50afb06e707b9b0 /python/sandcrawler | |
parent | 3adcaf9802928346dda597cefd4b66b2e62fa942 (diff) | |
download | sandcrawler-2885b34ab3e4c862f9e895a237108d42793efb1d.tar.gz sandcrawler-2885b34ab3e4c862f9e895a237108d42793efb1d.zip |
ingest: handle publishing XML docs to kafka
Diffstat (limited to 'python/sandcrawler')
-rw-r--r-- | python/sandcrawler/ingest.py | 24 |
1 files changed, 21 insertions, 3 deletions
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index 7ad0124..1a42b6a 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -66,6 +66,7 @@ class IngestFileWorker(SandcrawlerWorker): self.grobid_sink = kwargs.get('grobid_sink') self.thumbnail_sink = kwargs.get('thumbnail_sink') self.pdftext_sink = kwargs.get('pdftext_sink') + self.xmldoc_sink = kwargs.get('xmldoc_sink') self.max_hops = 6 self.try_existing_ingest = kwargs.get('try_existing_ingest', False) @@ -242,8 +243,9 @@ class IngestFileWorker(SandcrawlerWorker): 'pdf_meta': self.process_pdfextract(resource, file_meta), } elif ingest_type == "xml": - # TODO - return {} + return { + 'xml_meta': self.process_xml(resource, file_meta), + } else: raise NotImplementedError(f"process {ingest_type} hit") @@ -300,12 +302,28 @@ class IngestFileWorker(SandcrawlerWorker): if self.thumbnail_sink and result.page0_thumbnail is not None: self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) if self.pdftext_sink: - self.pdftext_sink.push_record(result.to_pdftext_dict()) + self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex) result.page0_thumbnail = None result.text = None result.file_meta = None return result.to_pdftext_dict() + def process_xml(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Simply publishes to Kafka topic. + + In the future, could extract other metadata here (like body word + count), or attempting to fetch sub-resources. + """ + if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml": + msg = dict( + sha1hex=file_meta["sha1hex"], + status="success", + jats_xml=resource.body.encode('utf-8'), + ) + self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex']) + return dict(status="success") + def timeout_response(self, task: dict) -> dict: print("[TIMEOUT]", file=sys.stderr) return dict( |