aboutsummaryrefslogtreecommitdiffstats
path: root/please
blob: 1ea751f909179f137f407046a0f950819b09aa1d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python3
"""
Helper script for running Sandcrawler (journal pipeline) tasks in production.

This is basically a Makefile. Be sure to only use python3 standard library
modules, so there are no dependencies.
"""

import argparse
import shlex
import subprocess
import sys
from datetime import datetime

# Base HDFS directory under which per-environment job output paths are built.
HDFS_DIR = "hdfs:///user/bnewbold/sandcrawler"
# Host handed to the backfill job via its --hbase-host option.
HBASE_HOST = "wbgrp-svc263.us.archive.org"

def rebuild_python():
    """Reinstall the pipenv environment and pack it as venv-current.tar.gz.

    All steps run in one shell that starts by changing into the
    ``mapreduce`` directory; the archive is made from the contents of
    ``.venv``. The shell's exit status is not inspected.
    """
    print("Rebuilding python venv...")
    shell_script = """cd mapreduce;
        export PIPENV_VENV_IN_PROJECT=1;
        pipenv install --deploy
        tar -czf venv-current.tar.gz -C .venv ."""
    subprocess.call(shell_script, shell=True)

def rebuild_scalding():
    """Run ``sbt assembly`` from inside the ``scalding`` directory.

    The exit status of the build is not inspected.
    """
    print("Rebuilding scalding jar...")
    subprocess.call("cd scalding; sbt assembly", shell=True)

def run_backfill(args):
    """Launch the mrjob backfill of the HBase table from a CDX file.

    Optionally rebuilds the python venv archive first, then runs
    ``backfill_hbase_from_cdx.py`` on hadoop from the ``mapreduce``
    directory, shipping ``venv-current.tar.gz`` along as an archive.

    Args:
        args: parsed argparse namespace. Reads ``rebuild`` (whether to
            rebuild the venv archive first), ``env`` (suffix of the
            HBase table name) and ``input_cdx`` (input path handed to
            the job).
    """
    if args.rebuild:
        rebuild_python()
    print("Starting backfill job...")
    # The whole command is interpreted by a shell, so the user-supplied
    # path is quoted to survive spaces and shell metacharacters intact.
    cmd = """cd mapreduce;
        pipenv run ./backfill_hbase_from_cdx.py \
            --hbase-host {hbase_host} \
            --hbase-table wbgrp-journal-extract-0-{env} \
            -r hadoop \
            -c mrjob.conf \
            --archive venv-current.tar.gz#venv \
            {input_cdx}
            """.format(hbase_host=HBASE_HOST, env=args.env,
                input_cdx=shlex.quote(args.input_cdx))
    subprocess.call(cmd, shell=True)

def run_rowcount(args):
    """Submit the scalding ``HBaseRowCountJob`` through ``hadoop jar``.

    The job's ``--output`` is a timestamped path below ``HDFS_DIR``.

    Args:
        args: parsed argparse namespace; only ``env`` is read, to pick
            the per-environment output directory.
    """
    print("Starting rowcount job...")
    timestamp = datetime.now().strftime("%Y-%m-%d-%H%M.%S")
    output = "{}/output-{}/{}-rowcount".format(HDFS_DIR, args.env, timestamp)
    hadoop_cmd = """hadoop jar \
        scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
        com.twitter.scalding.Tool sandcrawler.HBaseRowCountJob \
        --hdfs \
        --app.conf.path scalding/ia_cluster.conf \
        --output {}""".format(output)
    subprocess.call(hadoop_cmd, shell=True)

def main():
    """Parse the command line and dispatch to the selected task.

    Global flags ``--prod`` / ``--qa`` choose the environment (exactly
    one is required) and ``--rebuild`` asks the task to rebuild its
    artifact first. The sub-command (``backfill`` or ``row-count``)
    selects the function to run; it receives the parsed namespace with
    an added ``env`` attribute set to ``"prod"`` or ``"qa"``.

    Exits with status -1 when no sub-command is given or when the
    environment flags are missing or contradictory.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument('--prod',
        help="run against prod HBase table",
        action='store_true')
    parser.add_argument('--qa',
        help="run against qa HBase table",
        action='store_true')
    parser.add_argument('--rebuild',
        help="rebuild whatever artifact gets sent",
        action='store_true')
    subparsers = parser.add_subparsers()

    sub_backfill = subparsers.add_parser('backfill')
    sub_backfill.set_defaults(func=run_backfill)
    sub_backfill.add_argument('input_cdx',
        help="full HDFS path of CDX file to backfill")

    sub_rowcount = subparsers.add_parser('row-count')
    sub_rowcount.set_defaults(func=run_rowcount)

    args = parser.parse_args()
    # "func" is only present when a sub-command was given.
    if not args.__dict__.get("func"):
        print("tell me what to do! (try --help)")
        sys.exit(-1)
    # Both flags are booleans, so equality means "neither" or "both".
    # Stop here: otherwise args.env would be unset (neither) or the job
    # would silently pick one environment (both).
    if args.prod == args.qa:
        print("must pass one of --prod or --qa")
        sys.exit(-1)
    args.env = "prod" if args.prod else "qa"

    args.func(args)

if __name__ == '__main__':
    main()