From 5883c665b9c6d1d27fe4b0cfd98098f671a5f5dd Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 19 Jul 2018 15:23:29 -0700 Subject: CdxBackfillJob back to HBase; tests work --- scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'scalding/src/main') diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala index a0d3e37..5ff667d 100644 --- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala +++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala @@ -41,10 +41,11 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions import CdxBackfillJob._ val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) - // XXX: val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts")) + val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts")) // Parse CDX lines from text file to typed pipe val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path"))) + val cdxLines : TypedPipe[CdxLine] = lines .filter { isCdxLine } .map { lineToCdxLine } @@ -55,7 +56,9 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions val existingKeys : TypedPipe[String] = hbaseSource .read + .fromBytesWritable( new Fields("key") ) .toTypedPipe[String]('key) + //.debug // filters out all the lines that have an existing SHA1 key in HBase // the groupBy statements are to select key values to join on @@ -63,19 +66,14 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions .groupBy( identity ) .rightJoin(cdxRows.groupBy(_._1)) .toTypedPipe - .debug .collect { case (_, (None, row)) => row } - .debug + //.debug // convert to tuple form and write out into HBase - /* newRows .toPipe('key, 'c, 'cdx, 'mime) .toBytesWritable( new Fields("key", "c", "cdx", "mime") ) .write(hbaseSink) - */ - newRows - .write(TypedTsv[(String, String, String, String)]("dummy-output")) } -- cgit v1.2.3