aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2021-09-30 15:24:22 -0700
committerBryan Newbold <bnewbold@archive.org>2021-09-30 15:24:24 -0700
commite4800fc4d0d0467d0e34a4059b941d001916e232 (patch)
treee789c6bfe7dfa95bc497b0329c9f9939864b1b71
parent1c43b0d2a663815c7cb43c918933588f5184c714 (diff)
downloadsandcrawler-e4800fc4d0d0467d0e34a4059b941d001916e232.tar.gz
sandcrawler-e4800fc4d0d0467d0e34a4059b941d001916e232.zip
new 'daily' and 'priority' ingest request topics
The old ingest request queue was always getting lopsided, suspect because it was scaled up (additional partitions) at some point in the past, hoping new topics will fix this. New '-priority' queue is like '-bulk', but for smaller-volume SPN-like requests. Eg, interactive mode.
-rw-r--r--kafka/topics.md10
-rwxr-xr-xpython/sandcrawler_worker.py8
-rwxr-xr-xsql/reingest_quarterly.sh2
-rwxr-xr-xsql/reingest_weekly.sh2
4 files changed, 17 insertions, 5 deletions
diff --git a/kafka/topics.md b/kafka/topics.md
index a0ab6ff..a699e16 100644
--- a/kafka/topics.md
+++ b/kafka/topics.md
@@ -25,7 +25,8 @@ retention (on both a size and time basis).
=> fewer partitions with batch mode, but still a bunch (24?)
=> key is sha1hex of PDF. enable time compaction (6 months?)
- sandcrawler-ENV.ingest-file-requests
+ sandcrawler-ENV.ingest-file-requests-daily
+ => was ingest-file-requests previously, but renamed/rebalanced
=> ingest requests from multiple sources; mostly continuous or pseudo-interactive
=> schema is JSON; see ingest proposal for fields. small objects.
=> fewer partitions with batch mode, but still a bunch (24)
@@ -35,6 +36,10 @@ retention (on both a size and time basis).
=> ingest requests from bulk crawl sources; background processing
=> same as ingest-file-requests
+ sandcrawler-ENV.ingest-file-requests-priority
+ => ingest requests from bulk crawl sources; background processing
+ => same as ingest-file-requests
+
sandcrawler-ENV.ingest-file-results
=> ingest requests from multiple sources
=> schema is JSON; see ingest proposal for fields. small objects.
@@ -171,8 +176,9 @@ exists`; this seems safe, and the settings won't be over-ridden.
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ungrobided-pg
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.grobid-output-pg --config compression.type=gzip --config cleanup.policy=compact
- ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests --config retention.ms=7889400000 --config cleanup.policy=delete
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests-daily --config retention.ms=7889400000 --config cleanup.policy=delete
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.ingest-file-requests-bulk --config retention.ms=7889400000 --config cleanup.policy=delete
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-requests-priority --config retention.ms=7889400000 --config cleanup.policy=delete
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-results
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.pdftrio-output --config cleanup.policy=compact
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index 6be8bac..bd4ff67 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -204,9 +204,12 @@ def run_ingest_file(args):
if args.bulk:
consume_group = "sandcrawler-{}-ingest-file-bulk".format(args.env)
consume_topic = "sandcrawler-{}.ingest-file-requests-bulk".format(args.env)
+ elif args.priority:
+ consume_group = "sandcrawler-{}-ingest-file-priority".format(args.env)
+ consume_topic = "sandcrawler-{}.ingest-file-requests-priority".format(args.env)
else:
consume_group = "sandcrawler-{}-ingest-file".format(args.env)
- consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
+ consume_topic = "sandcrawler-{}.ingest-file-requests-daily".format(args.env)
produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env)
@@ -353,6 +356,9 @@ def main():
sub_ingest_file.add_argument('--bulk',
action='store_true',
help="consume from bulk kafka topic (eg, for ingest backfill)")
+ sub_ingest_file.add_argument('--priority',
+ action='store_true',
+ help="consume from priority kafka topic (eg, for SPN requests)")
sub_ingest_file.set_defaults(func=run_ingest_file)
sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file',
diff --git a/sql/reingest_quarterly.sh b/sql/reingest_quarterly.sh
index 9f1ad3c..2e19f5e 100755
--- a/sql/reingest_quarterly.sh
+++ b/sql/reingest_quarterly.sh
@@ -14,7 +14,7 @@ sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcraw
sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.rows.json | shuf > /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json
sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.rows.json | shuf > /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json
-cat /srv/sandcrawler/tasks/reingest_quarterly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json | shuf | head -n250000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests -p -1
+cat /srv/sandcrawler/tasks/reingest_quarterly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json | shuf | head -n250000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-daily -p -1
#cat /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_bulk.json | shuf | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-bulk -p -1
diff --git a/sql/reingest_weekly.sh b/sql/reingest_weekly.sh
index ce34dd7..67ecabd 100755
--- a/sql/reingest_weekly.sh
+++ b/sql/reingest_weekly.sh
@@ -14,7 +14,7 @@ sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcraw
sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.rows.json | shuf > /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json
sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.rows.json | shuf > /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json
-cat /srv/sandcrawler/tasks/reingest_weekly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_weekly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_weekly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json | shuf | head -n60000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests -p -1
+cat /srv/sandcrawler/tasks/reingest_weekly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_weekly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_weekly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json | shuf | head -n60000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-daily -p -1
#cat /srv/sandcrawler/tasks/reingest_weekly_cdx-error_bulk.json | shuf | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-bulk -p -1