diff options
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/ScoreJob.scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/ScoreJob.scala | 20 |
1 files changed, 20 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala new file mode 100644 index 0000000..8d4d957 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -0,0 +1,20 @@ +import java.text.Normalizer + +import scala.math +import scala.util.parsing.json.JSON + +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBasePipeConversions + +class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable) extends JobBase(args) with HBasePipeConversions { + val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args) + + pipe1.join(pipe2).map { entry => + val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry + Scorable.computeOutput(features1, features2) + } + .write(TypedTsv[ReduceOutput](args("output"))) +} |