diff options
author | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-07 09:51:18 -0700 |
---|---|---|
committer | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-07 09:51:18 -0700 |
commit | c71b2da70ff7d3b77082db25672f6f3669f2238c (patch) | |
tree | 30f1cad98c781fdd5d1de18ffda232cd4286c72f | |
parent | 308b33d889d804380427d2aa112efec77b3e1770 (diff) | |
download | sandcrawler-c71b2da70ff7d3b77082db25672f6f3669f2238c.tar.gz sandcrawler-c71b2da70ff7d3b77082db25672f6f3669f2238c.zip |
Added CrossrefScorable.scala. All code compiles.
3 files changed, 34 insertions, 10 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala new file mode 100644 index 0000000..a603e2d --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -0,0 +1,27 @@ +package sandcrawler + +import cascading.flow.FlowDef +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class CrossrefScorable extends Scorable { + def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) = { +// val crossrefSource = TextLine(args("crossref-input")) +// val crossrefPipe : TypedPipe[MapFeatures] = crossrefSource + TextLine(args("crossref-input")) + .read + .toTypedPipe[String](new Fields("line")) + .map{ json : String => + HBaseCrossrefScore.crossrefToSlug(json) match { + case Some(slug) => new MapFeatures(slug, json) + case None => new MapFeatures(Scorable.NoSlug, json) + } + } +// crossrefPipe + } +} diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 5dac64c..8da7708 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -16,8 +16,9 @@ class GrobidScorable extends Scorable with HBasePipeConversions { args("hbase-table"), args("zookeeper-hosts")) - val pipe0 : Pipe = grobidSource.read - val grobidPipe : TypedPipe[MapFeatures] = pipe0 +// val pipe0 : Pipe = grobidSource.read +// val grobidPipe : TypedPipe[MapFeatures] = pipe0 + grobidSource.read .fromBytesWritable(new Fields("key", "tei_json")) // .debug // Should be 4 tuples for mocked data // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala) @@ -26,14 +27,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions { .map { entry => val (key : String, json : String) = (entry._1, entry._2) HBaseCrossrefScore.grobidToSlug(json) match { - case Some(slug) => new MapFeatures(slug, key, json) - case None => new MapFeatures(Scorable.NoSlug, key, json) + case Some(slug) => new MapFeatures(slug, json) + case None => new MapFeatures(Scorable.NoSlug, json) } } - .filter { - _.slug != Scorable.NoSlug - } - grobidPipe } /* def fromBytesWritableLocal(f: Fields): Pipe = { diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 89dc835..950a6d4 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -7,7 +7,7 @@ import cascading.flow.FlowDef import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ -case class MapFeatures(val key : String, slug : String, json : String) +case class MapFeatures(slug : String, json : String) case class ReduceFeatures(json : String) case class ReduceOutput(val score : Int, json1 : String, json2 : String) @@ -16,7 +16,7 @@ abstract class Scorable { { getFeaturesPipe(args)(flowDef, mode) .filter { entry => Scorable.isValidSlug(entry.slug) } - .groupBy { case MapFeatures(key, slug, json) => slug } + .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => val (slug : String, features : MapFeatures) = tuple (slug, ReduceFeatures(features.json)) |