diff options
author | Bryan Newbold <bnewbold@archive.org> | 2018-07-19 15:23:29 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2018-07-24 11:27:45 -0700 |
commit | 5883c665b9c6d1d27fe4b0cfd98098f671a5f5dd (patch) | |
tree | a39f95df6403afbf7e982769cc72aba433394e38 /scalding/src/main | |
parent | 8ae41461f901a8142af06610a4b9f500ce4fe47f (diff) | |
download | sandcrawler-5883c665b9c6d1d27fe4b0cfd98098f671a5f5dd.tar.gz sandcrawler-5883c665b9c6d1d27fe4b0cfd98098f671a5f5dd.zip |
CdxBackfillJob back to HBase; tests work
Diffstat (limited to 'scalding/src/main')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 12 |
1 file changed, 5 insertions, 7 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala index a0d3e37..5ff667d 100644 --- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala +++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala @@ -41,10 +41,11 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions import CdxBackfillJob._ val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) - // XXX: val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts")) + val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts")) // Parse CDX lines from text file to typed pipe val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path"))) + val cdxLines : TypedPipe[CdxLine] = lines .filter { isCdxLine } .map { lineToCdxLine } @@ -55,7 +56,9 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions val existingKeys : TypedPipe[String] = hbaseSource .read + .fromBytesWritable( new Fields("key") ) .toTypedPipe[String]('key) + //.debug // filters out all the lines that have an existing SHA1 key in HBase // the groupBy statements are to select key values to join on @@ -63,19 +66,14 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions .groupBy( identity ) .rightJoin(cdxRows.groupBy(_._1)) .toTypedPipe - .debug .collect { case (_, (None, row)) => row } - .debug + //.debug // convert to tuple form and write out into HBase - /* newRows .toPipe('key, 'c, 'cdx, 'mime) .toBytesWritable( new Fields("key", "c", "cdx", "mime") ) .write(hbaseSink) - */ - newRows - .write(TypedTsv[(String, String, String, String)]("dummy-output")) } |