status: planned

In short, Kafka is not working well as a job task scheduler, and I want to try
NSQ as a medium-term solution to that problem.

## Motivation

Thinking of setting up NSQ to use for scheduling distributed work, to replace
Kafka for some topics. For example, "regrobid" requests, where we enqueue
millions of (basically) CDX lines and want to process them on dozens of cores
or across multiple machines; or file ingest backfill. Results would still go
to Kafka (to persist), and pipelines like DOI harvest -> import ->
elasticsearch would still run through Kafka.

The pain point with Kafka is having dozens of workers on tasks that take more
than a couple of seconds each. We could keep tweaking Kafka and writing weird
consumer-group code to handle this, but I don't think it will ever work very
well.

NSQ supports re-queues with delay (eg, on failure, defer re-processing until
later), allows many workers to connect and disconnect without disruption, does
not require messages to be processed in order, and has a very simple enqueue
API (HTTP POST).
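
As a rough illustration of that enqueue API, here is a minimal sketch using
`requests` against nsqd's HTTP `/pub` endpoint (4151 is nsqd's default HTTP
port; the topic name and request fields are hypothetical placeholders):

```python
import json
import requests

# One JSON-encoded ingest request per NSQ message (topic name is hypothetical)
request = {"ingest_type": "pdf", "base_url": "https://example.com/paper.pdf"}
resp = requests.post(
    "http://localhost:4151/pub",
    params={"topic": "ingest-file-requests"},
    data=json.dumps(request),
)
resp.raise_for_status()
```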

The slowish tasks we have now are file ingest (wayback and/or SPNv2 + GROBID)
and re-GROBID. In the near future we will also have an ML backlog to work
through.

Throughput isn't much of a concern as tasks take 10+ seconds each.

## Specific Plan

Continue publishing ingest requests to the Kafka topic. Have a new persist
worker consume from this topic and insert rows into the request table (but not
the result table) using `ON CONFLICT DO NOTHING`. Have a new single-process
Kafka consumer pull from the topic and push to NSQ. This consumer monitors NSQ
queue depth and doesn't push too many requests (eg, 1k outstanding maximum).
NSQ could potentially even run in in-memory mode.
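
A rough sketch of what that depth-limited pusher could look like, assuming
`confluent_kafka` on the Kafka side and nsqd's HTTP API for both the depth
check (`/stats`) and publishing (`/pub`); broker address, topic names, and the
exact threshold are all placeholders:

```python
import time
import requests
from confluent_kafka import Consumer

NSQ_HTTP = "http://localhost:4151"   # assumed nsqd HTTP address
NSQ_TOPIC = "ingest-file-requests"   # hypothetical NSQ topic
MAX_DEPTH = 1000                     # back off if NSQ backlog exceeds this

def nsq_topic_depth(topic: str) -> int:
    """Un-consumed message depth for a topic, via nsqd's /stats endpoint."""
    resp = requests.get(NSQ_HTTP + "/stats", params={"format": "json"})
    resp.raise_for_status()
    stats = resp.json()
    # older nsqd versions wrap the stats object in a "data" key
    topics = stats.get("data", stats).get("topics", [])
    for t in topics:
        if t.get("topic_name") == topic:
            return t.get("depth", 0)
    return 0

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed Kafka broker
    "group.id": "nsq-pusher",
    "enable.auto.commit": False,
})
consumer.subscribe(["sandcrawler-ENV.ingest-file-requests"])  # hypothetical topic

while True:
    # Wait for the NSQ backlog to drain below the threshold before pushing more.
    # Note: a production version would need to keep polling (or pause/resume the
    # partitions) so the consumer isn't evicted from the group during long waits.
    while nsq_topic_depth(NSQ_TOPIC) >= MAX_DEPTH:
        time.sleep(5)
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    requests.post(
        NSQ_HTTP + "/pub", params={"topic": NSQ_TOPIC}, data=msg.value()
    ).raise_for_status()
    consumer.commit(message=msg)  # commit the Kafka offset only after a successful push
```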

A new worker/pusher class acts as an NSQ client, possibly with parallelism.
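
On the worker side, one option would be the `pynsq` client library. A minimal
sketch, with hypothetical topic/channel names and a placeholder `do_ingest()`
standing in for the actual task logic:

```python
import json
import nsq  # pynsq

def handle_message(message):
    """Process one request; return False to requeue the message for later."""
    request = json.loads(message.body)
    try:
        do_ingest(request)  # placeholder for the actual ingest/GROBID work
    except Exception:
        return False        # requeued by pynsq, retried after a delay
    return True             # marks the message as finished

reader = nsq.Reader(
    topic="ingest-file-requests",   # hypothetical topic
    channel="ingest-workers",       # all workers share one channel
    nsqd_tcp_addresses=["wbgrp-svc506:4150"],  # assumed host and default TCP port
    message_handler=handle_message,
    max_in_flight=1,  # sync handler does one message at a time; run many processes
)
nsq.run()
```

Because all workers subscribe to the same channel, NSQ distributes messages
across however many processes happen to be connected, which is what lets
workers come and go freely.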

A *clean* NSQ shutdown/restart always persists queued data locally to disk. An
unclean shutdown (eg, power failure) could mean NSQ has lost queue state.
Because we are persisting requests to sandcrawler-db anyway, cleanup is simple:
re-enqueue all requests from the past N days that have a null result or a
result older than M days.
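
That recovery step could be a small script along these lines, using `psycopg2`
and nsqd's `/pub` endpoint; the table and column names here are illustrative
guesses, not the actual sandcrawler-db schema, and N=7/M=30 days are hardcoded
purely for illustration:

```python
import json
import psycopg2
import requests

NSQ_PUB = "http://localhost:4151/pub?topic=ingest-file-requests"  # hypothetical

conn = psycopg2.connect("dbname=sandcrawler")  # assumed connection string
cur = conn.cursor()
# hypothetical schema: request table left-joined against result table on base_url
cur.execute("""
    SELECT req.ingest_type, req.base_url
    FROM ingest_request req
    LEFT JOIN ingest_result res ON req.base_url = res.base_url
    WHERE req.created > now() - interval '7 days'
      AND (res.updated IS NULL OR res.updated < now() - interval '30 days')
""")
for ingest_type, base_url in cur:
    body = json.dumps({"ingest_type": ingest_type, "base_url": base_url})
    requests.post(NSQ_PUB, data=body).raise_for_status()
```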

We will still need multiple Kafka and NSQ topics to provide priority queues
(eg, bulk, platform-specific).

To start, have a single static NSQ host; we don't need nsqlookupd. Could use
wbgrp-svc506 (datanode VM with SSD, lots of CPU and RAM).

To move hosts, simply restart the Kafka pusher pointing at the new NSQ host.
When the old host's queue is empty, restart the workers to consume from the new
host, and destroy the old NSQ host.

## Alternatives

Workarounds I've used to date have been the `grobid_tool.py` and
`ingest_tool.py` JSON input modes, piping JSON task files (millions of lines)
through GNU/parallel. I guess GNU/parallel's distributed mode is also an option
here.

Other things that could be used:

- **celery**: popular, many features; needs a separate redis, no disk persistence (?)
- **disque**: needs redis, no disk persistence (?) <https://github.com/antirez/disque>
- **gearman**: <http://gearman.org/>; no disk persistence (?)

## Old Notes

TBD whether we would want to switch ingest requests from fatcat -> sandcrawler
over to this as well, and have the continuous ingests run out of NSQ, or keep
using Kafka for that. Currently we can only do up to 10x parallelism or so with
SPNv2, so that isn't a scaling pain point.