diff options
Diffstat (limited to 'scalding')
5 files changed, 90 insertions, 8 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index 9842122..146feec 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -10,6 +10,26 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource import TDsl._ +import java.text.Normalizer +import java.util.Arrays +import java.util.Properties +import java.util.regex.Pattern + +import scala.math +import scala.util.parsing.json.JSON + +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.CoGrouped +import com.twitter.scalding.typed.Grouped +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + class CrossrefScorable extends Scorable with HBasePipeConversions { // TODO: Generalize args so there can be multiple Grobid pipes in one job. def getSource(args : Args) : Source = { @@ -17,6 +37,7 @@ class CrossrefScorable extends Scorable with HBasePipeConversions { } def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = { + // Here I CANNOT call Pipe.toTypedPipe() pipe .toTypedPipe[String](new Fields("line")) .map{ json : String => diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 51e40f9..ba15f22 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -8,7 +8,7 @@ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource -import TDsl._ +//import TDsl._ class GrobidScorable extends Scorable with HBasePipeConversions { def getSource(args : Args) : Source = { diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 725474d..018a74b 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -19,6 +19,7 @@ import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource +import TDsl._ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable @@ -30,13 +31,13 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv val temp : cascading.pipe.Pipe = grobidSource .read - .fromBytesWritable(new Fields("key", "tei_json")) + + // Here I CAN call Pipe.toTypedPipe() val grobidPipe : TypedPipe[(String, String, String)] = temp - // .debug // Should be 4 tuples for mocked data + .fromBytesWritable(new Fields("key", "tei_json")) .toTypedPipe[(String, String)]('key, 'tei_json) .map { entry => val (key, json) = (entry._1, entry._2) - // TODO: Consider passing forward only a subset of JSON. HBaseCrossrefScore.grobidToSlug(json) match { case Some(slug) => (slug, key, json) case None => (NoTitle, key, json) @@ -46,7 +47,6 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv val (slug, _, _) = entry slug != NoTitle } -// .debug // SHould be 3 tuples for mocked data val grobidGroup = grobidPipe .groupBy { case (slug, key, json) => slug } diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index bd03d57..65d9b41 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -6,7 +6,7 @@ import scala.util.parsing.json.JSON import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ -import TDsl._ +//import TDsl._ case class MapFeatures(slug : String, json : String) case class ReduceFeatures(json : String) diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 7891596..0dbe64d 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -1,13 +1,50 @@ package sandcrawler import cascading.flow.FlowDef +import cascading.tuple.Fields import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource -class ScoreJob(args: Args) extends JobBase(args) with - HBasePipeConversions { +//case class MapFeatures(slug : String, json : String) + +class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { + + val grobidSource = HBaseCrossrefScore.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts")) + + val source0 : Source = TextLine("foo") + val pipe0 : cascading.pipe.Pipe = source0.read + // This compiles: + val pipe00 : TypedPipe[String] = getFeaturesPipe0(pipe0) + + // Calling a method within ScoreJob compiles fine. + def getFeaturesPipe0(pipe : cascading.pipe.Pipe) : TypedPipe[String] = { + pipe + // This compiles: + .toTypedPipe[String](new Fields("line")) + } + + // Calling a function in a ScoreJob object leads to a compiler error. + val source1 : Source = TextLine("foo") + val pipe1 : cascading.pipe.Pipe = source1.read + // This leads to a compile error: + val pipe11 : TypedPipe[String] = ScoreJob.getFeaturesPipe1(pipe0) + + /* + val pipe : cascading.pipe.Pipe = grobidSource + .read + val grobidPipe : TypedPipe[(String, String)] = pipe + .fromBytesWritable(new Fields("key", "tei_json")) + // Here I CAN call Pipe.toTypedPipe() + .toTypedPipe[(String, String)]('key, 'tei_json) + .write(TypedTsv[(String, String)](args("output"))) + + // Let's try making a method call. +// ScoreJob.etFeaturesPipe(pipe) // TODO: Instantiate any subclass of Scorable specified in args. Scorable sc1 = new GrobidScorable() @@ -15,6 +52,7 @@ class ScoreJob(args: Args) extends JobBase(args) with val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource().read) val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource().read) + pipe1.join(pipe2).map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry new ReduceOutput( @@ -24,6 +62,8 @@ class ScoreJob(args: Args) extends JobBase(args) with features2.json) } .write(TypedTsv[ReduceOutput](args("output"))) + */ + } // Ugly hack to get non-String information into ScoreJob above. @@ -52,4 +92,25 @@ object ScoreJob { case None => null } } + + def getFeaturesPipe1(pipe : cascading.pipe.Pipe) : TypedPipe[String] = { + pipe + // The next line gives an error: value toTypedPipe is not a member of cascading.pipe.Pipe + .toTypedPipe[String](new Fields("line")) + } +/* + def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = { + pipe + .fromBytesWritable(new Fields("key", "tei_json")) + // I needed to change symbols to strings when I pulled this out of ScoreJob. + .toTypedPipe[(String, String)](new Fields("key", "tei_json")) + .map { entry => + val (key : String, json : String) = (entry._1, entry._2) + GrobidScorable.grobidToSlug(json) match { + case Some(slug) => new MapFeatures(slug, json) + case None => new MapFeatures(Scorable.NoSlug, json) + } + } + } + */ } |