diff options
Diffstat (limited to 'scalding')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 67 | 
1 files changed, 46 insertions, 21 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala index 4f08665..0251e07 100644 --- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala +++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala @@ -4,13 +4,24 @@ import cascading.property.AppProps  import cascading.tuple.Fields  import cascading.pipe.joiner._  import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._  import java.util.Properties +import cascading.tap.SinkMode  import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}  import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}  // Type that represents a raw parsed CDX line -case class CdxLine(surt: String, datetime: String, url: String, mime: String, http_status: String, sha1: String, c_size: String, offset: String, warc: String) +case class CdxLine(surt: String, +                   datetime: String, +                   url: String, +                   mime: String, +                   http_status: String, +                   sha1: String, +                   c_size: String, +                   offset: String, +                   warc: String) +  /**   *  CDX backfill: @@ -27,38 +38,44 @@ case class CdxLine(surt: String, datetime: String, url: String, mime: String, ht   */  class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions { -  //import CdxLine._ -  // XXX remove all other CdxBackfillJob.whatever    import CdxBackfillJob._ -  val cdxInputPath = args("cdx-input-path") -  val hbaseTable = args("hbase-table") -  val zookeeperHosts = args("zookeeper-hosts") -    val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) +  val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts")) -  val lines : TypedPipe[String] = TypedPipe.from(TextLine(cdxInputPath)) - +  // Parse CDX lines from text file to typed pipe +  val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))    val cdxLines : TypedPipe[CdxLine] = lines      .filter { isCdxLine }      .map { lineToCdxLine }      .filter { CdxBackfillJob.keepCdx(_) } -  val cdxRows = cdxLines -    .map { CdxBackfillJob.cdxLineToRow(_) } -    .toPipe(('key, 'f_c, 'file_cdx, 'file_mime)) +  val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines +    .map { CdxBackfillJob.cdxLineToRow } -  val hbaseKeys = hbaseSource -    .project('key) -    .mapTo('key -> 'existingKey) { key : String => key } +  val existingKeys : TypedPipe[String] = hbaseSource +    .read +    .toTypedPipe[String]('key)    // filters out all the lines that have an existing SHA1 key in HBase -  val newRows = cdxRows -    .joinWithLarger('key -> 'existingKey, hbaseKeys, joiner = new LeftJoin) -    .filter('existingKey) { k : String => k == null } // is String the right type? - +  // the groupBy statements are to select key values to join on +  val newRows : TypedPipe[(String, String, String, String)] = existingKeys +    .groupBy( identity ) +    .rightJoin(cdxRows.groupBy(_._1)) +    .toTypedPipe +    .debug +    .collect { case (_, (None, row)) => row } +    .debug + +  // convert to tuple form and write out into HBase    newRows -    .write(hbaseSource) +    .toPipe('key, 'c, 'cdx, 'mime) +    .toBytesWritable( new Fields("key", "c", "cdx", "mime") ) +    .write(hbaseSink) + +    // XXX: +    //.toPipe("all") +    //.mapTo('all -> ('key, 'c, 'cdx, 'mime)) { x : (String, String, String, String) => x }  } @@ -72,6 +89,14 @@ object CdxBackfillJob {        SourceMode.SCAN_ALL)    } +  def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = { +    return HBaseBuilder.buildSink( +      hbase_table, +      zookeeper_hosts, +      List("f:c", "file:cdx", "file:mime"), +      SinkMode.UPDATE) +  } +    def normalizeMime(raw: String) : String = {      val NORMAL_MIME = List("application/pdf", | 
