diff options
-rw-r--r-- | kafka/topics.md | 10 | ||||
-rwxr-xr-x | python/sandcrawler_worker.py | 8 | ||||
-rwxr-xr-x | sql/reingest_quarterly.sh | 2 | ||||
-rwxr-xr-x | sql/reingest_weekly.sh | 2 |
4 files changed, 17 insertions, 5 deletions
diff --git a/kafka/topics.md b/kafka/topics.md index a0ab6ff..a699e16 100644 --- a/kafka/topics.md +++ b/kafka/topics.md @@ -25,7 +25,8 @@ retention (on both a size and time basis). => fewer partitions with batch mode, but still a bunch (24?) => key is sha1hex of PDF. enable time compaction (6 months?) - sandcrawler-ENV.ingest-file-requests + sandcrawler-ENV.ingest-file-requests-daily + => was ingest-file-requests previously, but renamed/rebalanced => ingest requests from multiple sources; mostly continuous or pseudo-interactive => schema is JSON; see ingest proposal for fields. small objects. => fewer partitions with batch mode, but still a bunch (24) @@ -35,6 +36,10 @@ retention (on both a size and time basis). => ingest requests from bulk crawl sources; background processing => same as ingest-file-requests + sandcrawler-ENV.ingest-file-requests-priority + => ingest requests from bulk crawl sources; background processing + => same as ingest-file-requests + sandcrawler-ENV.ingest-file-results => ingest requests from multiple sources => schema is JSON; see ingest proposal for fields. small objects. @@ -171,8 +176,9 @@ exists`; this seems safe, and the settings won't be over-ridden. ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ungrobided-pg ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.grobid-output-pg --config compression.type=gzip --config cleanup.policy=compact - ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests --config retention.ms=7889400000 --config cleanup.policy=delete + ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic sandcrawler-qa.ingest-file-requests-daily --config retention.ms=7889400000 --config cleanup.policy=delete ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --topic sandcrawler-qa.ingest-file-requests-bulk --config retention.ms=7889400000 --config cleanup.policy=delete + ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-requests-priority --config retention.ms=7889400000 --config cleanup.policy=delete ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.ingest-file-results ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 6 --topic sandcrawler-qa.pdftrio-output --config cleanup.policy=compact diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 6be8bac..bd4ff67 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -204,9 +204,12 @@ def run_ingest_file(args): if args.bulk: consume_group = "sandcrawler-{}-ingest-file-bulk".format(args.env) consume_topic = "sandcrawler-{}.ingest-file-requests-bulk".format(args.env) + elif args.priority: + consume_group = "sandcrawler-{}-ingest-file-priority".format(args.env) + consume_topic = "sandcrawler-{}.ingest-file-requests-priority".format(args.env) else: consume_group = "sandcrawler-{}-ingest-file".format(args.env) - consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) + consume_topic = "sandcrawler-{}.ingest-file-requests-daily".format(args.env) produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env) grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env) pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env) @@ -353,6 +356,9 @@ def main(): sub_ingest_file.add_argument('--bulk', action='store_true', help="consume from bulk kafka topic (eg, for ingest backfill)") + sub_ingest_file.add_argument('--priority', + action='store_true', + help="consume from priority kafka topic (eg, for SPN requests)") sub_ingest_file.set_defaults(func=run_ingest_file) sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file', diff --git a/sql/reingest_quarterly.sh b/sql/reingest_quarterly.sh index 9f1ad3c..2e19f5e 100755 --- a/sql/reingest_quarterly.sh +++ b/sql/reingest_quarterly.sh @@ -14,7 +14,7 @@ sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcraw sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.rows.json | shuf > /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.rows.json | shuf > /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json -cat /srv/sandcrawler/tasks/reingest_quarterly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json | shuf | head -n250000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests -p -1 +cat /srv/sandcrawler/tasks/reingest_quarterly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_quarterly_gateway-timeout.json | shuf | head -n250000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-daily -p -1 #cat /srv/sandcrawler/tasks/reingest_quarterly_cdx-error_bulk.json | shuf | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-bulk -p -1 diff --git a/sql/reingest_weekly.sh b/sql/reingest_weekly.sh index ce34dd7..67ecabd 100755 --- a/sql/reingest_weekly.sh +++ b/sql/reingest_weekly.sh @@ -14,7 +14,7 @@ sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcraw sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.rows.json | shuf > /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json sudo -u sandcrawler pipenv run ./scripts/ingestrequest_row2json.py /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.rows.json | shuf > /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json -cat /srv/sandcrawler/tasks/reingest_weekly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_weekly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_weekly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json | shuf | head -n60000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests -p -1 +cat /srv/sandcrawler/tasks/reingest_weekly_spn2-error_current.json /srv/sandcrawler/tasks/reingest_weekly_cdx-error_current.json /srv/sandcrawler/tasks/reingest_weekly_wayback-error_current.json /srv/sandcrawler/tasks/reingest_weekly_petabox-error_current.json /srv/sandcrawler/tasks/reingest_weekly_gateway-timeout.json | shuf | head -n60000 | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-daily -p -1 #cat /srv/sandcrawler/tasks/reingest_weekly_cdx-error_bulk.json | shuf | jq . -c | kafkacat -P -b wbgrp-svc263.us.archive.org -t sandcrawler-prod.ingest-file-requests-bulk -p -1 |