diff options
Diffstat (limited to 'scalding/src/main/scala')
5 files changed, 65 insertions, 9 deletions
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala new file mode 100644 index 0000000..5dac64c --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -0,0 +1,48 @@ +package sandcrawler + +import cascading.flow.FlowDef +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class GrobidScorable extends Scorable with HBasePipeConversions { + def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) = { + // TODO: Clean up code after debugging. + val grobidSource = HBaseCrossrefScore.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts")) + + val pipe0 : Pipe = grobidSource.read + val grobidPipe : TypedPipe[MapFeatures] = pipe0 + .fromBytesWritable(new Fields("key", "tei_json")) + // .debug // Should be 4 tuples for mocked data + // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala) + // didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json) + .toTypedPipe[(String, String)](new Fields("key", "tei_json")) + .map { entry => + val (key : String, json : String) = (entry._1, entry._2) + HBaseCrossrefScore.grobidToSlug(json) match { + case Some(slug) => new MapFeatures(slug, key, json) + case None => new MapFeatures(Scorable.NoSlug, key, json) + } + } + .filter { + _.slug != Scorable.NoSlug + } + grobidPipe + } +/* + def fromBytesWritableLocal(f: Fields): Pipe = { + asList(f) + .foldLeft(pipe) { (p, fld) => { + p.map(fld.toString -> fld.toString) { from: org.apache.hadoop.hbase.io.ImmutableBytesWritable => + Option(from).map(x => Bytes.toString(x.get)).getOrElse(null) + } + }} + } + */ +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 01d852e..2fbb19f 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -27,8 +27,9 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv val grobidSource = HBaseCrossrefScore.getHBaseSource( args("hbase-table"), args("zookeeper-hosts")) - val grobidPipe : TypedPipe[(String, String, String)] = grobidSource - .read + + val pipe0 : cascading.pipe.Pipe = grobidSource.read + val grobidPipe : TypedPipe[(String, String, String)] = pipe0 .fromBytesWritable(new Fields("key", "tei_json")) // .debug // Should be 4 tuples for mocked data .toTypedPipe[(String, String)]('key, 'tei_json) @@ -78,7 +79,6 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)} // Output: score, sha1, doi, grobid title, crossref title .write(TypedTsv[(Int, String, String, String, String)](args("output"))) - } object HBaseCrossrefScore { diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 8e0c560..89dc835 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -1,6 +1,9 @@ +package sandcrawler + import scala.math import scala.util.parsing.json.JSON +import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ @@ -9,9 +12,9 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] = + def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] = { - getFeaturesPipe(args) + getFeaturesPipe(args)(flowDef, mode) .filter { entry => Scorable.isValidSlug(entry.slug) } .groupBy { case MapFeatures(key, slug, json) => slug } .map { tuple => @@ -21,7 +24,7 @@ abstract class Scorable { } // abstract method - def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] + def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] } object Scorable { diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 8d4d957..22cc9e9 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -1,16 +1,19 @@ +package sandcrawler + import java.text.Normalizer import scala.math import scala.util.parsing.json.JSON +import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBasePipeConversions -class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable) extends JobBase(args) with HBasePipeConversions { - val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args) - val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args) +class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with HBasePipeConversions { + val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args, flowDef, mode) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args, flowDef, mode) pipe1.join(pipe2).map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala index 290b03f..1ae6db3 100644 --- a/scalding/src/main/scala/sandcrawler/StringUtilities.scala +++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala @@ -1,3 +1,5 @@ +package sandcrawler + import java.text.Normalizer import java.util.regex.Pattern |