diff options
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/Scorable.scala | 17 |
1 files changed, 7 insertions, 10 deletions
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index f7eb95d..5d67044 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -13,10 +13,12 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = - { - getFeaturesPipe(args) - .filter { entry => Scorable.isValidSlug(entry.slug) } + def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = { + val validFeatures : TypedPipe[MapFeatures] = getFeaturesPipe(args) + .filterNot { entry => entry.isEmpty } + .map { entry => entry.get } + + validFeatures .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => val (slug : String, features : MapFeatures) = tuple @@ -26,16 +28,11 @@ abstract class Scorable { // abstract methods def getSource(args : Args) : Source - def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] } object Scorable { val MaxTitleLength = 1023 - val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable - - def isValidSlug(slug : String) : Boolean = { - slug != NoSlug - } def jsonToMap(json : String) : Option[Map[String, Any]] = { // https://stackoverflow.com/a/32717262/631051 |