diff options
author | Bryan Newbold <bnewbold@archive.org> | 2021-09-30 15:24:22 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2021-09-30 15:24:24 -0700 |
commit | e4800fc4d0d0467d0e34a4059b941d001916e232 (patch) | |
tree | e789c6bfe7dfa95bc497b0329c9f9939864b1b71 | |
parent | 1c43b0d2a663815c7cb43c918933588f5184c714 (diff) | |
download | sandcrawler-e4800fc4d0d0467d0e34a4059b941d001916e232.tar.gz sandcrawler-e4800fc4d0d0467d0e34a4059b941d001916e232.zip |
new 'daily' and 'priority' ingest request topics
The old ingest request queue kept getting lopsided; I suspect this is
because it was scaled up (additional partitions were added) at some point
in the past. Hopefully the new topics will fix this.
The new '-priority' queue is like '-bulk', but for smaller-volume SPN-like
requests (e.g., interactive mode).
-rw-r--r-- | kafka/topics.md | 10 | ||||
-rwxr-xr-x | python/sandcrawler_worker.py | 8 | ||||
-rwxr-xr-x | sql/reingest_quarterly.sh | 2 | ||||
-rwxr-xr-x | sql/reingest_weekly.sh | 2 |
4 files changed, 17 insertions, 5 deletions
diff --git a/kafka/topics.md b/kafka/topics.md index a0ab6ff..a699e16 100644 --- a/kafka/topics.md +++ b/kafka/topics.md @@ -25,7 +25,8 @@ retention (on both a size and time basis). => fewer partitions with batch mode, but still a bunch (24?) => key is sha1hex of PDF. enable time compaction (6 months?) - sandcrawler-ENV.ingest-file-requests + sandcrawler-ENV.ingest-file-requests-daily + => was ingest-file-requests previously, but renamed/rebalanced => ingest requests from multiple sources; mostly continuous or pseudo-interactive => schema is JSON; see ingest proposal for fields. small objects. => fewer partitions with batch mode, but still a bunch (24) @@ -35,6 +36,10 @@ retention (on both a size and time basis). => ingest requests from bulk crawl sources; background processing => same as ingest-file-requests + sandcrawler-ENV.ingest-file-requests-priority + => ingest requests from bulk crawl sources; background processing + => same as ingest-file-requests + sandcrawler-ENV.ingest-file-results => ingest requests from multiple sources => schema is JSON; see ingest proposal for fields. small objects. @@ -171,8 +176,9 @@ exists`; this seems safe, and the settings won't be over-ridden. 
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ungrobided-pg ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.grobid-output-pg --config compression.type=gzip --config cleanup.policy=compact - ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests --config retention.ms=7889400000 --config cleanup.policy=delete + ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests-daily --config retention.ms=7889400000 --config cleanup.policy=delete ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.ingest-file-requests-bulk --config retention.ms=7889400000 --config cleanup.policy=delete + ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-requests-priority --config retention.ms=7889400000 --config cleanup.policy=delete ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-results ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.pdftrio-output --config cleanup.policy=compact diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 6be8bac..bd4ff67 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -204,9 +204,12 @@ def run_ingest_file(args): if args.bulk: consume_group = "sandcrawler-{}-ingest-file-bulk".format(args.env) consume_topic = "sandcrawler-{}.ingest-file-requests-bulk".format(args.env) + elif args.priority: + consume_group = "sandcrawler-{}-ingest-file-priority".format(args.env) + consume_topic = 
"sandcrawler-{}.ingest-file-requests-priority".format(args.env) else: consume_group = "sandcrawler-{}-ingest-file".format(args.env) - consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) + consume_topic = "sandcrawler-{}.ingest-file-requests-daily".format(args.env) produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env) grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env) pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env) @@ -353,6 +356,9 @@ def main(): sub_ingest_file.add_argument('--bulk', action='store_true', help="consume from bulk kafka topic (eg, for ingest backfill)") + sub_ingest_file.add_argument('--priority', + action='store_true', + help="consume from priority kafka topic (eg, for SPN requests)") sub_ingest_file.set_defaults(func=run_ingest_file) sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file', diff --git a/sql/reingest_quarterly.sh b/sql/reingest_quarterly.sh index 9f1ad3c..2e19f5e 100755 --- a/sql/reingest_quarterly.sh +++ b/sql/reingest_quarterly.sh @@ -14,7 +14,7 @@ sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcraw sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.rows.json | shuf > /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.rows.json | shuf > /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json -cat /srv/sandcrawler/tasks/reingest_quarterly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json | shuf | head -n250000 | jq . 
-c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests -p -1 +cat /srv/sandcrawler/tasks/reingest_quarterly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json | shuf | head -n250000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-daily -p -1 #cat /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_bulk.json | shuf | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-bulk -p -1 diff --git a/sql/reingest_weekly.sh b/sql/reingest_weekly.sh index ce34dd7..67ecabd 100755 --- a/sql/reingest_weekly.sh +++ b/sql/reingest_weekly.sh @@ -14,7 +14,7 @@ sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcraw sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.rows.json | shuf > /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.rows.json | shuf > /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json -cat /srv/sandcrawler/tasks/reingest_weekly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_weekly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_weekly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json | shuf | head -n60000 | jq . 
-c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests -p -1 +cat /srv/sandcrawler/tasks/reingest_weekly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_weekly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_weekly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json | shuf | head -n60000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-daily -p -1 #cat /srv/sandcrawler/tasks/reingest_weekly_cdx-error_bulk.json | shuf | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-bulk -p -1 |