diff options
Diffstat (limited to 'scalding')
4 files changed, 43 insertions, 31 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index 146feec..817bee5 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -36,9 +36,8 @@ class CrossrefScorable extends Scorable with HBasePipeConversions { TextLine(args("crossref-input")) } - def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = { - // Here I CANNOT call Pipe.toTypedPipe() - pipe + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + getSource(args).read .toTypedPipe[String](new Fields("line")) .map{ json : String => CrossrefScorable.crossrefToSlug(json) match { diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index ba15f22..61055f2 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -16,10 +16,11 @@ class GrobidScorable extends Scorable with HBasePipeConversions { GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) } - def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = { - pipe + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + getSource(args) + .read .fromBytesWritable(new Fields("key", "tei_json")) - .toTypedPipe[(String, String)](new Fields('key, 'tei_json)) + .toTypedPipe[(String, String)](new Fields("key", "tei_json")) .map { entry => val (key : String, json : String) = (entry._1, entry._2) GrobidScorable.grobidToSlug(json) match { diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 65d9b41..0ec8e46 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -13,9 +13,9 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(pipe : cascading.pipe.Pipe) : TypedPipe[(String, ReduceFeatures)] = + def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = { - getFeaturesPipe(pipe) + getFeaturesPipe(args) .filter { entry => Scorable.isValidSlug(entry.slug) } .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => @@ -26,7 +26,7 @@ abstract class Scorable { // abstract methods def getSource(args : Args) : Source - def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] } object Scorable { diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 0dbe64d..bc5bf87 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -2,16 +2,32 @@ package sandcrawler import cascading.flow.FlowDef import cascading.tuple.Fields -import com.twitter.scalding._ -import com.twitter.scalding.typed.TDsl._ +import com.twitter.scalding.{Args,Source,TextLine,TypedPipe, TypedTsv} +//import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource - -//case class MapFeatures(slug : String, json : String) +import com.twitter.scalding.{ Dsl, RichPipe, IterableSource, TupleSetter, TupleConverter } +import cascading.pipe.Pipe class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { + // TODO: Instantiate any subclass of Scorable specified in args. + val sc1 : Scorable = new GrobidScorable() + val sc2 : Scorable = new GrobidScorable() + val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args) + + pipe1.join(pipe2).map { entry => + val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry + new ReduceOutput( + slug, + Scorable.computeSimilarity(features1, features2), + features1.json, + features2.json) + } + .write(TypedTsv[ReduceOutput](args("output"))) + /* val grobidSource = HBaseCrossrefScore.getHBaseSource( args("hbase-table"), args("zookeeper-hosts")) @@ -34,7 +50,6 @@ class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { // This leads to a compile error: val pipe11 : TypedPipe[String] = ScoreJob.getFeaturesPipe1(pipe0) - /* val pipe : cascading.pipe.Pipe = grobidSource .read val grobidPipe : TypedPipe[(String, String)] = pipe @@ -46,22 +61,6 @@ class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { // Let's try making a method call. // ScoreJob.etFeaturesPipe(pipe) - // TODO: Instantiate any subclass of Scorable specified in args. - Scorable sc1 = new GrobidScorable() - Scorable sc2 = new CrossrefScorable() - val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource().read) - val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource().read) - - - pipe1.join(pipe2).map { entry => - val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry - new ReduceOutput( - slug, - Scorable.computeSimilarity(features1, features2), - features1.json, - features2.json) - } - .write(TypedTsv[ReduceOutput](args("output"))) */ } @@ -93,12 +92,25 @@ object ScoreJob { } } + /* + implicit def sourceToRichPipe(src: Source): RichPipe = new RichPipe(src.read) + + // This converts an Iterable into a Pipe or RichPipe with index (int-based) fields + implicit def toPipe[T](iter: Iterable[T])(implicit set: TupleSetter[T], conv: TupleConverter[T]): Pipe = + IterableSource[T](iter)(set, conv).read + + implicit def iterableToRichPipe[T](iter: Iterable[T])(implicit set: TupleSetter[T], conv: TupleConverter[T]): RichPipe = + RichPipe(toPipe(iter)(set, conv)) + + // Provide args as an implicit val for extensions such as the Checkpoint extension. +// implicit protected def _implicitJobArgs: Args = args + def getFeaturesPipe1(pipe : cascading.pipe.Pipe) : TypedPipe[String] = { pipe // The next line gives an error: value toTypedPipe is not a member of cascading.pipe.Pipe .toTypedPipe[String](new Fields("line")) } -/* + def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = { pipe .fromBytesWritable(new Fields("key", "tei_json")) |