aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/Scorable.scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala9
1 files changed, 6 insertions, 3 deletions
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 8e0c560..89dc835 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -1,6 +1,9 @@
+package sandcrawler
+
import scala.math
import scala.util.parsing.json.JSON
+import cascading.flow.FlowDef
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
@@ -9,9 +12,9 @@ case class ReduceFeatures(json : String)
case class ReduceOutput(val score : Int, json1 : String, json2 : String)
abstract class Scorable {
- def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] =
+ def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] =
{
- getFeaturesPipe(args)
+ getFeaturesPipe(args)(flowDef, mode)
.filter { entry => Scorable.isValidSlug(entry.slug) }
.groupBy { case MapFeatures(key, slug, json) => slug }
.map { tuple =>
@@ -21,7 +24,7 @@ abstract class Scorable {
}
// abstract method
- def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures]
+ def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures]
}
object Scorable {