diff options
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/Scorable.scala | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 8e0c560..89dc835 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -1,6 +1,9 @@ +package sandcrawler + import scala.math import scala.util.parsing.json.JSON +import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ @@ -9,9 +12,9 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] = + def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] = { - getFeaturesPipe(args) + getFeaturesPipe(args)(flowDef, mode) .filter { entry => Scorable.isValidSlug(entry.slug) } .groupBy { case MapFeatures(key, slug, json) => slug } .map { tuple => @@ -21,7 +24,7 @@ abstract class Scorable { } // abstract method - def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] + def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] } object Scorable { |