diff options
author bnewbold <bnewbold@archive.org> 2018-06-04 22:05:54 +0000
committer bnewbold <bnewbold@archive.org> 2018-06-04 22:05:54 +0000
commit b21cc924224202aba412370acafaaa4846bb4a8d (patch)
parent 70069a78c8798352b5bef815a3fd4aa9e9b52394 (diff)
parent 9a18da82738c16d3066e02e886ee84e04156889f (diff)
Merge branch 'refactoring' into 'master'
Refactoring to add, use, and test class HBaseBuilder to eliminate duplicated code and facilitate HBaseSource creation. See merge request webgroup/sandcrawler!1
4 files changed, 101 insertions, 20 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
new file mode 100644
index 0000000..87cc0cb
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
@@ -0,0 +1,46 @@
+package sandcrawler
+import cascading.tuple.Fields
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
+import scala._
+object HBaseBuilder {
+ // Map from each column family to the column names it contains.
+ val schema = Map(
+   "f" -> List("c"),
+   "file" -> List("size", "mime", "cdx"),
+   "grobid0" -> List("status_code", "quality", "status", "tei_xml", "tei_json", "metadata"),
+   "match0" -> List("status", "doi", "info"))
+ // Every "family:column" specifier that can be formed from schema, flattened into one list.
+ val inverseSchema = schema.flatMap { case (family, columns) =>
+   columns.map(column => family + ":" + column)
+ }.toList
+ /** Splits column specifiers into HBase column families and their column names.
+   *
+   * @param colSpecs list of "family:column" specifiers, such as List("f:c", "file:size")
+   * @return a pair of parallel lists: the distinct column families, and, for each
+   *         family, a Fields holding the requested column names of that family
+   * @throws IllegalArgumentException if a specifier is not of the form family:name,
+   *         or if it names a column that is not present in schema
+   */
+ @throws(classOf[IllegalArgumentException])
+ def parseColSpec(colSpecs: List[String]) : (List[String], List[Fields]) = {
+   // Verify that all column specifiers are legal. The shape check must run before the
+   // schema lookup: every schema entry is already well-formed, so in the opposite order
+   // a malformed specifier would always be reported as an unknown column instead.
+   for (colSpec <- colSpecs) {
+     if (colSpec.split(":").length != 2) {
+       throw new IllegalArgumentException("Bad column specifier " + colSpec +
+         " (specifiers should be family:name)")
+     }
+     if (!(inverseSchema contains colSpec)) {
+       throw new IllegalArgumentException("No such column: " + colSpec)
+     }
+   }
+   // Produce and return a tuple containing:
+   // 1. A list of column families (in the iteration order of the grouping map).
+   // 2. A corresponding list of Fields, each containing column names.
+   val groupMap: Map[String, List[String]] = colSpecs.groupBy(c => (c split ":")(0))
+   val families = groupMap.keys.toList
+   val groupedColNames: List[List[String]] =
+     families.map(fam => groupMap(fam).map(spec => spec.split(":")(1)))
+   (families, groupedColNames.map(names => new Fields(names : _*)))
+ }
+ /** Builds an HBaseSource for `table` on `server` that reads the columns named in colSpec.
+   *
+   * The specifiers are parsed with parseColSpec, so an illegal specifier raises
+   * IllegalArgumentException before any source is constructed. The key field is
+   * always named "key"; sourceMode and keyList are passed through unchanged.
+   */
+ def build(table: String, server: String, colSpec: List[String], sourceMode: SourceMode, keyList: List[String] = List("key")) = {
+   val parsed = parseColSpec(colSpec)
+   val columnFamilies = parsed._1
+   val columnFields = parsed._2
+   new HBaseSource(table, server, new Fields("key"), columnFamilies, columnFields, sourceMode = sourceMode, keyList = keyList)
+ }
diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
index d47fe60..9d074ab 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
@@ -16,17 +16,17 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio
val output = args("output")
- val hbs = new HBaseSource(
- //"table_name",
- //"quorum_name:2181",
- "wbgrp-journal-extract-0-qa", // HBase Table Name
- "mtrcs-zk1.us.archive.org:2181", // HBase Zookeeper server (to get runtime config info; can be array?)
- new Fields("key"),
- List("file"),
- List(new Fields("size", "mimetype")),
- sourceMode = SourceMode.SCAN_ALL)
+ HBaseRowCountJob.getHBaseSource
.groupAll { _.size('count) }
+object HBaseRowCountJob {
+ // Source shared by the row-count job and its test: built through HBaseBuilder with
+ // the file:size and file:mime columns and SourceMode.SCAN_ALL.
+ def getHBaseSource = {
+   val tableName = "wbgrp-journal-extract-0-qa" // HBase Table Name
+   val zookeeperServer = "mtrcs-zk1.us.archive.org:2181" // HBase Zookeeper server (to get runtime config info; can be array?)
+   val columnSpecs = List("file:size", "file:mime")
+   HBaseBuilder.build(tableName, zookeeperServer, columnSpecs, SourceMode.SCAN_ALL)
+ }
diff --git a/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala
new file mode 100644
index 0000000..b45751d
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala
@@ -0,0 +1,43 @@
+package example
+import cascading.tuple.Fields
+import org.scalatest._
+import sandcrawler.HBaseBuilder
+class HBaseBuilderTest extends FlatSpec with Matchers {
+ "parseColSpec()" should "work on legal nontrivial input" in {
+ val (fams, fields) = HBaseBuilder.parseColSpec(List("file:size", "file:cdx", "match0:status"))
+ fams should have length 2
+ fields should have length 2
+ val fileIndex = fams.indexOf("file")
+ fileIndex should not be -1
+ fields(fileIndex) should be (new Fields("size", "cdx"))
+ val match0Index = fams.indexOf("match0")
+ match0Index should not be -1
+ fields(match0Index) should be (new Fields("status"))
+ }
+ it should "work on empty input" in {
+ val (fams, fields) = HBaseBuilder.parseColSpec(List())
+ fams should have length 0
+ fields should have length 0
+ }
+ it should "throw IllegalArgumentException on malformed input" in {
+ a [IllegalArgumentException] should be thrownBy {
+ HBaseBuilder.parseColSpec(List("file_size"))
+ }
+ }
+ it should "throw IllegalArgumentException on nonexistent family" in {
+ a [IllegalArgumentException] should be thrownBy {
+ HBaseBuilder.parseColSpec(List("foo:bar"))
+ }
+ }
+ it should "throw IllegalArgumentException on nonexistent column" in {
+ a [IllegalArgumentException] should be thrownBy {
+ HBaseBuilder.parseColSpec(List("file:bar"))
+ }
+ }
diff --git a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
index 94b3740..abb017c 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
@@ -12,6 +12,7 @@ import scala._
import com.twitter.scalding.Tsv
import parallelai.spyglass.hbase.HBaseSource
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import sandcrawler.HBaseRowCountJob
* Example of how to define tests for HBaseSource
@@ -39,18 +40,9 @@ class HBaseRowCountTest extends FunSpec with TupleConversions {
.arg("app.conf.path", "app.conf")
.arg("output", output)
.arg("debug", "true")
- .source[Tuple](
- new HBaseSource(
- //"table_name",
- //"quorum_name:2181",
- "wbgrp-journal-extract-0-qa",
- "mtrcs-zk1.us.archive.org:2181",
- new Fields("key"),
- List("file"),
- List(new Fields("size", "mimetype")),
- sourceMode = SourceMode.SCAN_ALL),
+ .source[Tuple](HBaseRowCountJob.getHBaseSource,
sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*)))
- .sink[Tuple](Tsv(output)) {
+ .sink[Tuple](Tsv(output)) {
outputBuffer =>
it("should return the test data provided.") {