From 2885b34ab3e4c862f9e895a237108d42793efb1d Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 3 Nov 2020 19:27:03 -0800
Subject: ingest: handle publishing XML docs to kafka

---
 python/sandcrawler/ingest.py | 24 +++++++++++++++++++++---
 python/sandcrawler_worker.py |  6 ++++++
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 7ad0124..1a42b6a 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -66,6 +66,7 @@ class IngestFileWorker(SandcrawlerWorker):
         self.grobid_sink = kwargs.get('grobid_sink')
         self.thumbnail_sink = kwargs.get('thumbnail_sink')
         self.pdftext_sink = kwargs.get('pdftext_sink')
+        self.xmldoc_sink = kwargs.get('xmldoc_sink')
         self.max_hops = 6
 
         self.try_existing_ingest = kwargs.get('try_existing_ingest', False)
@@ -242,8 +243,9 @@ class IngestFileWorker(SandcrawlerWorker):
                 'pdf_meta': self.process_pdfextract(resource, file_meta),
             }
         elif ingest_type == "xml":
-            # TODO
-            return {}
+            return {
+                'xml_meta': self.process_xml(resource, file_meta),
+            }
         else:
             raise NotImplementedError(f"process {ingest_type} hit")
 
@@ -300,12 +302,28 @@ class IngestFileWorker(SandcrawlerWorker):
         if self.thumbnail_sink and result.page0_thumbnail is not None:
             self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
         if self.pdftext_sink:
-            self.pdftext_sink.push_record(result.to_pdftext_dict())
+            self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex)
         result.page0_thumbnail = None
         result.text = None
         result.file_meta = None
         return result.to_pdftext_dict()
 
+    def process_xml(self, resource: ResourceResult, file_meta: dict) -> dict:
+        """
+        Simply publishes to Kafka topic.
+
+        In the future, could extract other metadata here (like body word
+        count), or attempt to fetch sub-resources.
+        """
+        if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml":
+            msg = dict(
+                sha1hex=file_meta["sha1hex"],
+                status="success",
+                jats_xml=resource.body.decode('utf-8'),
+            )
+            self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex'])
+        return dict(status="success")
+
     def timeout_response(self, task: dict) -> dict:
         print("[TIMEOUT]", file=sys.stderr)
         return dict(
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index 537398e..b62fa80 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -174,6 +174,7 @@ def run_ingest_file(args):
     grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
     pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env)
     thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env)
+    xmldoc_topic = "sandcrawler-{}.xml-doc".format(args.env)
     sink = KafkaSink(
         kafka_hosts=args.kafka_hosts,
         produce_topic=produce_topic,
@@ -193,12 +194,17 @@ def run_ingest_file(args):
         kafka_hosts=args.kafka_hosts,
         produce_topic=thumbnail_topic,
     )
+    xmldoc_sink = KafkaSink(
+        kafka_hosts=args.kafka_hosts,
+        produce_topic=xmldoc_topic,
+    )
     worker = IngestFileWorker(
         grobid_client=grobid_client,
         sink=sink,
         grobid_sink=grobid_sink,
         thumbnail_sink=thumbnail_sink,
         pdftext_sink=pdftext_sink,
+        xmldoc_sink=xmldoc_sink,
         # don't SPNv2 for --bulk backfill
         try_spn2=not args.bulk,
     )
--
cgit v1.2.3