aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/ScoreJob.scala
diff options
context:
space:
mode:
authorEllen Spertus <ellen.spertus@gmail.com>2018-08-06 16:38:46 -0700
committerEllen Spertus <ellen.spertus@gmail.com>2018-08-06 16:38:46 -0700
commit308b33d889d804380427d2aa112efec77b3e1770 (patch)
treee0181fd4eb2d311bf45827447afe5d93291f931f /scalding/src/main/scala/sandcrawler/ScoreJob.scala
parentb1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d (diff)
downloadsandcrawler-308b33d889d804380427d2aa112efec77b3e1770.tar.gz
sandcrawler-308b33d889d804380427d2aa112efec77b3e1770.zip
New code compiles. Old tests pass. New tests not yet written.
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/ScoreJob.scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala9
1 files changed, 6 insertions, 3 deletions
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 8d4d957..22cc9e9 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -1,16 +1,19 @@
+package sandcrawler
+
import java.text.Normalizer
import scala.math
import scala.util.parsing.json.JSON
+import cascading.flow.FlowDef
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBasePipeConversions
-class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable) extends JobBase(args) with HBasePipeConversions {
- val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
- val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
+class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with HBasePipeConversions {
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args, flowDef, mode)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args, flowDef, mode)
pipe1.join(pipe2).map { entry =>
val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry