diff options
| author | Bryan Newbold <bnewbold@archive.org> | 2018-08-21 17:11:40 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@archive.org> | 2018-08-21 21:32:58 -0700 | 
| commit | 01dcfcb9786889228b8f83d211351202bf3cf4fb (patch) | |
| tree | d6fcc8e7dcc41cc83879c23ba2cd34a1569349b4 /scalding/src/main | |
| parent | 24b35b230a0e2b45fd9607b0bf9f6090c4bfab52 (diff) | |
| download | sandcrawler-01dcfcb9786889228b8f83d211351202bf3cf4fb.tar.gz sandcrawler-01dcfcb9786889228b8f83d211351202bf3cf4fb.zip | |
rewrite MissingColumnDumpJob as a join (sigh)
Diffstat (limited to 'scalding/src/main')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala | 66 | 
1 files changed, 37 insertions, 29 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala b/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala index de0228d..cc3bf23 100644 --- a/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala +++ b/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala @@ -5,55 +5,63 @@ import java.util.Properties  import cascading.property.AppProps  import cascading.tuple.Fields  import com.twitter.scalding._ -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.client.Scan -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter -import org.apache.hadoop.hbase.filter.CompareFilter +import com.twitter.scalding.typed.TDsl._  import parallelai.spyglass.base.JobBase  import parallelai.spyglass.hbase.HBaseConstants.SourceMode  import parallelai.spyglass.hbase.HBasePipeConversions -import parallelai.spyglass.hbase.HBaseRawSource  import parallelai.spyglass.hbase.HBaseSource +// This nasty, no-good, horrible Job outputs a list of keys ("sha1:A234...") +// for which the given "column" does not have a value set. +// It does this using a self-join because SpyGlass's HBase SCAN support seems +// to be extremely limited.  class MissingColumnDumpJob(args: Args) extends JobBase(args) with HBasePipeConversions {    val output = args("output") -  MissingColumnDumpJob.getHBaseSource( +  val allKeys : TypedPipe[String] = MissingColumnDumpJob.getHBaseKeySource( +    args("hbase-table"), +    args("zookeeper-hosts")) +    .read +    .fromBytesWritable('key) +    .toTypedPipe[String]('key) + +  val existingKeys : TypedPipe[(String,Boolean)] = MissingColumnDumpJob.getHBaseColSource(      args("hbase-table"),      args("zookeeper-hosts"),      args("column"))      .read      .fromBytesWritable('key) -    .write(Tsv(output)) +    .toTypedPipe[String]('key) +    .map{ key => (key, true) } + +  val missingKeys : TypedPipe[String] = allKeys +    .groupBy( identity ) +    .leftJoin(existingKeys.groupBy(_._1)) +    .toTypedPipe +    .collect { case (key, (_, None)) => key } + +  missingKeys +    .write(TypedTsv[String](output)) +  }  object MissingColumnDumpJob {    // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" -  def getHBaseSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = { - -    val colFamily = col.split(":")(0) -    val colColumn = col.split(":")(1) -    val scan = new Scan -    val filter = new SingleColumnValueFilter( -      Bytes.toBytes(colFamily), -      Bytes.toBytes(colColumn), -      CompareFilter.CompareOp.EQUAL, -      Bytes.toBytes("") -    ) -    filter.setFilterIfMissing(false) -    scan.setFilter(filter) -    val scanner = HBaseRawSource.convertScanToString(scan) -    val (families, fields) = HBaseBuilder.parseColSpecs(List("f:c", col)) - -    new HBaseRawSource( +  def getHBaseColSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = { +    HBaseBuilder.build( +      hbaseTable, +      zookeeperHosts, +      List(col), +      SourceMode.SCAN_ALL) +  } + +  def getHBaseKeySource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { +    HBaseBuilder.build(        hbaseTable,        zookeeperHosts, -      new Fields("key"), -      families, -      fields, -      SourceMode.SCAN_ALL, -      base64Scan = scanner) +      List("f:c"), +      SourceMode.SCAN_ALL)    }  } | 
