aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--kafka/topics.md10
-rwxr-xr-xpython/sandcrawler_worker.py8
-rwxr-xr-xsql/reingest_quarterly.sh2
-rwxr-xr-xsql/reingest_weekly.sh2
4 files changed, 17 insertions, 5 deletions
diff --git a/kafka/topics.md b/kafka/topics.md
index a0ab6ff..a699e16 100644
--- a/kafka/topics.md
+++ b/kafka/topics.md
@@ -25,7 +25,8 @@ retention (on both a size and time basis).
=> fewer partitions with batch mode, but still a bunch (24?)
=> key is sha1hex of PDF. enable time compaction (6 months?)
- sandcrawler-ENV.ingest-file-requests
+ sandcrawler-ENV.ingest-file-requests-daily
+ => was ingest-file-requests previously, but renamed/rebalanced
=> ingest requests from multiple sources; mostly continuous or pseudo-interactive
=> schema is JSON; see ingest proposal for fields. small objects.
=> fewer partitions with batch mode, but still a bunch (24)
@@ -35,6 +36,10 @@ retention (on both a size and time basis).
=> ingest requests from bulk crawl sources; background processing
=> same as ingest-file-requests
+ sandcrawler-ENV.ingest-file-requests-priority
+ => ingest requests from bulk crawl sources; background processing
+ => same as ingest-file-requests
+
sandcrawler-ENV.ingest-file-results
=> ingest requests from multiple sources
=> schema is JSON; see ingest proposal for fields. small objects.
@@ -171,8 +176,9 @@ exists`; this seems safe, and the settings won't be over-ridden.
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ungrobided-pg
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.grobid-output-pg --config compression.type=gzip --config cleanup.policy=compact
- ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests --config retention.ms=7889400000 --config cleanup.policy=delete
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests-daily --config retention.ms=7889400000 --config cleanup.policy=delete
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.ingest-file-requests-bulk --config retention.ms=7889400000 --config cleanup.policy=delete
+ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-requests-priority --config retention.ms=7889400000 --config cleanup.policy=delete
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-results
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.pdftrio-output --config cleanup.policy=compact
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index 6be8bac..bd4ff67 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -204,9 +204,12 @@ def run_ingest_file(args):
if args.bulk:
consume_group = "sandcrawler-{}-ingest-file-bulk".format(args.env)
consume_topic = "sandcrawler-{}.ingest-file-requests-bulk".format(args.env)
+ elif args.priority:
+ consume_group = "sandcrawler-{}-ingest-file-priority".format(args.env)
+ consume_topic = "sandcrawler-{}.ingest-file-requests-priority".format(args.env)
else:
consume_group = "sandcrawler-{}-ingest-file".format(args.env)
- consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
+ consume_topic = "sandcrawler-{}.ingest-file-requests-daily".format(args.env)
produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env)
@@ -353,6 +356,9 @@ def main():
sub_ingest_file.add_argument('--bulk',
action='store_true',
help="consume from bulk kafka topic (eg, for ingest backfill)")
+ sub_ingest_file.add_argument('--priority',
+ action='store_true',
+ help="consume from priority kafka topic (eg, for SPN requests)")
sub_ingest_file.set_defaults(func=run_ingest_file)
sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file',
diff --git a/sql/reingest_quarterly.sh b/sql/reingest_quarterly.sh
index 9f1ad3c..2e19f5e 100755
--- a/sql/reingest_quarterly.sh
+++ b/sql/reingest_quarterly.sh
@@ -14,7 +14,7 @@ sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcraw
sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.rows.json | shuf > /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json
sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.rows.json | shuf > /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json
-cat /srv/sandcrawler/tasks/reingest_quarterly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json | shuf | head -n250000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests -p -1
+cat /srv/sandcrawler/tasks/reingest_quarterly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json | shuf | head -n250000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-daily -p -1
#cat /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_bulk.json | shuf | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-bulk -p -1
diff --git a/sql/reingest_weekly.sh b/sql/reingest_weekly.sh
index ce34dd7..67ecabd 100755
--- a/sql/reingest_weekly.sh
+++ b/sql/reingest_weekly.sh
@@ -14,7 +14,7 @@ sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcraw
sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.rows.json | shuf > /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json
sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.rows.json | shuf > /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json
-cat /srv/sandcrawler/tasks/reingest_weekly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_weekly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_weekly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json | shuf | head -n60000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests -p -1
+cat /srv/sandcrawler/tasks/reingest_weekly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_weekly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_weekly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json | shuf | head -n60000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-daily -p -1
#cat /srv/sandcrawler/tasks/reingest_weekly_cdx-error_bulk.json | shuf | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-bulk -p -1