Diffstat (limited to 'kafka')
-rw-r--r--  kafka/debugging_issues.txt    48
-rw-r--r--  kafka/grobid_kafka_notes.txt  60
-rw-r--r--  kafka/howto_rebalance.md      43
-rw-r--r--  kafka/topics.md              214
4 files changed, 365 insertions, 0 deletions
diff --git a/kafka/debugging_issues.txt b/kafka/debugging_issues.txt
new file mode 100644
index 0000000..007c786
--- /dev/null
+++ b/kafka/debugging_issues.txt
@@ -0,0 +1,48 @@
+
+## 2020-11-12
+
+To reset a consumer group to the offsets from a specific date (or datetime),
+use:
+
+ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group persist-grobid-s3 --reset-offsets --all-topics --to-datetime 2020-11-09T00:00:00.000
+
+Add `--execute` to actually commit the change.
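+
+For example, the same command as above with `--execute` appended:
+
+    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group persist-grobid-s3 --reset-offsets --all-topics --to-datetime 2020-11-09T00:00:00.000 --execute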
+
+## 2018-12-02
+
+Had been having some trouble with consumer group partition assignments for the
+grobid-output topic and the grobid-hbase-insert consumer group. Tried deleting
+and re-creating, which was probably a mistake. Also tried to use the
+kafka-broker shell scripts to clean up/debug, and that didn't work well.
+
+In the end, after re-building the topic, decided to create a new consumer group
+(grobid-hbase-insert2) to get rid of history/crap. Might need to do this again
+in the future, oh well.
+
+A few things learned:
+
+- whatever pykafka's "native python" client writes for consumer group offsets
+  doesn't work great with kafka-manager or the shell scripts: consumer instance
+  names don't show up. this appears as an error in the shell scripts, and as
+  blank/red entries in kafka-manager
+- restarting kafka-manager takes a while (for it to refresh data?) and it shows
+ inconsistent stuff during that period, but it does result in cleaned up
+ consumer group cached metadata (aka, old groups are cleared)
+- kafka-manager can't fetch JMX info, either due to lack of config or port
+  blocking. should try to fix this for metrics, etc
+- it would be nice to be using a recent librdkafka everywhere. pykafka can
+  optionally use this, and many other tools do automatically. however, this is
+  a system package, and xenial doesn't have backports (debian stretch does).
+  the version in bionic looks "good enough", so maybe should try that?
+- there has been a minor release of kafka (2.1) since I installed (!)
+- the burrow (consumer group monitoring) tool is packaged for some version of
+ ubuntu
+
+In general, not feeling great about the current setup. Very frustrating that
+the debug/status tools are broken with pykafka native output. Need to at least
+document things a lot better.
+
+Separately, came up with an idea to do batched processing with GROBID: don't
+auto-commit; instead consume a batch (10 messages? or until the consumer would
+block), process those, then commit. This is a way to control "the batch size
+returned".
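+
+A rough sketch of that pattern with pykafka (untested; the topic, group, and
+process_batch() are placeholders):
+
+    import time
+    from pykafka import KafkaClient
+
+    BATCH_SIZE = 10
+
+    client = KafkaClient(hosts="localhost:9092")
+    topic = client.topics[b"sandcrawler-qa.ungrobided"]
+    consumer = topic.get_simple_consumer(
+        consumer_group=b"grobid-batch-test",
+        auto_commit_enable=False,  # commit manually, only after the batch is processed
+    )
+
+    while True:
+        # pull up to BATCH_SIZE messages, stopping early if the consumer would block
+        batch = []
+        while len(batch) < BATCH_SIZE:
+            msg = consumer.consume(block=False)
+            if msg is None:
+                break
+            batch.append(msg)
+        if not batch:
+            time.sleep(1)
+            continue
+        process_batch(batch)       # placeholder: run GROBID etc. over the whole batch
+        consumer.commit_offsets()  # only now mark the batch as consumed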
+
diff --git a/kafka/grobid_kafka_notes.txt b/kafka/grobid_kafka_notes.txt
new file mode 100644
index 0000000..a1f7380
--- /dev/null
+++ b/kafka/grobid_kafka_notes.txt
@@ -0,0 +1,60 @@
+
+Will want to be able to scale to 100-200+ fully-utilized cores running GROBID;
+how best to achieve this? Will need *many* workers doing concurrent HTTP GETs,
+POSTs, and Kafka publishes.
+
+I'm pretty confident we can relax "at least once"/"at most once" constraints in
+this case: infrequent re-processing and missing a tiny fraction of processed
+works should be acceptable, because we will have higher-level checks (eg, the
+'ungrobided' HBase filter/dump).
+
+For the 'ungrobided' topic, use a reasonably large number of partitions, say
+50. This sets the max number of worker *processes*, and may be enough for an
+initial single-host worker. We can have a python wrapper spawn many child
+processes using the multiprocessing library, with completely independent kafka
+client connections in each.
+
+To get more concurrency, each consumer *process* creates a thread pool (or
+process pool?) and a Queue with fixed size. It consumes messages, pushes them
+to the Queue, and worker threads pull them and do the rest. golang sure would
+be nice for this...
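+
+A rough sketch of that shape (untested; uses a pykafka balanced consumer with
+made-up topic/group names and worker counts, a guessed zookeeper address, and
+a placeholder handle_message()):
+
+    import multiprocessing
+    from queue import Queue
+    from threading import Thread
+
+    from pykafka import KafkaClient
+
+    NUM_PROCS = 8          # <= number of partitions on the topic
+    THREADS_PER_PROC = 12
+    QUEUE_SIZE = 50        # bounded, so the consumer can't run far ahead of workers
+
+    def handle_message(msg):
+        # placeholder: fetch PDF, POST to GROBID, publish result to output topic
+        pass
+
+    def worker_loop(queue):
+        while True:
+            handle_message(queue.get())
+            queue.task_done()
+
+    def consumer_process():
+        # completely independent kafka client connection in each child process
+        client = KafkaClient(hosts="localhost:9092")
+        topic = client.topics[b"sandcrawler-qa.ungrobided"]
+        consumer = topic.get_balanced_consumer(
+            consumer_group=b"grobid-extract",
+            zookeeper_connect="localhost:2181",
+            auto_commit_enable=True,  # relaxed "at least once", per the note above
+        )
+        queue = Queue(maxsize=QUEUE_SIZE)
+        for _ in range(THREADS_PER_PROC):
+            Thread(target=worker_loop, args=(queue,), daemon=True).start()
+        for msg in consumer:
+            queue.put(msg)  # blocks when the queue is full (backpressure)
+
+    if __name__ == "__main__":
+        procs = [multiprocessing.Process(target=consumer_process) for _ in range(NUM_PROCS)]
+        for p in procs:
+            p.start()
+        for p in procs:
+            p.join()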
+
+Need to ensure we have compression enabled, for the GROBID output in
+particular! Probably worth using "expensive" GZIP compression to get extra disk
+savings; latency shouldn't be a big deal here.
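+
+On the producer side, something like this should do it with pykafka (untested;
+topic name is an example and the enum/param names are from memory):
+
+    from pykafka import KafkaClient
+    from pykafka.common import CompressionType
+
+    client = KafkaClient(hosts="localhost:9092")
+    topic = client.topics[b"sandcrawler-qa.grobid-output"]
+    # gzip-compress message batches on the wire and on disk
+    with topic.get_sync_producer(compression=CompressionType.GZIP) as producer:
+        producer.produce(b"...grobid output JSON...")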
+
+## Commands
+
+Load up some example lines, without partition key:
+
+ head -n10 python/tests/files/example_ungrobided.tsv | kafkacat -P -b localhost:9092 -t sandcrawler-qa.ungrobided
+
+Load up some example lines, with partition key:
+
+ head -n10 python/tests/files/example_ungrobided.tsv | awk -F'\t' '{print $1 "\t" $0}' | kafkacat -K$'\t' -P -b localhost:9092 -t sandcrawler-qa.ungrobided
+
+Check ungrobided topic:
+
+ kafkacat -C -b localhost:9092 -t sandcrawler-qa.ungrobided
+
+Check grobid output:
+
+ kafkacat -C -b localhost:9092 -t sandcrawler-qa.grobid-output
+
+## Actual Production Commands
+
+ gohdfs get sandcrawler/output-prod/2018-11-30-2125.55-dumpungrobided/part-00000
+ mv part-00000 2018-11-30-2125.55-dumpungrobided.tsv
+ cat 2018-11-30-2125.55-dumpungrobided.tsv | kafkacat -P -b 127.0.0.1:9092 -t sandcrawler-prod.ungrobided
+
+## Performance
+
+On 2018-11-21, using grobid-vm (svc096) with 30 cores, and running with 50x
+kafka-grobid-worker processes (using systemd parallelization), achieved:
+
+- 2044 PDFs extracted in 197 seconds, or about 10/second
+- that's about 28 hours to process 1 million PDFs
+
+I think this is about all the single machine can handle. To get more
+throughput, even with multiple machines, might need to tweak the worker to use
+a thread pool or some other concurrency pattern (async?).
diff --git a/kafka/howto_rebalance.md b/kafka/howto_rebalance.md
new file mode 100644
index 0000000..093740a
--- /dev/null
+++ b/kafka/howto_rebalance.md
@@ -0,0 +1,43 @@
+
+## Rebalance Storage Between Brokers (kafka-manager web)
+
+For each topic you want to rebalance (eg, the large or high-throughput ones),
+go to the topic page and click the blue "reassign partitions" button (or
+potentially "generate" or "manual").
+
+Monitor progress with the "Reassign Partitions" link at the top of the page.
+
+Finally, run a preferred replica election after partition movement is complete.
+
+## Rebalance Storage Between Brokers (CLI)
+
+For example, after adding or removing brokers from the cluster.
+
+Create a list of topics to move, and put it in `/tmp/topics_to_move.json`:
+
+ {
+ "version": 1,
+ "topics": [
+ {"topic": "sandcrawler-shadow.grobid-output"},
+ {"topic": "fatcat-prod.api-crossref"}
+ ]
+ }
+
+On a kafka broker, go to `/srv/kafka-broker/kafka-*/bin`, generate a plan, then
+inspect the output:
+
+ ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --broker-list "280,281,284,285,263" --topics-to-move-json-file /tmp/topics_to_move.json --generate > /tmp/reassignment-plan.json
+ cat /tmp/reassignment-plan.json | rg '^\{' | head -n1 | jq . > /tmp/old-plan.json
+ cat /tmp/reassignment-plan.json | rg '^\{' | tail -n1 | jq . > /tmp/new-plan.json
+ cat /tmp/reassignment-plan.json | rg '^\{' | jq .
+
+If that looks good, start the rebalance:
+
+ ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/new-plan.json --execute
+
+Then monitor progress:
+
+ ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/new-plan.json --verify
+
+Finally, run a preferred replica election after partition movement is complete.
+Currently do this through the web interface (linked above).
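+
+The CLI equivalent should be something like the following (from the same
+`bin/` directory; not tested here):
+
+    ./kafka-preferred-replica-election.sh --zookeeper localhost:2181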
diff --git a/kafka/topics.md b/kafka/topics.md
new file mode 100644
index 0000000..a699e16
--- /dev/null
+++ b/kafka/topics.md
@@ -0,0 +1,214 @@
+
+This file lists all the Kafka topics currently used by sandcrawler (and
+fatcat).
+
+NOTE: topic names should use `.` or `_`, but not both (the two can collide in
+metric names). We chose to use `.`
+
+ENV below is one of `prod` or `qa`.
+
+
+## Topic List
+
+All topics should default to `snappy` compression on-disk, and indefinite
+retention (on both a size and time basis).
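+
+To check or change these settings on an existing topic, something like the
+following should work (`kafka-configs.sh` lives in the same `bin/` directory
+as the other scripts used below; the topic and settings here are just
+examples):
+
+    ./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name sandcrawler-ENV.grobid-output-pg
+    ./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name sandcrawler-ENV.grobid-output-pg --add-config retention.ms=-1,retention.bytes=-1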
+
+ sandcrawler-ENV.grobid-output-pg
+ => output of GROBID processing using grobid_tool.py
+ => schema is sandcrawler-db style JSON: TEI-XML as a field
+ => expected to be large; 12 partitions
+ => use GZIP compression (worth the overhead)
+ => key is sha1hex of PDF; enable key compaction
+
+ sandcrawler-ENV.ungrobided-pg
+ => PDF files in IA needing GROBID processing
+ => schema is sandcrawler-db style JSON. Can be either `cdx` or `petabox` object
+ => fewer partitions with batch mode, but still a bunch (24?)
+ => key is sha1hex of PDF. enable time compaction (6 months?)
+
+ sandcrawler-ENV.ingest-file-requests-daily
+ => was ingest-file-requests previously, but renamed/rebalanced
+ => ingest requests from multiple sources; mostly continuous or pseudo-interactive
+ => schema is JSON; see ingest proposal for fields. small objects.
+ => fewer partitions with batch mode, but still a bunch (24)
+ => can't think of a good key, so none. enable time compaction (3-6 months?)
+
+ sandcrawler-ENV.ingest-file-requests-bulk
+ => ingest requests from bulk crawl sources; background processing
+ => same as ingest-file-requests
+
+ sandcrawler-ENV.ingest-file-requests-priority
+ => ingest requests from bulk crawl sources; background processing
+ => same as ingest-file-requests
+
+ sandcrawler-ENV.ingest-file-results
+ => ingest requests from multiple sources
+ => schema is JSON; see ingest proposal for fields. small objects.
+ => 6 partitions
+ => can't think of a good key, so none; no compaction
+
+ sandcrawler-ENV.pdftrio-output
+ => output of each pdftrio ML classification
+ => schema is JSON; see pdftrio proposal for fields. small objects.
+ => 6 partitions
+ => key is sha1hex of PDF; enable key compaction
+
+ sandcrawler-ENV.unextracted
+ => PDF files in IA needing extraction (thumbnails and text)
+ => schema is sandcrawler-db style JSON. Can be either `cdx` or `petabox` object
+ => fewer partitions with batch mode, but still a bunch (12? 24?)
+ => key is sha1hex of PDF. enable time compaction (6 months?)
+
+ sandcrawler-ENV.pdf-text
+ => fulltext (raw text) and PDF metadata for pdfs
+ => schema is JSON; see pdf_meta proposal for fields. large objects.
+ => 12 partitions
+ => key is sha1hex of PDF; enable key compaction; gzip compression
+
+ sandcrawler-ENV.xml-doc
+ => fulltext XML; mostly JATS XML
+ => schema is JSON, with 'jats_xml' field containing the XML as a string
+ => 6 partitions
+ => key is sha1hex of XML document; enable key compaction; gzip compression
+
+ sandcrawler-ENV.html-teixml
+ => extracted fulltext from HTML; mostly TEI-XML
+ => schema is JSON, with 'tei_xml' field containing the XML as a string
+ => 6 partitions
+ => key is sha1hex of source HTML document; enable key compaction; gzip compression
+
+ sandcrawler-ENV.pdf-thumbnail-SIZE-TYPE
+ => thumbnail images (eg, png, jpg) from PDFs
+ => raw bytes in message (no JSON or other wrapping). fields average 10 KByte
+ => 12 partitions; expect a TByte or so total
+ => key is sha1hex of PDF; enable key compaction; no compression
+
+ fatcat-ENV.api-crossref
+ fatcat-ENV.api-datacite
+ => all new and updated DOIs (regardless of type)
+ => full raw crossref/datacite API objects (JSON)
+ => key: lower-case DOI
+ => ~1TB capacity; 8x crossref partitions, 4x datacite
+ => key compaction possible
+
+ fatcat-ENV.ftp-pubmed
+ => new citations from FTP server, from: ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/
+ => raw XML, one record per message (PubmedArticle, up to 25k records/day and 650MB/day)
+ => key: PMID
+ => key compaction possible
+
+ fatcat-ENV.api-crossref-state
+ fatcat-ENV.api-datacite-state
+ fatcat-ENV.ftp-pubmed-state
+ fatcat-ENV.oaipmh-pubmed-state
+ fatcat-ENV.oaipmh-arxiv-state
+ fatcat-ENV.oaipmh-doaj-journals-state (DISABLED)
+ fatcat-ENV.oaipmh-doaj-articles-state (DISABLED)
+ => serialized harvester state for ingesters
+ => custom JSON
+ => key: timespan? nothing to start
+ => 1x partitions; time/space limit Ok
+
+ fatcat-ENV.changelog
+ => small-ish objects (not fully expanded/hydrated)
+ => single partition
+ => key: could be changelog index (integer, as string)
+
+ fatcat-ENV.release-updates-v03
+ => contains "fully" expanded JSON objects
+ => v03 is newer v0.3.0 API schema (backwards incompatible)
+ => key: fcid
+ => 8x partitions
+ fatcat-ENV.container-updates
+ => key: fcid
+ => 4x partitions
+ fatcat-ENV.file-updates
+ => key: fcid
+ => 4x partitions
+ fatcat-ENV.work-ident-updates
+ => work identifiers when updated and needs re-indexing (eg, in scholar)
+ => 6x partitions
+ => key: doc ident ("work_{ident}")
+ => key compaction possible; long retention
+
+ scholar-ENV.sim-updates
+ => 6x partitions
+ => key: "sim_item_{}"
+ => key compaction possible; long retention
+ scholar-ENV.update-docs
+ => 12x partitions
+ => key: scholar doc identifer
+ => gzip compression
+ => key compaction possible
+ => short time-based retention (2 months?)
+
+### Deprecated/Unused Topics
+
+ sandcrawler-ENV.ungrobided
+ => PDF files in IA needing GROBID processing
+ => 50x partitions (huge! for worker parallelism)
+ => key: "sha1:<base32>"
+
+ sandcrawler-ENV.grobid-output
+ => output of GROBID processing (from pdf-ungrobided feed)
+ => could get big; 16x partitions (to distribute data)
+ => use GZIP compression (worth the overhead)
+ => key: "sha1:<base32>"; could compact
+
+ fatcat-ENV.oaipmh-pubmed
+ fatcat-ENV.oaipmh-arxiv
+ fatcat-ENV.oaipmh-doaj-journals (DISABLED)
+ fatcat-ENV.oaipmh-doaj-articles (DISABLED)
+ => OAI-PMH harvester output
+ => full XML resource output (just the <<record> part?)
+ => key: identifier
+ => ~1TB capacity; 4x-8x partitions
+ => key compaction possible
+
+## Create fatcat QA topics
+
+If you run these commands for an existing topic, you'll get an error like
+`Error while executing topic command : Topic 'fatcat-qa.changelog' already
+exists`; this seems safe, and the existing settings won't be overridden.
+
+ ssh misc-vm
+ cd /srv/kafka-broker/kafka_2.12-2.0.0/bin/
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ungrobided-pg
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.grobid-output-pg --config compression.type=gzip --config cleanup.policy=compact
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests-daily --config retention.ms=7889400000 --config cleanup.policy=delete
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.ingest-file-requests-bulk --config retention.ms=7889400000 --config cleanup.policy=delete
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-requests-priority --config retention.ms=7889400000 --config cleanup.policy=delete
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-results
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.pdftrio-output --config cleanup.policy=compact
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.changelog
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic fatcat-qa.release-updates-v03
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic fatcat-qa.file-updates
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic fatcat-qa.container-updates
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic fatcat-qa.work-ident-updates
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic fatcat-qa.api-crossref
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic fatcat-qa.api-datacite --config cleanup.policy=compact
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic fatcat-qa.ftp-pubmed --config cleanup.policy=compact
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.api-crossref-state
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.api-datacite-state
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.ftp-pubmed-state
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic fatcat-qa.oaipmh-pubmed
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic fatcat-qa.oaipmh-arxiv
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.oaipmh-pubmed-state
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.oaipmh-arxiv-state
+
+ # only 3 partitions in QA
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.pdf-text --config compression.type=gzip --config cleanup.policy=compact
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.pdf-thumbnail-180px-jpg --config cleanup.policy=compact
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.unextracted
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic scholar-qa.sim-updates
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic scholar-qa.update-docs --config compression.type=gzip --config cleanup.policy=compact --config retention.ms=7889400000
+
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.xml-doc --config compression.type=gzip --config cleanup.policy=compact
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.html-teixml --config compression.type=gzip --config cleanup.policy=compact
+
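+To double-check a topic's partition count and configs after creation, something like:
+
+    ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic sandcrawler-qa.grobid-output-pg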