diff options
author | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-06 14:16:19 -0700 |
---|---|---|
committer | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-06 14:16:19 -0700 |
commit | b1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d (patch) | |
tree | 2d0a4367abd95331b941d7dd919c16c5cecec6e9 /scalding/src/main/scala/sandcrawler/ScoreJob.scala | |
parent | 81dbd0e05653682dccb8bc74b39067b4ee7ac1f2 (diff) | |
download | sandcrawler-b1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d.tar.gz sandcrawler-b1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d.zip |
Partly refactored HBaseCrossrefScoreJob. Everything compiles.
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/ScoreJob.scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/ScoreJob.scala | 20 |
1 files changed, 20 insertions, 0 deletions
```diff
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
new file mode 100644
index 0000000..8d4d957
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -0,0 +1,20 @@
+import java.text.Normalizer
+
+import scala.math
+import scala.util.parsing.json.JSON
+
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBasePipeConversions
+
+class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable) extends JobBase(args) with HBasePipeConversions {
+  val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
+  val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
+
+  pipe1.join(pipe2).map { entry =>
+    val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
+    Scorable.computeOutput(features1, features2)
+  }
+  .write(TypedTsv[ReduceOutput](args("output")))
+}
```