aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src')
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala30
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala36
2 files changed, 48 insertions, 18 deletions
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index aabf9f8..375d155 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -1,5 +1,33 @@
package sandcrawler
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import java.util.Properties
+import parallelai.spyglass.base.JobBase
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import com.twitter.scalding.Args
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-class HBaseStatusCountJob(args: Args) extends HBaseCountJob(args, "grobid0:status_code")
+class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val colSpec = "grobid0:status_code"
+ val output = args("output")
+ HBaseBuilder.parseColSpec(colSpec)
+ val Col: String = colSpec.split(":")(1)
+
+ val source : TypedPipe[Long] = HBaseCountJob.getHBaseSource(args("hbase-table"),
+ args("zookeeper-hosts"),
+ colSpec)
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)
+ .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) }
+
+ source.groupBy { identity }
+ .size
+ .debug
+ .write(TypedTsv[(Long,Long)](output))
+}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
index fca9c69..11ab1d0 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
@@ -1,7 +1,7 @@
package sandcrawler
import cascading.tuple.{Tuple, Fields}
-import com.twitter.scalding.{JobTest, Tsv, TupleConversions}
+import com.twitter.scalding.{JobTest, Tsv, TypedTsv, TupleConversions}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.junit.runner.RunWith
@@ -20,22 +20,24 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
val log = LoggerFactory.getLogger(this.getClass.getName)
- val statusType1 = "200"
- val statusType2 = "404"
+ val statusType1 : Long = 200
+ val statusType2 : Long = 404
+ val statusType1Bytes = Bytes.toBytes(statusType1)
+ val statusType2Bytes = Bytes.toBytes(statusType2)
val sampleData = List(
- List("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1),
- List("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1),
- List("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2),
- List("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2),
- List("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2),
- List("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2),
- List("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1),
- List("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2)
+ List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), statusType1Bytes),
+ List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), statusType1Bytes),
+ List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), statusType2Bytes),
+ List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), statusType2Bytes),
+ List(Bytes.toBytes("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ"), statusType2Bytes),
+ List(Bytes.toBytes("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6"), statusType2Bytes),
+ List(Bytes.toBytes("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ"), statusType1Bytes),
+ List(Bytes.toBytes("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT"), statusType2Bytes)
)
- val statusType1Count = sampleData.count(lst => lst(1) == statusType1)
- val statusType2Count = sampleData.count(lst => lst(1) == statusType2)
+ val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes)
+ val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes)
JobTest("sandcrawler.HBaseStatusCountJob")
.arg("test", "")
@@ -45,15 +47,15 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
.arg("zookeeper-hosts", testHost)
.arg("debug", "true")
.source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
- sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*)))
- .sink[Tuple](Tsv(output)) {
+ sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+ .sink[Tuple](TypedTsv[(Long, Long)](output)) {
outputBuffer =>
it("should return a 2-element list.") {
assert(outputBuffer.size === 2)
}
- // Convert List[Tuple] to Map[String, Integer].
- val counts = outputBuffer.map(t => (t.getString(0), t.getInteger(1))).toMap
+ // Convert List[Tuple] to Map[Long, Long].
+ val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
it("should have the appropriate number of each status type") {
assert(counts(statusType1) == statusType1Count)
assert(counts(statusType2) == statusType2Count)