aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler_worker.py
diff options
context:
space:
mode:
author Bryan Newbold <bnewbold@archive.org> 2019-12-24 16:39:14 -0800
committer Bryan Newbold <bnewbold@archive.org> 2020-01-02 18:12:58 -0800
commit 460843e31ebea16fcb543b8448365cfe004103b0 (patch)
tree f5576149afa91b6807decb048e2d5ba461a2a6da /python/sandcrawler_worker.py
parent 4a3eade72ea557eb5e04c59c454887d20c718314 (diff)
download sandcrawler-460843e31ebea16fcb543b8448365cfe004103b0.tar.gz
sandcrawler-460843e31ebea16fcb543b8448365cfe004103b0.zip
start work on persist workers and tool
Diffstat (limited to 'python/sandcrawler_worker.py')
-rwxr-xr-x python/sandcrawler_worker.py 20
1 files changed, 15 insertions, 5 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index f314218..895a5b9 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -30,11 +30,10 @@ def run_grobid_extract(args):
def run_grobid_persist(args):
consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
- raise NotImplementedError
- #worker = GrobidPersistWorker()
- #pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
- # consume_topic=consume_topic, group="grobid-persist")
- #pusher.run()
+ worker = PersistGrobidWorker()
+ pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic, group="grobid-persist")
+ pusher.run()
def run_ingest_file(args):
consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
@@ -46,6 +45,13 @@ def run_ingest_file(args):
consume_topic=consume_topic, group="ingest-file", batch_size=1)
pusher.run()
+def run_ingest_file_persist(args):
+ consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
+ worker = PersistIngestFileResultWorker()
+ pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic, group="ingest-persist")
+ pusher.run()
+
def main():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -72,6 +78,10 @@ def main():
help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka")
sub_ingest_file.set_defaults(func=run_ingest_file)
+ sub_ingest_file_persist = subparsers.add_parser('ingest-file-persist',
+ help="daemon that consumes ingest-file output from Kafka and pushes to postgres")
+ sub_ingest_file_persist.set_defaults(func=run_ingest_file_persist)
+
args = parser.parse_args()
if not args.__dict__.get("func"):
print("tell me what to do!")