aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/ScoreJob.scala
blob: 22cc9e9de438459ef03aa5534ac8c63da5833bf1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package sandcrawler

import java.text.Normalizer

import scala.math
import scala.util.parsing.json.JSON

import cascading.flow.FlowDef
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBasePipeConversions

class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with HBasePipeConversions {
  val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args, flowDef, mode)
  val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args, flowDef, mode)

  pipe1.join(pipe2).map { entry =>
    val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
    Scorable.computeOutput(features1, features2)
  }
    .write(TypedTsv[ReduceOutput](args("output")))
}