about summary refs log tree commit diff stats
path: root/scalding/src/main/scala/sandcrawler/Scorable.scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/Scorable.scala')
-rw-r--r-- scalding/src/main/scala/sandcrawler/Scorable.scala 17
1 files changed, 7 insertions, 10 deletions
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index f7eb95d..5d67044 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -13,10 +13,12 @@ case class ReduceFeatures(json : String)
case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
abstract class Scorable {
- def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] =
- {
- getFeaturesPipe(args)
- .filter { entry => Scorable.isValidSlug(entry.slug) }
+ def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = {
+ val validFeatures : TypedPipe[MapFeatures] = getFeaturesPipe(args)
+ .filterNot { entry => entry.isEmpty }
+ .map { entry => entry.get }
+
+ validFeatures
.groupBy { case MapFeatures(slug, json) => slug }
.map { tuple =>
val (slug : String, features : MapFeatures) = tuple
@@ -26,16 +28,11 @@ abstract class Scorable {
// abstract methods
def getSource(args : Args) : Source
- def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures]
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]]
}
object Scorable {
val MaxTitleLength = 1023
- val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable
-
- def isValidSlug(slug : String) : Boolean = {
- slug != NoSlug
- }
def jsonToMap(json : String) : Option[Map[String, Any]] = {
// https://stackoverflow.com/a/32717262/631051