diff options
-rwxr-xr-x | python/sandcrawler_worker.py | 20 |
1 files changed, 16 insertions, 4 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 833b9c4..77c0704 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -51,11 +51,11 @@ def run_grobid_extract(args): def run_pdf_extract(args): consume_topic = "sandcrawler-{}.unextracted".format(args.env) - text_topic = "sandcrawler-{}.pdf-text".format(args.env) + pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env) thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env) - text_sink = KafkaCompressSink( + pdftext_sink = KafkaCompressSink( kafka_hosts=args.kafka_hosts, - produce_topic=text_topic, + produce_topic=pdftext_topic, ) thumbnail_sink = KafkaSink( kafka_hosts=args.kafka_hosts, @@ -66,7 +66,7 @@ def run_pdf_extract(args): ) worker = PdfExtractWorker( wayback_client=wayback_client, - sink=text_sink, + sink=pdftext_sink, thumbnail_sink=thumbnail_sink, ) pusher = KafkaJsonPusher( @@ -172,6 +172,8 @@ def run_ingest_file(args): consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env) grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env) + pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env) + thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env) sink = KafkaSink( kafka_hosts=args.kafka_hosts, produce_topic=produce_topic, @@ -183,10 +185,20 @@ def run_ingest_file(args): grobid_client = GrobidClient( host_url=args.grobid_host, ) + pdftext_sink = KafkaCompressSink( + kafka_hosts=args.kafka_hosts, + produce_topic=pdftext_topic, + ) + thumbnail_sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=thumbnail_topic, + ) worker = IngestFileWorker( grobid_client=grobid_client, sink=sink, grobid_sink=grobid_sink, + thumbnail_sink=thumbnail_sink, + pdftext_sink=pdftext_sink, # don't SPNv2 for --bulk backfill try_spn2=not args.bulk, ) |