diff options
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseBuilder.scala | 66 | ||||
-rw-r--r-- | scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala | 62 |
2 files changed, 128 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
new file mode 100644
index 0000000..c55aef6
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
@@ -0,0 +1,66 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
+import scala._
+
+/**
+ * Turns "family:name" column specifiers into the parallel family / Fields
+ * lists that the (currently commented-out) build() hands to HBaseSource.
+ */
+object HBaseBuilder {
+  // map from column families to column names
+  val schema = Map("f" -> List("c"),
+    "file" -> List("size", "mime", "cdx"),
+    "grobid0" -> List("status_code", "quality", "status", "tei_xml", "tei_json", "metadata"),
+    "match0" -> List("status", "doi", "info"))
+  // map from colFamily:colName -> colFamily
+  // Code from https://stackoverflow.com/a/50595189/6310511
+  val inverseSchema = for ((k, vs) <- schema; v <- vs) yield (k + ":" + v, k)
+
+  /**
+   * Validates column specifiers against `schema` and groups them by family.
+   *
+   * @param colSpecs column specifiers, each written as "family:name"
+   * @return a pair of parallel lists: the distinct column families in order
+   *         of first appearance, and for each family a Fields holding the
+   *         requested column names in the order they were given
+   * @throws IllegalArgumentException if a specifier is not of the form
+   *         "family:name", or names a column that is not in `schema`
+   */
+  @throws(classOf[IllegalArgumentException])
+  def parseColSpec(colSpecs: List[String]) : (List[String], List[Fields]) = {
+    // Split each specifier exactly once. The shape is checked before the
+    // schema lookup: with the schema above a malformed specifier can never be
+    // a schema key, so looking it up first makes the format error unreachable.
+    val parsed: List[(String, String)] = colSpecs.map { colSpec =>
+      colSpec.split(":") match {
+        case Array(family, name) if inverseSchema.contains(colSpec) =>
+          (family, name)
+        case Array(_, _) =>
+          throw new IllegalArgumentException("No such column: " + colSpec)
+        case _ =>
+          throw new IllegalArgumentException("Bad column specifier " + colSpec +
+            " (specifiers should be family:name)")
+      }
+    }
+
+    // Families are listed in order of first appearance so the result does
+    // not depend on map iteration order; fields(i) belongs to families(i).
+    val families: List[String] = parsed.map(_._1).distinct
+    val byFamily: Map[String, List[(String, String)]] = parsed.groupBy(_._1)
+    val fields: List[Fields] = families.map { family =>
+      val names: List[String] = byFamily(family).map(_._2)
+      new Fields(names : _*)
+    }
+    (families, fields)
+  }
+
+  /*
+  def build(table: String, server: String, colSpec: List[String], sourceMode: SourceMode, keyList: List[String]) {
+    val (families: List[String], fields: List[Fields]) = parseColSpec(colSpec)
+    new HBaseSource(table, server, new Fields("key"), families, fields, sourceMode = sourceMode, keyList = keyList)
+  }
+  */
+}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala
new file mode 100644
index 0000000..b45751d
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala
@@ -0,0 +1,62 @@
+package example
+
+import cascading.tuple.Fields
+import org.scalatest._
+import sandcrawler.HBaseBuilder
+
+/** Unit tests for HBaseBuilder.parseColSpec(). */
+class HBaseBuilderTest extends FlatSpec with Matchers {
+  "parseColSpec()" should "work on legal nontrivial input" in {
+    val (fams, fields) = HBaseBuilder.parseColSpec(List("file:size", "file:cdx", "match0:status"))
+    fams should have length 2
+    fields should have length 2
+    val fileIndex = fams.indexOf("file")
+    fileIndex should not be -1
+    fields(fileIndex) should be (new Fields("size", "cdx"))
+    val match0Index = fams.indexOf("match0")
+    match0Index should not be -1
+    fields(match0Index) should be (new Fields("status"))
+  }
+
+  it should "list families in order of first appearance" in {
+    val (fams, _) = HBaseBuilder.parseColSpec(List("match0:doi", "file:mime", "match0:info"))
+    fams should be (List("match0", "file"))
+  }
+
+  it should "work on empty input" in {
+    val (fams, fields) = HBaseBuilder.parseColSpec(List())
+    fams should have length 0
+    fields should have length 0
+  }
+
+  it should "throw IllegalArgumentException on malformed input" in {
+    a [IllegalArgumentException] should be thrownBy {
+      HBaseBuilder.parseColSpec(List("file_size"))
+    }
+  }
+
+  it should "report malformed input as a bad column specifier" in {
+    val thrown = the [IllegalArgumentException] thrownBy {
+      HBaseBuilder.parseColSpec(List("file_size"))
+    }
+    thrown.getMessage should startWith ("Bad column specifier")
+  }
+
+  it should "throw IllegalArgumentException on too many separators" in {
+    a [IllegalArgumentException] should be thrownBy {
+      HBaseBuilder.parseColSpec(List("file:size:extra"))
+    }
+  }
+
+  it should "throw IllegalArgumentException on nonexistent family" in {
+    a [IllegalArgumentException] should be thrownBy {
+      HBaseBuilder.parseColSpec(List("foo:bar"))
+    }
+  }
+
+  it should "throw IllegalArgumentException on nonexistent column" in {
+    a [IllegalArgumentException] should be thrownBy {
+      HBaseBuilder.parseColSpec(List("file:bar"))
+    }
+  }
+}