Diffstat (limited to 'scalding/src/main')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 12 |
1 file changed, 5 insertions, 7 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index a0d3e37..5ff667d 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -41,10 +41,11 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
   import CdxBackfillJob._
 
   val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
-  // XXX: val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
+  val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
 
   // Parse CDX lines from text file to typed pipe
   val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))
+
   val cdxLines : TypedPipe[CdxLine] = lines
     .filter { isCdxLine }
     .map { lineToCdxLine }
@@ -55,7 +56,9 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
 
   val existingKeys : TypedPipe[String] = hbaseSource
     .read
+    .fromBytesWritable( new Fields("key") )
     .toTypedPipe[String]('key)
+    //.debug
 
   // filters out all the lines that have an existing SHA1 key in HBase
   // the groupBy statements are to select key values to join on
@@ -63,19 +66,14 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
     .groupBy( identity )
     .rightJoin(cdxRows.groupBy(_._1))
     .toTypedPipe
-    .debug
     .collect { case (_, (None, row)) => row }
-    .debug
+    //.debug
 
   // convert to tuple form and write out into HBase
-  /*
   newRows
     .toPipe('key, 'c, 'cdx, 'mime)
     .toBytesWritable( new Fields("key", "c", "cdx", "mime") )
     .write(hbaseSink)
-  */
-  newRows
-    .write(TypedTsv[(String, String, String, String)]("dummy-output"))
 
 }
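For context, below is a minimal standalone sketch of the anti-join pattern this diff keeps: group the already-present keys, rightJoin them against the candidate rows, and collect only the rows whose key had no match (Option is None). Only the groupBy/rightJoin/toTypedPipe/collect chain mirrors the code in the diff; the job name, the in-memory example data, and the TypedTsv output path are hypothetical stand-ins for the HBase source/sink used by the real CdxBackfillJob.

import com.twitter.scalding._
import com.twitter.scalding.typed.TypedPipe

// Hypothetical sketch; not part of the sandcrawler codebase.
class AntiJoinSketchJob(args: Args) extends Job(args) {

  // Keys already present in the sink (in the real job these come from HBase).
  val existingKeys: TypedPipe[String] =
    TypedPipe.from(List("sha1:AAAA", "sha1:BBBB"))

  // Candidate rows keyed by the same SHA1-style key (example data only).
  val cdxRows: TypedPipe[(String, String, String, String)] =
    TypedPipe.from(List(
      ("sha1:AAAA", "f:c", "{...}", "application/pdf"),
      ("sha1:CCCC", "f:c", "{...}", "application/pdf")))

  // Right-join existing keys against candidate rows, then keep only the rows
  // whose key was absent on the left side (None), i.e. the truly new rows.
  val newRows: TypedPipe[(String, String, String, String)] = existingKeys
    .groupBy(identity)
    .rightJoin(cdxRows.groupBy(_._1))
    .toTypedPipe
    .collect { case (_, (None, row)) => row }

  // Illustrative output path; the real job writes these rows to the HBase sink.
  newRows.write(TypedTsv[(String, String, String, String)]("new-rows-output"))
}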