From 9bd0860aded708c9ba86bf99eed57af38772c10a Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 28 Apr 2020 15:56:01 -0700 Subject: NSQ for job task manager/scheduler --- proposals/20200211_nsq.md | 79 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 proposals/20200211_nsq.md diff --git a/proposals/20200211_nsq.md b/proposals/20200211_nsq.md new file mode 100644 index 0000000..6aa885b --- /dev/null +++ b/proposals/20200211_nsq.md @@ -0,0 +1,79 @@ + +status: planned + +In short, Kafka is not working well as a job task scheduler, and I want to try +NSQ as a medium-term solution to that problem. + + +## Motivation + +Thinking of setting up NSQ to use for scheduling distributed work, to replace +kafka for some topics. for example, "regrobid" requests where we enqueue +millions of, basically, CDX lines, and want to process on dozens of cores or +multiple machines. or file ingest backfill. results would still go to kafka (to +persist), and pipelines like DOI harvest -> import -> elasticsearch would still +be kafka + +The pain point with kafka is having dozens of workers on tasks that take more +than a couple seconds per task. we could keep tweaking kafka and writing weird +consumer group things to handle this, but I think it will never work very well. +NSQ supports re-queues with delay (eg, on failure, defer to re-process later), +allows many workers to connect and leave with no disruption, messages don't +have to be processed in order, and has a very simple enqueue API (HTTP POST). + +The slowish tasks we have now are file ingest (wayback and/or SPNv2 + +GROBID) and re-GROBID. In the near future will also have ML backlog to go +through. + +Throughput isn't much of a concern as tasks take 10+ seconds each. + + +## Specific Plan + +Continue publishing ingest requests to Kafka topic. Have a new persist worker +consume from this topic and push to request table (but not result table) using +`ON CONFLICT DO NOTHING`. Have a new single-process kafka consumer pull from +the topic and push to NSQ. This consumer monitors NSQ and doesn't push too many +requests (eg, 1k maximum). NSQ could potentially even run as in-memory mode. +New worker/pusher class that acts as an NSQ client, possibly with parallelism. + +*Clean* NSQ shutdown/restart always persists data locally to disk. + +Unclean shutdown (eg, power failure) would mean NSQ might have lost state. +Because we are persisting requests to sandcrawler-db, cleanup is simple: +re-enqueue all requests from the past N days with null result or result older +than M days. + +Still need multiple kafka and NSQ topics to have priority queues (eg, bulk, +platform-specific). + +To start, have a single static NSQ host; don't need nsqlookupd. Could use +wbgrp-svc506 (datanode VM with SSD, lots of CPU and RAM). + +To move hosts, simply restart the kafka pusher pointing at the new NSQ host. +When the old host's queue is empty, restart the workers to consume from the new +host, and destroy the old NSQ host. + + +## Alternatives + +Work arounds i've done to date have been using the `grobid_tool.py` or +`ingest_tool.py` JSON input modes to pipe JSON task files (millions of lines) +through GNU/parallel. I guess GNU/parallel's distributed mode is also an option +here. + +Other things that could be used: + +**celery**: popular, many features. need to run separate redis, no disk persistence (?) + +**disque**: need to run redis, no disk persistence (?) + +**gearman**: no disk persistence (?) + + +## Old Notes + +TBD if would want to switch ingest requests from fatcat -> sandcrawler over, +and have the continuous ingests run out of NSQ, or keep using kafka for that. +currently can only do up to 10x parallelism or so with SPNv2, so that isn't a +scaling pain point -- cgit v1.2.3