From d98577f9016466622593bedf2740ac28c3a2d606 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 13 Nov 2019 16:43:32 -0800 Subject: add basic sandcrawler worker (kafka) --- python/sandcrawler_worker.py | 74 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100755 python/sandcrawler_worker.py (limited to 'python') diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py new file mode 100755 index 0000000..1f02eea --- /dev/null +++ b/python/sandcrawler_worker.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 + +""" +These are generally for continuously running workers that consume from Kafka. +Outputs might either be pushed back into Kafka, or directly into sandcrawler-db +or minio. +""" + +import sys +import argparse +import datetime +import raven + +from sandcrawler import * + +# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable +sentry_client = raven.Client() + + +def run_grobid_extract(args): + consume_topic = "sandcrawler-{}.ungrobided-pg".format(args.env) + produce_topic = "sandcrawler-{}.grobid-output-pg".format(args.env) + sink = KafkaSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic) + worker = GrobidWorker(host_url=args.grobid_host, sink=sink) + pusher = KafkaJsonPusher(sink=worker, group="grobid-extract") + pusher.run() + +def run_grobid_persist(args): + consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env) + sink = GrobidPersist(consume_topic=consume_topic) + pusher = KafkaJsonPusher(sink) + pusher.run() + +def run_ingest_file(args): + consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) + produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env) + sink = KafkaSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic) + grobid_client = GrobidClient(host_url=args.grobid_host) + worker = IngestFileWorker(grobid_client=grobid_client, sink=sink) + pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, group="grobid-extract") + pusher.run() + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--kafka-hosts', + default="localhost:9092", + help="list of Kafka brokers (host/port) to use") + parser.add_argument('--env', + default="dev", + help="Kafka topic namespace to use (eg, prod, qa, dev)") + parser.add_argument('--grobid-host', + default="http://grobid.qa.fatcat.wiki", + help="GROBID API host/port") + subparsers = parser.add_subparsers() + + sub_grobid_extract = subparsers.add_parser('grobid-extract') + sub_grobid_extract.set_defaults(func=run_grobid_extract) + + sub_grobid_persist = subparsers.add_parser('grobid-persist') + sub_grobid_persist.set_defaults(func=run_grobid_persist) + + sub_ingest_file = subparsers.add_parser('ingest-file') + sub_ingest_file.set_defaults(func=run_ingest_file) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + + args.func(args) + +if __name__ == '__main__': + main() -- cgit v1.2.3