diff options
| author | bnewbold <bnewbold@archive.org> | 2018-06-04 22:05:54 +0000 | 
|---|---|---|
| committer | bnewbold <bnewbold@archive.org> | 2018-06-04 22:05:54 +0000 | 
| commit | b21cc924224202aba412370acafaaa4846bb4a8d (patch) | |
| tree | 3d0fcc2f2bee55eb038686c566bdab1763e65c74 | |
| parent | 70069a78c8798352b5bef815a3fd4aa9e9b52394 (diff) | |
| parent | 9a18da82738c16d3066e02e886ee84e04156889f (diff) | |
| download | sandcrawler-b21cc924224202aba412370acafaaa4846bb4a8d.tar.gz sandcrawler-b21cc924224202aba412370acafaaa4846bb4a8d.zip | |
Merge branch 'refactoring' into 'master'
Refactoring to add, use, and test class HBaseBuilder to eliminate duplicated code and facilitate HBaseSource creation
See merge request webgroup/sandcrawler!1
4 files changed, 101 insertions, 20 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala new file mode 100644 index 0000000..87cc0cb --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala @@ -0,0 +1,46 @@ +package sandcrawler + +import cascading.tuple.Fields +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource +import scala._ + +object HBaseBuilder { +  // map from column families to column names +  val schema = Map("f" -> List("c"), +    "file" -> List("size", "mime", "cdx"), +    "grobid0" -> List("status_code", "quality", "status", "tei_xml", "tei_json", "metadata"), +    "match0" -> List("status", "doi", "info")) +  val inverseSchema = {for ((k,vs) <- schema; v <-vs) yield (k + ":" + v)}.toList + +  // The argument should be a comma-separated list of family:column, such as "f:c, file:size". +  @throws(classOf[IllegalArgumentException]) +  def parseColSpec(colSpecs: List[String]) : (List[String], List[Fields]) = { +    // Verify that all column specifiers are legal. +    for (colSpec <- colSpecs) { +      if (!(inverseSchema contains colSpec)) { +        throw new IllegalArgumentException("No such column: " + colSpec) +      } +      val pair = colSpec split(":") +      if (colSpec.split(":").length != 2) { +        throw new IllegalArgumentException("Bad column specifier " + colSpec +  +          " (specifiers should be family:name)") +      } +    } + +    // Produce and return a tuple containing: +    // 1. A list of column families. +    // 2. A corresponding list of Fields, each containing column names. +    val groupMap: Map[String, List[String]] = colSpecs.groupBy(c => (c split ":")(0)) +    val families = groupMap.keys.toList +    val groupedColNames : List[List[String]] = families map {fam => { +        val cols = {groupMap(fam).map(v => v.split(":")(1))} +        cols}} +    (families, groupedColNames.map({fields => new Fields(fields : _*)})) +  } + +  def build(table: String, server: String, colSpec: List[String], sourceMode: SourceMode, keyList: List[String] = List("key")) = { +    val (families, fields) = parseColSpec(colSpec) +    new HBaseSource(table, server, new Fields("key"), families, fields, sourceMode = sourceMode, keyList = keyList) +  } +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala index d47fe60..9d074ab 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -16,17 +16,17 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio    val output = args("output") -  val hbs = new HBaseSource( -    //"table_name", -    //"quorum_name:2181", -    "wbgrp-journal-extract-0-qa",     // HBase Table Name -    "mtrcs-zk1.us.archive.org:2181",  // HBase Zookeeper server (to get runtime config info; can be array?) -    new Fields("key"), -    List("file"), -    List(new Fields("size", "mimetype")), -    sourceMode = SourceMode.SCAN_ALL) +  HBaseRowCountJob.getHBaseSource      .read      .debug      .groupAll { _.size('count) }      .write(Tsv(output))  } + +object HBaseRowCountJob { +  def getHBaseSource = HBaseBuilder.build( +    "wbgrp-journal-extract-0-qa",     // HBase Table Name +    "mtrcs-zk1.us.archive.org:2181",  // HBase Zookeeper server (to get runtime config info; can be array?) +    List("file:size", "file:mime"), +    SourceMode.SCAN_ALL) +} diff --git a/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala new file mode 100644 index 0000000..b45751d --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala @@ -0,0 +1,43 @@ +package example + +import cascading.tuple.Fields +import org.scalatest._ +import sandcrawler.HBaseBuilder + +class HBaseBuilderTest extends FlatSpec with Matchers { +  "parseColSpec()" should "work on legal nontrivial input" in { +    val (fams, fields) = HBaseBuilder.parseColSpec(List("file:size", "file:cdx", "match0:status")) +    fams should have length 2 +    fields should have length 2 +    val fileIndex = fams.indexOf("file") +    fileIndex should not be -1 +    fields(fileIndex) should be (new Fields("size", "cdx")) +    val match0Index = fams.indexOf("match0") +    match0Index should not be -1 +    fields(match0Index) should be (new Fields("status")) +  } + +  it should "work on empty input" in { +    val (fams, fields) = HBaseBuilder.parseColSpec(List()) +    fams should have length 0 +    fields should have length 0 +  } + +  it should "throw IllegalArgumentException on malformed input" in { +    a [IllegalArgumentException] should be thrownBy { +      HBaseBuilder.parseColSpec(List("file_size")) +    } +  } + +  it should "throw IllegalArgumentException on nonexistent family" in { +    a [IllegalArgumentException] should be thrownBy { +      HBaseBuilder.parseColSpec(List("foo:bar")) +    } +  } + +  it should "throw IllegalArgumentException on nonexistent column" in { +    a [IllegalArgumentException] should be thrownBy { +      HBaseBuilder.parseColSpec(List("file:bar")) +    } +  } +} diff --git a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala index 94b3740..abb017c 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala @@ -12,6 +12,7 @@ import scala._  import com.twitter.scalding.Tsv  import parallelai.spyglass.hbase.HBaseSource  import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import sandcrawler.HBaseRowCountJob  /**   * Example of how to define tests for HBaseSource @@ -39,18 +40,9 @@ class HBaseRowCountTest extends FunSpec with TupleConversions {      .arg("app.conf.path", "app.conf")      .arg("output", output)      .arg("debug", "true") -    .source[Tuple]( -    new HBaseSource( -      //"table_name", -      //"quorum_name:2181", -      "wbgrp-journal-extract-0-qa", -      "mtrcs-zk1.us.archive.org:2181", -      new Fields("key"), -      List("file"), -      List(new Fields("size", "mimetype")), -      sourceMode = SourceMode.SCAN_ALL), +    .source[Tuple](HBaseRowCountJob.getHBaseSource,        sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*))) -    .sink[Tuple](Tsv(output)) { +      .sink[Tuple](Tsv(output)) {        outputBuffer =>          it("should return the test data provided.") { | 
