diff options
Diffstat (limited to 'please')
-rwxr-xr-x | please | 81 |
1 files changed, 81 insertions, 0 deletions
@@ -95,6 +95,27 @@ def run_rowcount(args): env=args.env) subprocess.call(cmd, shell=True) +def run_statuscodecount(args): + if args.rebuild: + rebuild_scalding() + print("Starting statuscodecount job...") + output = "{}/output-{}/{}-statuscodecount".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.HBaseStatusCodeCountJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + def run_statuscount(args): if args.rebuild: rebuild_scalding() @@ -126,10 +147,15 @@ def run_matchcrossref(args): datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) # Notes: -D options must come after Tool but before class name # https://github.com/twitter/scalding/wiki/Frequently-asked-questions#how-do-i-pass-parameters-to-my-hadoop-job-number-of-reducers--memory-options--etc- + # Compression: changed due to errors in production + # https://stackoverflow.com/a/11336820/4682349 cmd = """hadoop jar \ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ com.twitter.scalding.Tool \ -Dmapred.reduce.tasks={reducers} \ + -Dcascading.spill.list.threshold=500000 \ + -D mapred.output.compress=false \ + -Dmapred.compress.map.output=true\ sandcrawler.ScoreJob \ --hdfs \ --app.conf.path scalding/ia_cluster.conf \ @@ -144,6 +170,51 @@ def run_matchcrossref(args): crossref_input=args.crossref_input) subprocess.call(cmd, shell=True) +def run_grobidscorabledump(args): + if args.rebuild: + rebuild_scalding() + print("Starting grobid-scorable-dump job...") + output = "{}/output-{}/{}-grobidscorabledump".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.GrobidScorableDumpJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + +def run_colcount(args): + if args.rebuild: + rebuild_scalding() + print("Starting colcount job...") + output = "{}/output-{}/{}-colcount-{}".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"), + args.column.replace(':', '_')) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.HBaseColCountJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --column {column} \ + --output {output}""".format( + column=args.column, + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + def main(): parser = argparse.ArgumentParser() @@ -174,6 +245,9 @@ def main(): sub_statuscount = subparsers.add_parser('status-count') sub_statuscount.set_defaults(func=run_statuscount) + sub_statuscodecount = subparsers.add_parser('status-code-count') + sub_statuscodecount.set_defaults(func=run_statuscodecount) + sub_matchcrossref = subparsers.add_parser('match-crossref') sub_matchcrossref.set_defaults(func=run_matchcrossref) sub_matchcrossref.add_argument('crossref_input', @@ -182,6 +256,13 @@ def main(): help="number of reducers to run", type=int, default=30) + sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump') + sub_grobidscorabledump.set_defaults(func=run_grobidscorabledump) + + sub_colcount = subparsers.add_parser('col-count') + sub_colcount.set_defaults(func=run_colcount) + sub_colcount.add_argument('column', + help="column name to use in count") args = parser.parse_args() if not args.__dict__.get("func"): |