diff options
Diffstat (limited to 'scalding/src/main')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 67 |
1 files changed, 46 insertions, 21 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala index 4f08665..0251e07 100644 --- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala +++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala @@ -4,13 +4,24 @@ import cascading.property.AppProps import cascading.tuple.Fields import cascading.pipe.joiner._ import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ import java.util.Properties +import cascading.tap.SinkMode import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} // Type that represents a raw parsed CDX line -case class CdxLine(surt: String, datetime: String, url: String, mime: String, http_status: String, sha1: String, c_size: String, offset: String, warc: String) +case class CdxLine(surt: String, + datetime: String, + url: String, + mime: String, + http_status: String, + sha1: String, + c_size: String, + offset: String, + warc: String) + /** * CDX backfill: @@ -27,38 +38,44 @@ case class CdxLine(surt: String, datetime: String, url: String, mime: String, ht */ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions { - //import CdxLine._ - // XXX remove all other CdxBackfillJob.whatever import CdxBackfillJob._ - val cdxInputPath = args("cdx-input-path") - val hbaseTable = args("hbase-table") - val zookeeperHosts = args("zookeeper-hosts") - val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) + val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts")) - val lines : TypedPipe[String] = TypedPipe.from(TextLine(cdxInputPath)) - + // Parse CDX lines from text file to typed pipe + val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path"))) val cdxLines : TypedPipe[CdxLine] = lines .filter { isCdxLine } .map { lineToCdxLine } .filter { CdxBackfillJob.keepCdx(_) } - val cdxRows = cdxLines - .map { CdxBackfillJob.cdxLineToRow(_) } - .toPipe(('key, 'f_c, 'file_cdx, 'file_mime)) + val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines + .map { CdxBackfillJob.cdxLineToRow } - val hbaseKeys = hbaseSource - .project('key) - .mapTo('key -> 'existingKey) { key : String => key } + val existingKeys : TypedPipe[String] = hbaseSource + .read + .toTypedPipe[String]('key) // filters out all the lines that have an existing SHA1 key in HBase - val newRows = cdxRows - .joinWithLarger('key -> 'existingKey, hbaseKeys, joiner = new LeftJoin) - .filter('existingKey) { k : String => k == null } // is String the right type? - + // the groupBy statements are to select key values to join on + val newRows : TypedPipe[(String, String, String, String)] = existingKeys + .groupBy( identity ) + .rightJoin(cdxRows.groupBy(_._1)) + .toTypedPipe + .debug + .collect { case (_, (None, row)) => row } + .debug + + // convert to tuple form and write out into HBase newRows - .write(hbaseSource) + .toPipe('key, 'c, 'cdx, 'mime) + .toBytesWritable( new Fields("key", "c", "cdx", "mime") ) + .write(hbaseSink) + + // XXX: + //.toPipe("all") + //.mapTo('all -> ('key, 'c, 'cdx, 'mime)) { x : (String, String, String, String) => x } } @@ -72,6 +89,14 @@ object CdxBackfillJob { SourceMode.SCAN_ALL) } + def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = { + return HBaseBuilder.buildSink( + hbase_table, + zookeeper_hosts, + List("f:c", "file:cdx", "file:mime"), + SinkMode.UPDATE) + } + def normalizeMime(raw: String) : String = { val NORMAL_MIME = List("application/pdf", |