aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/ScoreJob.scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/ScoreJob.scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala7
1 files changed, 5 insertions, 2 deletions
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 66ba29e..7891596 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -9,8 +9,11 @@ import parallelai.spyglass.hbase.HBasePipeConversions
class ScoreJob(args: Args) extends JobBase(args) with
HBasePipeConversions {
- val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args)
- val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args)
+ // TODO: Instantiate any subclass of Scorable specified in args.
+ Scorable sc1 = new GrobidScorable()
+ Scorable sc2 = new CrossrefScorable()
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource().read)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource().read)
pipe1.join(pipe2).map { entry =>
val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry