diff options
Diffstat (limited to 'scalding/src')
4 files changed, 111 insertions, 10 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala new file mode 100644 index 0000000..4d9880f --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala @@ -0,0 +1,32 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class HBaseStatusCodeCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val source = HBaseCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts"), + "grobid0:status_code") + + val statusPipe : TypedPipe[Long] = source + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code) + .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } + + statusPipe.groupBy { identity } + .size + .debug + .write(TypedTsv[(Long,Long)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index fd0b4e2..1635e03 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -18,15 +18,15 @@ class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConver val source = HBaseCountJob.getHBaseSource( args("hbase-table"), args("zookeeper-hosts"), - "grobid0:status_code") + "grobid0:status") val statusPipe : TypedPipe[Long] = source .read - .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code) - .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status) + .map { case (key, raw_status) => Bytes.toString(raw_code.copyBytes()) } statusPipe.groupBy { identity } .size .debug - .write(TypedTsv[(Long,Long)](args("output"))) + .write(TypedTsv[(Long,String)](args("output"))) } diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala new file mode 100644 index 0000000..3291670 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala @@ -0,0 +1,71 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.junit.runner.RunWith +import org.scalatest.FunSpec +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource +import scala._ + +@RunWith(classOf[JUnitRunner]) +class HBaseStatusCountTest extends FunSpec with TupleConversions { + + val output = "/tmp/testOutput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val log = LoggerFactory.getLogger(this.getClass.getName) + + val statusType1 : Long = 200 + val statusType2 : Long = 404 + val statusType1Bytes = Bytes.toBytes(statusType1) + val statusType2Bytes = Bytes.toBytes(statusType2) + + // TODO(bnewbold): now to express a null (empty value) in this list? + val sampleData : List[List[Array[Byte]]] = List( + ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1Bytes), + ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1Bytes), + ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2Bytes), + ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2Bytes), + ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2Bytes), + ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2Bytes), + ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1Bytes), + ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2Bytes)) + .map(pair => List(Bytes.toBytes(pair._1), pair._2)) + + val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes) + val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes) + + JobTest("sandcrawler.HBaseStatusCountJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("debug", "true") + .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"), + sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + .sink[Tuple](TypedTsv[(Long, Long)](output)) { + outputBuffer => + it("should return a 2-element list.") { + assert(outputBuffer.size === 2) + } + + // Convert List[Tuple] to Map[Long, Long]. + val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap + it("should have the appropriate number of each status type") { + assert(counts(statusType1) == statusType1Count) + assert(counts(statusType2) == statusType2Count) + } + } + .run + .finish +} diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala index 3291670..b53abeb 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala @@ -17,17 +17,15 @@ import parallelai.spyglass.hbase.HBaseSource import scala._ @RunWith(classOf[JUnitRunner]) -class HBaseStatusCountTest extends FunSpec with TupleConversions { +class HBaseStatusCodeCountTest extends FunSpec with TupleConversions { val output = "/tmp/testOutput" val (testTable, testHost) = ("test-table", "dummy-host:2181") val log = LoggerFactory.getLogger(this.getClass.getName) - val statusType1 : Long = 200 - val statusType2 : Long = 404 - val statusType1Bytes = Bytes.toBytes(statusType1) - val statusType2Bytes = Bytes.toBytes(statusType2) + val statusType1Bytes = Bytes.toBytes("""{"status": "success"}""") + val statusType2Bytes = Bytes.toBytes("""{"status": "partial"}""") // TODO(bnewbold): now to express a null (empty value) in this list? val sampleData : List[List[Array[Byte]]] = List( @@ -44,7 +42,7 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions { val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes) val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes) - JobTest("sandcrawler.HBaseStatusCountJob") + JobTest("sandcrawler.HBaseStatusCodeCountJob") .arg("test", "") .arg("app.conf.path", "app.conf") .arg("output", output) |