about | summary | refs | log | tree | commit | diff | stats
path: root/scalding/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main')
-rw-r--r-- scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala 67
1 files changed, 46 insertions, 21 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index 4f08665..0251e07 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -4,13 +4,24 @@ import cascading.property.AppProps
import cascading.tuple.Fields
import cascading.pipe.joiner._
import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
import java.util.Properties
+import cascading.tap.SinkMode
import parallelai.spyglass.base.JobBase
-import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
// Type that represents a raw parsed CDX line
-case class CdxLine(surt: String, datetime: String, url: String, mime: String, http_status: String, sha1: String, c_size: String, offset: String, warc: String)
+case class CdxLine(surt: String,
+ datetime: String,
+ url: String,
+ mime: String,
+ http_status: String,
+ sha1: String,
+ c_size: String,
+ offset: String,
+ warc: String)
+
/**
* CDX backfill:
@@ -27,38 +38,44 @@ case class CdxLine(surt: String, datetime: String, url: String, mime: String, ht
*/
class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions {
- //import CdxLine._
- // XXX remove all other CdxBackfillJob.whatever
import CdxBackfillJob._
- val cdxInputPath = args("cdx-input-path")
- val hbaseTable = args("hbase-table")
- val zookeeperHosts = args("zookeeper-hosts")
-
val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+ val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
- val lines : TypedPipe[String] = TypedPipe.from(TextLine(cdxInputPath))
-
+ // Parse CDX lines from text file to typed pipe
+ val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))
val cdxLines : TypedPipe[CdxLine] = lines
.filter { isCdxLine }
.map { lineToCdxLine }
.filter { CdxBackfillJob.keepCdx(_) }
- val cdxRows = cdxLines
- .map { CdxBackfillJob.cdxLineToRow(_) }
- .toPipe(('key, 'f_c, 'file_cdx, 'file_mime))
+ val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines
+ .map { CdxBackfillJob.cdxLineToRow }
- val hbaseKeys = hbaseSource
- .project('key)
- .mapTo('key -> 'existingKey) { key : String => key }
+ val existingKeys : TypedPipe[String] = hbaseSource
+ .read
+ .toTypedPipe[String]('key)
// filters out all the lines that have an existing SHA1 key in HBase
- val newRows = cdxRows
- .joinWithLarger('key -> 'existingKey, hbaseKeys, joiner = new LeftJoin)
- .filter('existingKey) { k : String => k == null } // is String the right type?
-
+ // the groupBy statements are to select key values to join on
+ val newRows : TypedPipe[(String, String, String, String)] = existingKeys
+ .groupBy( identity )
+ .rightJoin(cdxRows.groupBy(_._1))
+ .toTypedPipe
+ .debug
+ .collect { case (_, (None, row)) => row }
+ .debug
+
+ // convert to tuple form and write out into HBase
newRows
- .write(hbaseSource)
+ .toPipe('key, 'c, 'cdx, 'mime)
+ .toBytesWritable( new Fields("key", "c", "cdx", "mime") )
+ .write(hbaseSink)
+
+ // XXX:
+ //.toPipe("all")
+ //.mapTo('all -> ('key, 'c, 'cdx, 'mime)) { x : (String, String, String, String) => x }
}
@@ -72,6 +89,14 @@ object CdxBackfillJob {
SourceMode.SCAN_ALL)
}
+ def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
+ return HBaseBuilder.buildSink(
+ hbase_table,
+ zookeeper_hosts,
+ List("f:c", "file:cdx", "file:mime"),
+ SinkMode.UPDATE)
+ }
+
def normalizeMime(raw: String) : String = {
val NORMAL_MIME = List("application/pdf",