aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-11-03 19:27:03 -0800
committerBryan Newbold <bnewbold@archive.org>2020-11-03 19:27:31 -0800
commit2885b34ab3e4c862f9e895a237108d42793efb1d (patch)
tree2a0c197427d39e9809bd120df50afb06e707b9b0 /python
parent3adcaf9802928346dda597cefd4b66b2e62fa942 (diff)
downloadsandcrawler-2885b34ab3e4c862f9e895a237108d42793efb1d.tar.gz
sandcrawler-2885b34ab3e4c862f9e895a237108d42793efb1d.zip
ingest: handle publishing XML docs to kafka
Diffstat (limited to 'python')
-rw-r--r--python/sandcrawler/ingest.py24
-rwxr-xr-xpython/sandcrawler_worker.py6
2 files changed, 27 insertions, 3 deletions
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 7ad0124..1a42b6a 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -66,6 +66,7 @@ class IngestFileWorker(SandcrawlerWorker):
self.grobid_sink = kwargs.get('grobid_sink')
self.thumbnail_sink = kwargs.get('thumbnail_sink')
self.pdftext_sink = kwargs.get('pdftext_sink')
+ self.xmldoc_sink = kwargs.get('xmldoc_sink')
self.max_hops = 6
self.try_existing_ingest = kwargs.get('try_existing_ingest', False)
@@ -242,8 +243,9 @@ class IngestFileWorker(SandcrawlerWorker):
'pdf_meta': self.process_pdfextract(resource, file_meta),
}
elif ingest_type == "xml":
- # TODO
- return {}
+ return {
+ 'xml_meta': self.process_xml(resource, file_meta),
+ }
else:
raise NotImplementedError(f"process {ingest_type} hit")
@@ -300,12 +302,28 @@ class IngestFileWorker(SandcrawlerWorker):
if self.thumbnail_sink and result.page0_thumbnail is not None:
self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
if self.pdftext_sink:
- self.pdftext_sink.push_record(result.to_pdftext_dict())
+ self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex)
result.page0_thumbnail = None
result.text = None
result.file_meta = None
return result.to_pdftext_dict()
+ def process_xml(self, resource: ResourceResult, file_meta: dict) -> dict:
+ """
+ Simply publishes to Kafka topic.
+
+ In the future, could extract other metadata here (like body word
+ count), or attempting to fetch sub-resources.
+ """
+ if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml":
+ msg = dict(
+ sha1hex=file_meta["sha1hex"],
+ status="success",
+ jats_xml=resource.body.encode('utf-8'),
+ )
+ self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex'])
+ return dict(status="success")
+
def timeout_response(self, task: dict) -> dict:
print("[TIMEOUT]", file=sys.stderr)
return dict(
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index 537398e..b62fa80 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -174,6 +174,7 @@ def run_ingest_file(args):
grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env)
thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env)
+ xmldoc_topic = "sandcrawler-{}.xml-doc".format(args.env)
sink = KafkaSink(
kafka_hosts=args.kafka_hosts,
produce_topic=produce_topic,
@@ -193,12 +194,17 @@ def run_ingest_file(args):
kafka_hosts=args.kafka_hosts,
produce_topic=thumbnail_topic,
)
+ xmldoc_sink = KafkaSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=xmldoc_topic,
+ )
worker = IngestFileWorker(
grobid_client=grobid_client,
sink=sink,
grobid_sink=grobid_sink,
thumbnail_sink=thumbnail_sink,
pdftext_sink=pdftext_sink,
+ xmldoc_sink=xmldoc_sink,
# don't SPNv2 for --bulk backfill
try_spn2=not args.bulk,
)