aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xplease24
-rw-r--r--scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala34
2 files changed, 58 insertions, 0 deletions
diff --git a/please b/please
index f8e103d..8fb7f19 100755
--- a/please
+++ b/please
@@ -251,6 +251,27 @@ def run_dumpfilemeta(args):
env=args.env)
subprocess.call(cmd, shell=True)
+def run_dumpgrobidstatuscode(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting dumpgrobidstatuscode job...")
+ output = "{}/output-{}/{}-dumpgrobidstatuscode".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.DumpGrobidStatusCodeJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --zookeeper-hosts {zookeeper_hosts} \
+ --output {output}""".format(
+ output=output,
+ zookeeper_hosts=ZOOKEEPER_HOSTS,
+ env=args.env)
+ subprocess.call(cmd, shell=True)
+
def run_dumpgrobidmetainsertable(args):
if args.rebuild:
rebuild_scalding()
@@ -435,6 +456,9 @@ def main():
sub_dumpfilemeta = subparsers.add_parser('dump-file-meta')
sub_dumpfilemeta.set_defaults(func=run_dumpfilemeta)
+ sub_dumpgrobidstatuscode = subparsers.add_parser('dump-grobid-status-code')
+ sub_dumpgrobidstatuscode.set_defaults(func=run_dumpgrobidstatuscode)
+
sub_dumpgrobidmetainsertable = subparsers.add_parser('dump-grobid-meta-insertable')
sub_dumpgrobidmetainsertable.set_defaults(func=run_dumpgrobidmetainsertable)
diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala
new file mode 100644
index 0000000..42b3464
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala
@@ -0,0 +1,34 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Dumps status code for each GROBID-processed file. Good for crawl/corpus
+// analytics, if we consider GROBID status a rough "is this a paper" metric.
+class DumpGrobidStatusCodeJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val metaPipe : TypedPipe[(String, Long)] = HBaseBuilder.build(args("hbase-table"),
+ args("zookeeper-hosts"),
+ List("grobid0:status_code"),
+ SourceMode.SCAN_ALL)
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "status_code"))
+ .filter { case (_, status_code) => status_code != null }
+ .map { case (key, status_code) =>
+ (Bytes.toString(key.copyBytes()),
+ Bytes.toLong(status_code.copyBytes()))
+ };
+
+ metaPipe.write(TypedTsv[(String,Long)](args("output")))
+
+}