aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
Diffstat (limited to 'scalding')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala2
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala2
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala6
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala9
4 files changed, 9 insertions, 10 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index d5da845..b221718 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -10,7 +10,7 @@ import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
class CrossrefScorable extends Scorable {
- def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = {
+ def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = {
// TODO: Generalize args so there can be multiple Grobid pipes in one job.
TextLine(args("crossref-input"))
.read
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 4c67074..6229718 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -10,7 +10,7 @@ import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
class GrobidScorable extends Scorable with HBasePipeConversions {
- def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = {
+ def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = {
// TODO: Generalize args so there can be multiple grobid pipes in one job.
GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
.read
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index cfdc192..2d2345b 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -12,9 +12,9 @@ case class ReduceFeatures(json : String)
case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
abstract class Scorable {
- def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] =
+ def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] =
{
- getFeaturesPipe(args)(flowDef, mode)
+ getFeaturesPipe(args)
.filter { entry => Scorable.isValidSlug(entry.slug) }
.groupBy { case MapFeatures(slug, json) => slug }
.map { tuple =>
@@ -24,7 +24,7 @@ abstract class Scorable {
}
// abstract method
- def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures]
+ def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures]
}
object Scorable {
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index aa20d0f..66ba29e 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -6,11 +6,11 @@ import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBasePipeConversions
-class ScoreJob(args: Args)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with
+class ScoreJob(args: Args) extends JobBase(args) with
HBasePipeConversions {
- /*
- val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args, flowDef, mode)
- val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args, flowDef, mode)
+
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args)
pipe1.join(pipe2).map { entry =>
val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
@@ -21,7 +21,6 @@ class ScoreJob(args: Args)(implicit flowDef : FlowDef, mode: Mode) extends JobBa
features2.json)
}
.write(TypedTsv[ReduceOutput](args("output")))
- */
}
// Ugly hack to get non-String information into ScoreJob above.