diff options
Diffstat (limited to 'python/sandcrawler_worker.py')
-rwxr-xr-x | python/sandcrawler_worker.py | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index f314218..895a5b9 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -30,11 +30,10 @@ def run_grobid_extract(args):
 
 def run_grobid_persist(args):
     consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
-    raise NotImplementedError
-    #worker = GrobidPersistWorker()
-    #pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
-    #    consume_topic=consume_topic, group="grobid-persist")
-    #pusher.run()
+    worker = PersistGrobidWorker()
+    pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic, group="grobid-persist")
+    pusher.run()
 
 def run_ingest_file(args):
     consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
@@ -46,6 +45,13 @@ def run_ingest_file(args):
         consume_topic=consume_topic, group="ingest-file", batch_size=1)
     pusher.run()
 
+def run_ingest_file_persist(args):
+    consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
+    worker = PersistIngestFileResultWorker()
+    pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic, group="ingest-persist")
+    pusher.run()
+
 def main():
     parser = argparse.ArgumentParser(
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -72,6 +78,10 @@ def main():
         help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka")
     sub_ingest_file.set_defaults(func=run_ingest_file)
 
+    sub_ingest_file_persist = subparsers.add_parser('ingest-file-persist',
+        help="daemon that consumes ingest-file output from Kafka and pushes to postgres")
+    sub_ingest_file_persist.set_defaults(func=run_ingest_file_persist)
+
     args = parser.parse_args()
     if not args.__dict__.get("func"):
         print("tell me what to do!")