diff options
Diffstat (limited to 'scalding')
4 files changed, 9 insertions, 10 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index d5da845..b221718 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -10,7 +10,7 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class CrossrefScorable extends Scorable { - def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = { + def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = { // TODO: Generalize args so there can be multiple Grobid pipes in one job. TextLine(args("crossref-input")) .read diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 4c67074..6229718 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -10,7 +10,7 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class GrobidScorable extends Scorable with HBasePipeConversions { - def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = { + def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = { // TODO: Generalize args so there can be multiple grobid pipes in one job. GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) .read diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index cfdc192..2d2345b 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -12,9 +12,9 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] = + def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] = { - getFeaturesPipe(args)(flowDef, mode) + getFeaturesPipe(args) .filter { entry => Scorable.isValidSlug(entry.slug) } .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => @@ -24,7 +24,7 @@ abstract class Scorable { } // abstract method - def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] + def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] } object Scorable { diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index aa20d0f..66ba29e 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -6,11 +6,11 @@ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBasePipeConversions -class ScoreJob(args: Args)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with +class ScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { - /* - val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args, flowDef, mode) - val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args, flowDef, mode) + + val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args) pipe1.join(pipe2).map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry @@ -21,7 +21,6 @@ class ScoreJob(args: Args)(implicit flowDef : FlowDef, mode: Mode) extends JobBa features2.json) } .write(TypedTsv[ReduceOutput](args("output"))) - */ } // Ugly hack to get non-String information into ScoreJob above. |