aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2018-08-21 14:36:50 -0700
committerBryan Newbold <bnewbold@archive.org>2018-08-21 21:32:58 -0700
commit24b35b230a0e2b45fd9607b0bf9f6090c4bfab52 (patch)
tree5ff58db1a29a9dd15c3d433976f29ef53090035e /scalding
parent9c2324d1b32116dc3a8a0f43c08ee7dac1bdf5f4 (diff)
downloadsandcrawler-24b35b230a0e2b45fd9607b0bf9f6090c4bfab52.tar.gz
sandcrawler-24b35b230a0e2b45fd9607b0bf9f6090c4bfab52.zip
WIP: MissingColumnDumpJob
This version won't actually work because SpyGlass doesn't actually support SCAN filters (among other things)
Diffstat (limited to 'scalding')
-rw-r--r--scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala59
1 files changed, 59 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala b/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala
new file mode 100644
index 0000000..de0228d
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala
@@ -0,0 +1,59 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
+import org.apache.hadoop.hbase.filter.CompareFilter
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseRawSource
+import parallelai.spyglass.hbase.HBaseSource
+
+class MissingColumnDumpJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val output = args("output")
+
+ MissingColumnDumpJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ args("column"))
+ .read
+ .fromBytesWritable('key)
+ .write(Tsv(output))
+}
+
+object MissingColumnDumpJob {
+
+ // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181"
+ def getHBaseSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = {
+
+ val colFamily = col.split(":")(0)
+ val colColumn = col.split(":")(1)
+ val scan = new Scan
+ val filter = new SingleColumnValueFilter(
+ Bytes.toBytes(colFamily),
+ Bytes.toBytes(colColumn),
+ CompareFilter.CompareOp.EQUAL,
+ Bytes.toBytes("")
+ )
+ filter.setFilterIfMissing(false)
+ scan.setFilter(filter)
+ val scanner = HBaseRawSource.convertScanToString(scan)
+ val (families, fields) = HBaseBuilder.parseColSpecs(List("f:c", col))
+
+ new HBaseRawSource(
+ hbaseTable,
+ zookeeperHosts,
+ new Fields("key"),
+ families,
+ fields,
+ SourceMode.SCAN_ALL,
+ base64Scan = scanner)
+ }
+}