From e4800fc4d0d0467d0e34a4059b941d001916e232 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 30 Sep 2021 15:24:22 -0700 Subject: new 'daily' and 'priority' ingest request topics The old ingest request queue was always getting lopsided, suspect because it was scaled up (additional partitions) at some point in the past, hoping new topics will fix this. New '-priority' queue is like '-bulk', but for smaller-volume SPN-like requests. Eg, interactive mode. --- kafka/topics.md | 10 ++++++++-- python/sandcrawler_worker.py | 8 +++++++- sql/reingest_quarterly.sh | 2 +- sql/reingest_weekly.sh | 2 +- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/kafka/topics.md b/kafka/topics.md index a0ab6ff..a699e16 100644 --- a/kafka/topics.md +++ b/kafka/topics.md @@ -25,7 +25,8 @@ retention (on both a size and time basis). => fewer partitions with batch mode, but still a bunch (24?) => key is sha1hex of PDF. enable time compaction (6 months?) - sandcrawler-ENV.ingest-file-requests + sandcrawler-ENV.ingest-file-requests-daily + => was ingest-file-requests previously, but renamed/rebalanced => ingest requests from multiple sources; mostly continuous or pseudo-interactive => schema is JSON; see ingest proposal for fields. small objects. => fewer partitions with batch mode, but still a bunch (24) @@ -35,6 +36,10 @@ retention (on both a size and time basis). => ingest requests from bulk crawl sources; background processing => same as ingest-file-requests + sandcrawler-ENV.ingest-file-requests-priority + => ingest requests from bulk crawl sources; background processing + => same as ingest-file-requests + sandcrawler-ENV.ingest-file-results => ingest requests from multiple sources => schema is JSON; see ingest proposal for fields. small objects. @@ -171,8 +176,9 @@ exists`; this seems safe, and the settings won't be over-ridden. ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ungrobided-pg ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.grobid-output-pg --config compression.type=gzip --config cleanup.policy=compact - ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests --config retention.ms=7889400000 --config cleanup.policy=delete + ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests-daily --config retention.ms=7889400000 --config cleanup.policy=delete ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.ingest-file-requests-bulk --config retention.ms=7889400000 --config cleanup.policy=delete + ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-requests-priority --config retention.ms=7889400000 --config cleanup.policy=delete ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-results ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.pdftrio-output --config cleanup.policy=compact diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 6be8bac..bd4ff67 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -204,9 +204,12 @@ def run_ingest_file(args): if args.bulk: consume_group = "sandcrawler-{}-ingest-file-bulk".format(args.env) consume_topic = "sandcrawler-{}.ingest-file-requests-bulk".format(args.env) + elif args.priority: + consume_group = "sandcrawler-{}-ingest-file-priority".format(args.env) + consume_topic = "sandcrawler-{}.ingest-file-requests-priority".format(args.env) else: consume_group = "sandcrawler-{}-ingest-file".format(args.env) - consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) + consume_topic = "sandcrawler-{}.ingest-file-requests-daily".format(args.env) produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env) grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env) pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env) @@ -353,6 +356,9 @@ def main(): sub_ingest_file.add_argument('--bulk', action='store_true', help="consume from bulk kafka topic (eg, for ingest backfill)") + sub_ingest_file.add_argument('--priority', + action='store_true', + help="consume from priority kafka topic (eg, for SPN requests)") sub_ingest_file.set_defaults(func=run_ingest_file) sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file', diff --git a/sql/reingest_quarterly.sh b/sql/reingest_quarterly.sh index 9f1ad3c..2e19f5e 100755 --- a/sql/reingest_quarterly.sh +++ b/sql/reingest_quarterly.sh @@ -14,7 +14,7 @@ sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcraw sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.rows.json | shuf > /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.rows.json | shuf > /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json -cat /srv/sandcrawler/tasks/reingest_quarterly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json | shuf | head -n250000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests -p -1 +cat /srv/sandcrawler/tasks/reingest_quarterly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json | shuf | head -n250000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-daily -p -1 #cat /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_bulk.json | shuf | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-bulk -p -1 diff --git a/sql/reingest_weekly.sh b/sql/reingest_weekly.sh index ce34dd7..67ecabd 100755 --- a/sql/reingest_weekly.sh +++ b/sql/reingest_weekly.sh @@ -14,7 +14,7 @@ sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcraw sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.rows.json | shuf > /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.rows.json | shuf > /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json -cat /srv/sandcrawler/tasks/reingest_weekly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_weekly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_weekly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json | shuf | head -n60000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests -p -1 +cat /srv/sandcrawler/tasks/reingest_weekly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_weekly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_weekly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json | shuf | head -n60000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-daily -p -1 #cat /srv/sandcrawler/tasks/reingest_weekly_cdx-error_bulk.json | shuf | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-bulk -p -1 -- cgit v1.2.3