1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
#!/usr/bin/env python3
"""
Helper script for running Sandcrawler (journal pipeline) tasks in production.
This is basically a Makefile. Be sure to only use python3 standard library
modules, so there are no dependencies.
"""
import sys
import argparse
import subprocess
from datetime import datetime
HDFS_OUT_DIR = "/user/bnewbold/sandcrawler/out"
HBASE_HOST = "wbgrp-svc263.us.archive.org"
def run_backfill(args):
output = "hdfs://{}/{}/{}-backfill".format(
HDFS_OUT_DIR,
args.env,
datetime.strftime(datetime.now(), "%Y-%m-%d-%H:%M:%S"))
cmd = """hadoop jar \
scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
com.twitter.scalding.Tool sandcrawler.HBaseRowCountJob --hdfs \
--app.conf.path scalding/ia_cluster.conf \
--output hdfs://{}""".format(output)
cmd = """cd mapreduce;
pipenv shell
export VENVSHORT=`basename $VIRTUAL_ENV`
./backfill_hbase_from_cdx.py \
--hbase-host {HBASE_HOST} \
--hbase-table wbgrp-journal-extract-0-{args.env} \
-r hadoop \
-c mrjob.conf \
--archive $VENVSHORT.tar.gz#venv \
{args.input_cdx}
""".format()
subprocess.call(cmd, shell=True)
def run_rowcount(args):
output = "hdfs://{}/{}/{}-rowcount".format(
HDFS_OUT_DIR,
args.env,
datetime.strftime(datetime.now(), "%Y-%m-%d-%H:%M:%S"))
cmd = """hadoop jar \
scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
com.twitter.scalding.Tool sandcrawler.HBaseRowCountJob --hdfs \
--app.conf.path scalding/ia_cluster.conf \
--output hdfs://{}""".format(output)
subprocess.call(cmd, shell=True)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--prod',
help="run against prod HBase table",
action='store_true')
parser.add_argument('--qa',
help="run against qa HBase table",
action='store_true')
subparsers = parser.add_subparsers()
sub_backfill = subparsers.add_parser('backfill')
sub_backfill.set_defaults(func=run_backfill)
sub_backfill.add_argument('input_cdx',
help="full HDFS path of CDX file to backfill")
sub_rowcount = subparsers.add_parser('row-count')
sub_rowcount.set_defaults(func=run_rowcount)
args = parser.parse_args()
if not args.__dict__.get("func"):
print("tell me what to do! (try --help)")
sys.exit(-1)
if not (args.prod or args.qa) or (args.prod and args.qa):
print("must pass one of --prod or --qa")
if args.prod:
args.env = "prod"
if args.qa:
args.env = "qa"
args.func(args)
if __name__ == '__main__':
main()
|