aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/ScoreJob.scala
diff options
context:
space:
mode:
authorEllen Spertus <ellen.spertus@gmail.com>2018-08-06 14:16:19 -0700
committerEllen Spertus <ellen.spertus@gmail.com>2018-08-06 14:16:19 -0700
commitb1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d (patch)
tree2d0a4367abd95331b941d7dd919c16c5cecec6e9 /scalding/src/main/scala/sandcrawler/ScoreJob.scala
parent81dbd0e05653682dccb8bc74b39067b4ee7ac1f2 (diff)
downloadsandcrawler-b1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d.tar.gz
sandcrawler-b1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d.zip
Partly refactored HBaseCrossrefScoreJob. Everything compiles.
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/ScoreJob.scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala20
1 files changed, 20 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
new file mode 100644
index 0000000..8d4d957
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -0,0 +1,20 @@
+import java.text.Normalizer
+
+import scala.math
+import scala.util.parsing.json.JSON
+
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBasePipeConversions
+
+class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable) extends JobBase(args) with HBasePipeConversions {
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
+
+ pipe1.join(pipe2).map { entry =>
+ val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
+ Scorable.computeOutput(features1, features2)
+ }
+ .write(TypedTsv[ReduceOutput](args("output")))
+}