aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main')
-rw-r--r--scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala12
1 files changed, 5 insertions, 7 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index a0d3e37..5ff667d 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -41,10 +41,11 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
import CdxBackfillJob._
val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
- // XXX: val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
+ val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
// Parse CDX lines from text file to typed pipe
val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))
+
val cdxLines : TypedPipe[CdxLine] = lines
.filter { isCdxLine }
.map { lineToCdxLine }
@@ -55,7 +56,9 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
val existingKeys : TypedPipe[String] = hbaseSource
.read
+ .fromBytesWritable( new Fields("key") )
.toTypedPipe[String]('key)
+ //.debug
// filters out all the lines that have an existing SHA1 key in HBase
// the groupBy statements are to select key values to join on
@@ -63,19 +66,14 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
.groupBy( identity )
.rightJoin(cdxRows.groupBy(_._1))
.toTypedPipe
- .debug
.collect { case (_, (None, row)) => row }
- .debug
+ //.debug
// convert to tuple form and write out into HBase
- /*
newRows
.toPipe('key, 'c, 'cdx, 'mime)
.toBytesWritable( new Fields("key", "c", "cdx", "mime") )
.write(hbaseSink)
- */
- newRows
- .write(TypedTsv[(String, String, String, String)]("dummy-output"))
}