Diffstat (limited to 'kafka')

    -rw-r--r--   kafka/debugging_issues.txt      48
    -rw-r--r--   kafka/grobid_kafka_notes.txt    60
    -rw-r--r--   kafka/howto_rebalance.md        43
    -rw-r--r--   kafka/topics.md                214

4 files changed, 365 insertions, 0 deletions
diff --git a/kafka/debugging_issues.txt b/kafka/debugging_issues.txt
new file mode 100644
index 0000000..007c786
--- /dev/null
+++ b/kafka/debugging_issues.txt
@@ -0,0 +1,48 @@

## 2020-11-12

To reset a consumer group to the offsets from a specific date (or datetime),
use:

    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group persist-grobid-s3 --reset-offsets --all-topics --to-datetime 2020-11-09T00:00:00.000

Add `--execute` to actually commit the change.

## 2018-12-02

Had been having some trouble with consumer group partition assignments for the
grobid-output topic and the grobid-hbase-insert consumer group. Tried deleting
and re-creating, which was probably a mistake. Also tried to use the
kafka-broker shell scripts to cleanup/debug, which didn't work well.

In the end, after re-building the topic, decided to create a new consumer group
(grobid-hbase-insert2) to get rid of the accumulated history/cruft. Might need
to do this again in the future, oh well.

A few things learned:

- whatever pykafka's "native python" client writes for consumer group offsets
  doesn't work great with kafka-manager or the shell scripts: consumer instance
  names don't show up. This surfaces as an error in the shell scripts, and as
  blank/red entries in kafka-manager.
- restarting kafka-manager takes a while (for it to refresh data?) and it shows
  inconsistent info during that period, but it does result in cleaned-up
  consumer group cached metadata (i.e., old groups are cleared).
- kafka-manager can't fetch JMX info, either due to lack of config or port
  blocking. Should try to fix this for metrics etc.
- it would be nice to be using a recent librdkafka everywhere. pykafka can
  optionally use it, and many other tools do automatically. However, this is
  a system package, and xenial doesn't have backports (debian stretch does).
  The version in bionic looks "good enough", so maybe should try that?
- there has been a minor release of kafka (2.1) since I installed (!)
- the Burrow (consumer group monitoring) tool is packaged for some versions of
  Ubuntu

In general, not feeling great about the current setup. Very frustrating that
the debug/status tools are broken with pykafka native output. Need to at least
document things a lot better.

Separately, came up with an idea for batched processing with GROBID: don't
auto-commit; instead consume a batch (10 messages? or until the consumer
blocks), process those, then commit. This would be a way to control the
effective batch size ourselves.

diff --git a/kafka/grobid_kafka_notes.txt b/kafka/grobid_kafka_notes.txt
new file mode 100644
index 0000000..a1f7380
--- /dev/null
+++ b/kafka/grobid_kafka_notes.txt
@@ -0,0 +1,60 @@

Will want to be able to scale to 100-200+ fully-utilized cores running GROBID;
how best to achieve this? We will need *many* workers doing concurrent HTTP
GETs, POSTs, and Kafka publishes.

I'm pretty confident we can relax "at least once"/"at most once" constraints in
this case: infrequent re-processing and missing a tiny fraction of processed
works should be acceptable, because we will have higher-level checks (eg, the
'ungrobided' HBase filter/dump).

For the 'ungrobided' topic, use a reasonably large number of partitions, say
50. This sets the max number of worker *processes*, and may be enough for an
initial single-host worker. We can have a python wrapper spawn many child
processes using the multiprocessing library, with completely independent kafka
client connections in each.
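
Roughly, that wrapper could look like the sketch below. This is illustrative
only, not the real worker code: it uses confluent-kafka instead of pykafka, and
the GROBID endpoint, topic/group names, and the fetch_pdf() helper are all
assumptions.

    # sketch: parent spawns N worker processes; each gets its own Kafka
    # consumer/producer and talks to a (assumed local) GROBID service over HTTP
    import multiprocessing

    import requests
    from confluent_kafka import Consumer, Producer

    GROBID_URL = "http://localhost:8070/api/processFulltextDocument"  # assumed endpoint

    def fetch_pdf(raw_msg: bytes) -> bytes:
        # hypothetical helper: parse the JSON/TSV record and fetch the PDF
        # bytes from petabox/wayback; depends on the actual message schema
        raise NotImplementedError

    def run_worker():
        # completely independent kafka client connections per process
        consumer = Consumer({
            "bootstrap.servers": "localhost:9092",
            "group.id": "grobid-extract",   # assumed group name
            "enable.auto.commit": False,    # commit only after successful processing
        })
        producer = Producer({
            "bootstrap.servers": "localhost:9092",
            "compression.type": "gzip",
        })
        consumer.subscribe(["sandcrawler-qa.ungrobided"])
        while True:
            msg = consumer.poll(timeout=5.0)
            if msg is None or msg.error():
                continue
            pdf_bytes = fetch_pdf(msg.value())
            resp = requests.post(GROBID_URL, files={"input": pdf_bytes}, timeout=180)
            producer.produce("sandcrawler-qa.grobid-output", key=msg.key(), value=resp.content)
            producer.flush()  # simple but slow; batching deliveries would be better
            consumer.commit(message=msg)

    if __name__ == "__main__":
        procs = [multiprocessing.Process(target=run_worker) for _ in range(50)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()

Each process in this sketch is single-threaded; the thread-pool/queue
refinement described next would live inside run_worker().
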
To get more concurrency, each consumer *process* creates a thread pool (or
process pool?) and a Queue with a fixed size. The main loop consumes messages
and pushes them onto the Queue; worker threads pull from it and do the rest.
golang sure would be nice for this...

Need to ensure we have compression enabled, for the GROBID output in
particular! Probably worth using "expensive" GZIP compression to get extra disk
savings; latency shouldn't be a big deal here.

## Commands

Load up some example lines, without partition key:

    head -n10 python/tests/files/example_ungrobided.tsv | kafkacat -P -b localhost:9092 -t sandcrawler-qa.ungrobided

Load up some example lines, with partition key:

    head -n10 python/tests/files/example_ungrobided.tsv | awk -F'\t' '{print $1 "\t" $0}' | kafkacat -K$'\t' -P -b localhost:9092 -t sandcrawler-qa.ungrobided

Check the ungrobided topic:

    kafkacat -C -b localhost:9092 -t sandcrawler-qa.ungrobided

Check GROBID output:

    kafkacat -C -b localhost:9092 -t sandcrawler-qa.grobid-output

## Actual Production Commands

    gohdfs get sandcrawler/output-prod/2018-11-30-2125.55-dumpungrobided/part-00000
    mv part-00000 2018-11-30-2125.55-dumpungrobided.tsv
    cat 2018-11-30-2125.55-dumpungrobided.tsv | kafkacat -P -b 127.0.0.1:9092 -t sandcrawler-prod.ungrobided

## Performance

On 2018-11-21, using grobid-vm (svc096) with 30 cores, and running 50x
kafka-grobid-worker processes (using systemd parallelization), achieved:

- 2044 PDFs extracted in 197 seconds, or about 10/second
- that's about 28 hours to process 1 million PDFs

I think this is about all a single machine can handle. To get more throughput
with multiple machines, we might need to tweak the worker to use a thread pool
or some other concurrent pattern (async?).

diff --git a/kafka/howto_rebalance.md b/kafka/howto_rebalance.md
new file mode 100644
index 0000000..093740a
--- /dev/null
+++ b/kafka/howto_rebalance.md
@@ -0,0 +1,43 @@

## Rebalance Storage Between Brokers (kafka-manager web)

For each topic you want to rebalance (eg, the large or high-throughput ones),
go to the topic page and click the blue "Reassign Partitions" button (or
potentially "Generate" or "Manual").

Monitor progress with the "Reassign Partitions" link at the top of the page.

Finally, run a preferred replica election after partition movement is complete.

## Rebalance Storage Between Brokers (CLI)

For example, after adding or removing brokers from the cluster.

Create a list of topics to move, and put it in `/tmp/topics_to_move.json`:

    {
      "version": 1,
      "topics": [
        {"topic": "sandcrawler-shadow.grobid-output"},
        {"topic": "fatcat-prod.api-crossref"}
      ]
    }

On a kafka broker, go to `/srv/kafka-broker/kafka-*/bin`, generate a plan, then
inspect the output:

    ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --broker-list "280,281,284,285,263" --topics-to-move-json-file /tmp/topics_to_move.json --generate > /tmp/reassignment-plan.json
    cat /tmp/reassignment-plan.json | rg '^\{' | head -n1 | jq . > /tmp/old-plan.json
    cat /tmp/reassignment-plan.json | rg '^\{' | tail -n1 | jq . > /tmp/new-plan.json
    cat /tmp/reassignment-plan.json | rg '^\{' | jq .
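
For reference, the current and proposed assignments in the generated plan are
each a JSON document along these lines (topic, partition numbers, and broker
IDs here are illustrative):

    {
      "version": 1,
      "partitions": [
        {"topic": "sandcrawler-shadow.grobid-output", "partition": 0, "replicas": [280, 284]},
        {"topic": "sandcrawler-shadow.grobid-output", "partition": 1, "replicas": [281, 285]}
      ]
    }

The `new-plan.json` extracted above is the proposed assignment;
`old-plan.json` is the current one, kept around in case the move needs to be
reverted.
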
If that looks good, start the rebalance:

    ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/new-plan.json --execute

Then monitor progress:

    ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/new-plan.json --verify

Finally, run a preferred replica election after partition movement is complete.
Currently we do this through the web interface (see above).

diff --git a/kafka/topics.md b/kafka/topics.md
new file mode 100644
index 0000000..a699e16
--- /dev/null
+++ b/kafka/topics.md
@@ -0,0 +1,214 @@

This file lists all the Kafka topics currently used by sandcrawler (and
fatcat).

NOTE: topic names should use `.` or `_`, but not both. We chose to use `.`.

ENV below is one of `prod` or `qa`.


## Topic List

All topics should default to `snappy` compression on-disk, and indefinite
retention (on both a size and time basis).

    sandcrawler-ENV.grobid-output-pg
     => output of GROBID processing using grobid_tool.py
     => schema is sandcrawler-db style JSON: TEI-XML as a field
     => expected to be large; 12 partitions
     => use GZIP compression (worth the overhead)
     => key is sha1hex of PDF; enable key compaction

    sandcrawler-ENV.ungrobided-pg
     => PDF files in IA needing GROBID processing
     => schema is sandcrawler-db style JSON. Can be either a `cdx` or `petabox` object
     => fewer partitions with batch mode, but still a bunch (24?)
     => key is sha1hex of PDF. enable time compaction (6 months?)

    sandcrawler-ENV.ingest-file-requests-daily
     => was ingest-file-requests previously, but renamed/rebalanced
     => ingest requests from multiple sources; mostly continuous or pseudo-interactive
     => schema is JSON; see ingest proposal for fields. small objects.
     => fewer partitions with batch mode, but still a bunch (24)
     => can't think of a good key, so none. enable time compaction (3-6 months?)

    sandcrawler-ENV.ingest-file-requests-bulk
     => ingest requests from bulk crawl sources; background processing
     => same as ingest-file-requests

    sandcrawler-ENV.ingest-file-requests-priority
     => ingest requests from bulk crawl sources; background processing
     => same as ingest-file-requests

    sandcrawler-ENV.ingest-file-results
     => ingest results from multiple sources
     => schema is JSON; see ingest proposal for fields. small objects.
     => 6 partitions
     => can't think of a good key, so none; no compaction

    sandcrawler-ENV.pdftrio-output
     => output of each pdftrio ML classification
     => schema is JSON; see pdftrio proposal for fields. small objects.
     => 6 partitions
     => key is sha1hex of PDF; enable key compaction

    sandcrawler-ENV.unextracted
     => PDF files in IA needing extraction (thumbnails and text)
     => schema is sandcrawler-db style JSON. Can be either a `cdx` or `petabox` object
     => fewer partitions with batch mode, but still a bunch (12? 24?)
     => key is sha1hex of PDF. enable time compaction (6 months?)

    sandcrawler-ENV.pdf-text
     => fulltext (raw text) and PDF metadata for PDFs
     => schema is JSON; see pdf_meta proposal for fields. large objects.
     => 12 partitions
     => key is sha1hex of PDF; enable key compaction; gzip compression

    sandcrawler-ENV.xml-doc
     => fulltext XML; mostly JATS XML
     => schema is JSON, with a 'jats_xml' field containing the XML as a string
     => 6 partitions
     => key is sha1hex of XML document; enable key compaction; gzip compression

    sandcrawler-ENV.html-teixml
     => extracted fulltext from HTML; mostly TEI-XML
     => schema is JSON, with a 'tei_xml' field containing the XML as a string
     => 6 partitions
     => key is sha1hex of source HTML document; enable key compaction; gzip compression

    sandcrawler-ENV.pdf-thumbnail-SIZE-TYPE
     => thumbnail images (eg, png, jpg) from PDFs
     => raw bytes in message (no JSON or other wrapping). fields average 10 KByte
     => 12 partitions; expect a TByte or so total
     => key is sha1hex of PDF; enable key compaction; no compression

    fatcat-ENV.api-crossref
    fatcat-ENV.api-datacite
     => all new and updated DOIs (regardless of type)
     => full raw crossref/datacite API objects (JSON)
     => key: lower-case DOI
     => ~1TB capacity; 8x crossref partitions, 4x datacite
     => key compaction possible

    fatcat-ENV.ftp-pubmed
     => new citations from the FTP server at ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/
     => raw XML, one record per message (PubmedArticle, up to 25k records/day and 650MB/day)
     => key: PMID
     => key compaction possible

    fatcat-ENV.api-crossref-state
    fatcat-ENV.api-datacite-state
    fatcat-ENV.ftp-pubmed-state
    fatcat-ENV.oaipmh-pubmed-state
    fatcat-ENV.oaipmh-arxiv-state
    fatcat-ENV.oaipmh-doaj-journals-state (DISABLED)
    fatcat-ENV.oaipmh-doaj-articles-state (DISABLED)
     => serialized harvester state for ingesters
     => custom JSON
     => key: timespan? nothing to start
     => 1x partition; time/space limit OK

    fatcat-ENV.changelog
     => small-ish objects (not fully expanded/hydrated)
     => single partition
     => key: could be changelog index (integer, as string)

    fatcat-ENV.release-updates-v03
     => contains "fully" expanded JSON objects
     => v03 is the newer v0.3.0 API schema (backwards incompatible)
     => key: fcid
     => 8x partitions
    fatcat-ENV.container-updates
     => key: fcid
     => 4x partitions
    fatcat-ENV.file-updates
     => key: fcid
     => 4x partitions
    fatcat-ENV.work-ident-updates
     => work identifiers that were updated and need re-indexing (eg, in scholar)
     => 6x partitions
     => key: doc ident ("work_{ident}")
     => key compaction possible; long retention

    scholar-ENV.sim-updates
     => 6x partitions
     => key: "sim_item_{}"
     => key compaction possible; long retention
    scholar-ENV.update-docs
     => 12x partitions
     => key: scholar doc identifier
     => gzip compression
     => key compaction possible
     => short time-based retention (2 months?)

### Deprecated/Unused Topics

    sandcrawler-ENV.ungrobided
     => PDF files in IA needing GROBID processing
     => 50x partitions (huge! for worker parallelism)
     => key: "sha1:<base32>"

    sandcrawler-ENV.grobid-output
     => output of GROBID processing (from the pdf-ungrobided feed)
     => could get big; 16x partitions (to distribute data)
     => use GZIP compression (worth the overhead)
     => key: "sha1:<base32>"; could compact

    fatcat-ENV.oaipmh-pubmed
    fatcat-ENV.oaipmh-arxiv
    fatcat-ENV.oaipmh-doaj-journals (DISABLED)
    fatcat-ENV.oaipmh-doaj-articles (DISABLED)
     => OAI-PMH harvester output
     => full XML resource output (just the <record> part?)
     => key: identifier
     => ~1TB capacity; 4x-8x partitions
     => key compaction possible

## Create fatcat QA topics

If you run these commands for an existing topic, you'll get something like
`Error while executing topic command : Topic 'fatcat-qa.changelog' already
exists`; this seems safe, and the existing settings won't be overridden.

    ssh misc-vm
    cd /srv/kafka-broker/kafka_2.12-2.0.0/bin/

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ungrobided-pg
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.grobid-output-pg --config compression.type=gzip --config cleanup.policy=compact

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests-daily --config retention.ms=7889400000 --config cleanup.policy=delete
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.ingest-file-requests-bulk --config retention.ms=7889400000 --config cleanup.policy=delete
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-requests-priority --config retention.ms=7889400000 --config cleanup.policy=delete
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-results

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.pdftrio-output --config cleanup.policy=compact

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.changelog
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic fatcat-qa.release-updates-v03
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic fatcat-qa.file-updates
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic fatcat-qa.container-updates
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic fatcat-qa.work-ident-updates

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic fatcat-qa.api-crossref
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic fatcat-qa.api-datacite --config cleanup.policy=compact
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 8 --topic fatcat-qa.ftp-pubmed --config cleanup.policy=compact
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.api-crossref-state
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.api-datacite-state
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.ftp-pubmed-state

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic fatcat-qa.oaipmh-pubmed
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic fatcat-qa.oaipmh-arxiv
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.oaipmh-pubmed-state
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic fatcat-qa.oaipmh-arxiv-state

    # only 3 partitions in QA
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.pdf-text --config compression.type=gzip --config cleanup.policy=compact
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.pdf-thumbnail-180px-jpg --config cleanup.policy=compact
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.unextracted

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic scholar-qa.sim-updates
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic scholar-qa.update-docs --config compression.type=gzip --config cleanup.policy=compact --config retention.ms=7889400000

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.xml-doc --config compression.type=gzip --config cleanup.policy=compact
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.html-teixml --config compression.type=gzip --config cleanup.policy=compact
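
To double-check or adjust configs on a topic that already exists (since
`--create` won't change them), something like the following should work with
this Zookeeper-based broker version; the topic name and config value here are
just examples:

    ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic sandcrawler-qa.pdf-text
    ./kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name sandcrawler-qa.pdf-text --alter --add-config retention.ms=7889400000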