aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala5
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala7
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala6
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala56
4 files changed, 43 insertions, 31 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index 146feec..817bee5 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -36,9 +36,8 @@ class CrossrefScorable extends Scorable with HBasePipeConversions {
TextLine(args("crossref-input"))
}
- def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = {
- // Here I CANNOT call Pipe.toTypedPipe()
- pipe
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ getSource(args).read
.toTypedPipe[String](new Fields("line"))
.map{ json : String =>
CrossrefScorable.crossrefToSlug(json) match {
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index ba15f22..61055f2 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -16,10 +16,11 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
}
- def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = {
- pipe
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ getSource(args)
+ .read
.fromBytesWritable(new Fields("key", "tei_json"))
- .toTypedPipe[(String, String)](new Fields('key, 'tei_json))
+ .toTypedPipe[(String, String)](new Fields("key", "tei_json"))
.map { entry =>
val (key : String, json : String) = (entry._1, entry._2)
GrobidScorable.grobidToSlug(json) match {
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 65d9b41..0ec8e46 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -13,9 +13,9 @@ case class ReduceFeatures(json : String)
case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
abstract class Scorable {
- def getInputPipe(pipe : cascading.pipe.Pipe) : TypedPipe[(String, ReduceFeatures)] =
+ def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] =
{
- getFeaturesPipe(pipe)
+ getFeaturesPipe(args)
.filter { entry => Scorable.isValidSlug(entry.slug) }
.groupBy { case MapFeatures(slug, json) => slug }
.map { tuple =>
@@ -26,7 +26,7 @@ abstract class Scorable {
// abstract methods
def getSource(args : Args) : Source
- def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures]
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures]
}
object Scorable {
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 0dbe64d..bc5bf87 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -2,16 +2,32 @@ package sandcrawler
import cascading.flow.FlowDef
import cascading.tuple.Fields
-import com.twitter.scalding._
-import com.twitter.scalding.typed.TDsl._
+import com.twitter.scalding.{Args,Source,TextLine,TypedPipe, TypedTsv}
+//import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
-
-//case class MapFeatures(slug : String, json : String)
+import com.twitter.scalding.{ Dsl, RichPipe, IterableSource, TupleSetter, TupleConverter }
+import cascading.pipe.Pipe
class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions {
+ // TODO: Instantiate any subclass of Scorable specified in args.
+ val sc1 : Scorable = new GrobidScorable()
+ val sc2 : Scorable = new GrobidScorable()
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
+
+ pipe1.join(pipe2).map { entry =>
+ val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
+ new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(features1, features2),
+ features1.json,
+ features2.json)
+ }
+ .write(TypedTsv[ReduceOutput](args("output")))
+ /*
val grobidSource = HBaseCrossrefScore.getHBaseSource(
args("hbase-table"),
args("zookeeper-hosts"))
@@ -34,7 +50,6 @@ class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions {
// This leads to a compile error:
val pipe11 : TypedPipe[String] = ScoreJob.getFeaturesPipe1(pipe0)
- /*
val pipe : cascading.pipe.Pipe = grobidSource
.read
val grobidPipe : TypedPipe[(String, String)] = pipe
@@ -46,22 +61,6 @@ class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions {
// Let's try making a method call.
// ScoreJob.etFeaturesPipe(pipe)
- // TODO: Instantiate any subclass of Scorable specified in args.
- Scorable sc1 = new GrobidScorable()
- Scorable sc2 = new CrossrefScorable()
- val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource().read)
- val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource().read)
-
-
- pipe1.join(pipe2).map { entry =>
- val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
- new ReduceOutput(
- slug,
- Scorable.computeSimilarity(features1, features2),
- features1.json,
- features2.json)
- }
- .write(TypedTsv[ReduceOutput](args("output")))
*/
}
@@ -93,12 +92,25 @@ object ScoreJob {
}
}
+ /*
+ implicit def sourceToRichPipe(src: Source): RichPipe = new RichPipe(src.read)
+
+ // This converts an Iterable into a Pipe or RichPipe with index (int-based) fields
+ implicit def toPipe[T](iter: Iterable[T])(implicit set: TupleSetter[T], conv: TupleConverter[T]): Pipe =
+ IterableSource[T](iter)(set, conv).read
+
+ implicit def iterableToRichPipe[T](iter: Iterable[T])(implicit set: TupleSetter[T], conv: TupleConverter[T]): RichPipe =
+ RichPipe(toPipe(iter)(set, conv))
+
+ // Provide args as an implicit val for extensions such as the Checkpoint extension.
+// implicit protected def _implicitJobArgs: Args = args
+
def getFeaturesPipe1(pipe : cascading.pipe.Pipe) : TypedPipe[String] = {
pipe
// The next line gives an error: value toTypedPipe is not a member of cascading.pipe.Pipe
.toTypedPipe[String](new Fields("line"))
}
-/*
+
def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = {
pipe
.fromBytesWritable(new Fields("key", "tei_json"))