aboutsummaryrefslogtreecommitdiffstats
path: root/please
diff options
context:
space:
mode:
Diffstat (limited to 'please')
-rwxr-xr-xplease81
1 files changed, 81 insertions, 0 deletions
diff --git a/please b/please
index 1a992f2..b32dd79 100755
--- a/please
+++ b/please
@@ -95,6 +95,27 @@ def run_rowcount(args):
env=args.env)
subprocess.call(cmd, shell=True)
+def run_statuscodecount(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting statuscodecount job...")
+ output = "{}/output-{}/{}-statuscodecount".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.HBaseStatusCodeCountJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --zookeeper-hosts {zookeeper_hosts} \
+ --output {output}""".format(
+ output=output,
+ zookeeper_hosts=ZOOKEEPER_HOSTS,
+ env=args.env)
+ subprocess.call(cmd, shell=True)
+
def run_statuscount(args):
if args.rebuild:
rebuild_scalding()
@@ -126,10 +147,15 @@ def run_matchcrossref(args):
datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
# Notes: -D options must come after Tool but before class name
# https://github.com/twitter/scalding/wiki/Frequently-asked-questions#how-do-i-pass-parameters-to-my-hadoop-job-number-of-reducers--memory-options--etc-
+ # Compression: changed due to errors in production
+ # https://stackoverflow.com/a/11336820/4682349
cmd = """hadoop jar \
scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
com.twitter.scalding.Tool \
-Dmapred.reduce.tasks={reducers} \
+ -Dcascading.spill.list.threshold=500000 \
+ -D mapred.output.compress=false \
+ -Dmapred.compress.map.output=true\
sandcrawler.ScoreJob \
--hdfs \
--app.conf.path scalding/ia_cluster.conf \
@@ -144,6 +170,51 @@ def run_matchcrossref(args):
crossref_input=args.crossref_input)
subprocess.call(cmd, shell=True)
+def run_grobidscorabledump(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting grobid-scorable-dump job...")
+ output = "{}/output-{}/{}-grobidscorabledump".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.GrobidScorableDumpJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --zookeeper-hosts {zookeeper_hosts} \
+ --output {output}""".format(
+ output=output,
+ zookeeper_hosts=ZOOKEEPER_HOSTS,
+ env=args.env)
+ subprocess.call(cmd, shell=True)
+
+def run_colcount(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting colcount job...")
+ output = "{}/output-{}/{}-colcount-{}".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"),
+ args.column.replace(':', '_'))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.HBaseColCountJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --zookeeper-hosts {zookeeper_hosts} \
+ --column {column} \
+ --output {output}""".format(
+ column=args.column,
+ output=output,
+ zookeeper_hosts=ZOOKEEPER_HOSTS,
+ env=args.env)
+ subprocess.call(cmd, shell=True)
+
def main():
parser = argparse.ArgumentParser()
@@ -174,6 +245,9 @@ def main():
sub_statuscount = subparsers.add_parser('status-count')
sub_statuscount.set_defaults(func=run_statuscount)
+ sub_statuscodecount = subparsers.add_parser('status-code-count')
+ sub_statuscodecount.set_defaults(func=run_statuscodecount)
+
sub_matchcrossref = subparsers.add_parser('match-crossref')
sub_matchcrossref.set_defaults(func=run_matchcrossref)
sub_matchcrossref.add_argument('crossref_input',
@@ -182,6 +256,13 @@ def main():
help="number of reducers to run",
type=int, default=30)
+ sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump')
+ sub_grobidscorabledump.set_defaults(func=run_grobidscorabledump)
+
+ sub_colcount = subparsers.add_parser('col-count')
+ sub_colcount.set_defaults(func=run_colcount)
+ sub_colcount.add_argument('column',
+ help="column name to use in count")
args = parser.parse_args()
if not args.__dict__.get("func"):