aboutsummaryrefslogtreecommitdiffstats
path: root/kafka
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2018-11-20 14:19:40 -0800
committerBryan Newbold <bnewbold@archive.org>2018-11-20 14:19:40 -0800
commit7186eb098b1e3f62288febe27db73685dacf1a2f (patch)
tree12f2a6101026c5520dce62cbe4242a0ae7f3cb04 /kafka
parent4cb7c1bdc6710a11c869f3d398ed39762644395c (diff)
downloadsandcrawler-7186eb098b1e3f62288febe27db73685dacf1a2f.tar.gz
sandcrawler-7186eb098b1e3f62288febe27db73685dacf1a2f.zip
kafka notes
Diffstat (limited to 'kafka')
-rw-r--r--kafka/grobid_kafka_notes.txt24
-rw-r--r--kafka/topics.md86
2 files changed, 110 insertions, 0 deletions
diff --git a/kafka/grobid_kafka_notes.txt b/kafka/grobid_kafka_notes.txt
new file mode 100644
index 0000000..f774291
--- /dev/null
+++ b/kafka/grobid_kafka_notes.txt
@@ -0,0 +1,24 @@
+
+Will want to be able to scale to 100-200+ fully-utilized cores running GROBID;
+how best to achieve this? will need *many* workers going concurrent HTTP GETs,
+POSTs, and Kafka publishes.
+
+I'm pretty confident we can relax "at least once"/"at most once" constraints in
+this case: infrequent re-processing and missing a tiny fraction of processed
+works should be acceptable, because we will have higher-level checks (eg, the
+'ungrobided' HBase filter/dump).
+
+For the 'ungrobided' topic, use a reasonably large number of partitions, say
+50. This sets max number of worker *processes*, and may be enough for initial
+single-host worker. We can have a python wrapper spawn many child processes
+using multiprocessing library, with completely independent kafka client
+connections in each.
+
+To get more concurrency, each consumer *process* creates a thread pool (or
+process pool?), and a Queue with fixed size. Consumes messages, pushes to
+Queue, workers threads pull and do the rest. golang sure would be nice for
+this...
+
+Need to ensure we have compression enabled, for the GROBID output in
+particular! Probably worth using "expensive" GZIP compression to get extra disk
+savings; latency shouldn't be a big deal here.
diff --git a/kafka/topics.md b/kafka/topics.md
new file mode 100644
index 0000000..6361cc8
--- /dev/null
+++ b/kafka/topics.md
@@ -0,0 +1,86 @@
+
+This file lists all the Kafka topics currently used by sandcrawler (and
+fatcat).
+
+NOTE: should use `.` or `_` in topic names, but not both. We chose to use `.`
+
+ENV below is one of `prod` or `qa`.
+
+
+## Topic List
+
+All topics should default to `snappy` compression on-disk, and indefinite
+retention (on both a size and time basis).
+
+ sandcrawler-ENV.ungrobided
+ => PDF files in IA needing GROBID processing
+ => 50x partitions (huge! for worker parallelism)
+ => key: "sha1:<base32>"
+
+ sandcrawler-ENV.grobided
+ => output of GROBID processing (from pdf-ungrobided feed)
+ => could get big; 16x partitions (to distribute data)
+ => use GZIP compression (worth the overhead)
+ => key: "sha1:<base32>"; could compact
+
+ fatcat-ENV.api-crossref
+ fatcat-ENV.api-datacite
+ => all new and updated DOIs (regardless of type)
+ => full raw crossref/datacite API objects (JSON)
+ => key: lower-case DOI
+ => ~1TB capacity; 8x crossref partitions, 4x datacite
+ => key compaction possible
+
+ fatcat-ENV.oaipmh-pubmed
+ fatcat-ENV.oaipmh-arxiv
+ fatcat-ENV.oaipmh-doaj-journals (DISABLED)
+ fatcat-ENV.oaipmh-doaj-articles (DISABLED)
+ => OAI-PMH harvester output
+ => full XML resource output (just the <<record> part?)
+ => key: identifier
+ => ~1TB capacity; 4x-8x partitions
+ => key compaction possible
+
+ fatcat-ENV.api-crossref-state
+ fatcat-ENV.api-datacite-state
+ fatcat-ENV.oaipmh-pubmed-state
+ fatcat-ENV.oaipmh-arxiv-state
+ fatcat-ENV.oaipmh-doaj-journals-state (DISABLED)
+ fatcat-ENV.oaipmh-doaj-articles-state (DISABLED)
+ => serialized harvester state for ingesters
+ => custom JSON
+ => key: timespan? nothing to start
+ => 1x partitions; time/space limit Ok
+
+ fatcat-ENV.changelog
+ => small-ish objects (not fully expanded/hydrated)
+ => single partition
+ => key: could be changelog index (integer, as string)
+
+ fatcat-ENV.release-updates
+ => contains "fully" expanded JSON objects
+ => key: fcid
+ => 8x partitions
+
+
+## Create fatcat QA topics
+
+ ssh misc-vm
+ cd /srv/kafka-broker/kafka_2.12-2.0.0/bin/
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 50 --topic sandcrawler-qa.ungrobided
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 16 --topic sandcrawler-qa.grobided --config compression.type=gzip
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.changelog
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic fatcat-qa.release-updates
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic fatcat-qa.api-crossref
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic fatcat-qa.api-datacite
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.api-crossref-state
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.api-datacite-state
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic fatcat-qa.oaipmh-pubmed
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic fatcat-qa.oaipmh-arxiv
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.oaipmh-pubmed-state
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.oaipmh-arxiv-state
+