diff options
author | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-10 20:49:44 -0700 |
---|---|---|
committer | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-10 20:49:44 -0700 |
commit | 768e7ef0d127cf55119543be6e656751704ca5b2 (patch) | |
tree | 27df4f067ebe693275f4995ac271660f5ac676d9 /scalding/src/main | |
parent | b7f77f6337b450406ae0a90b81faeba27394afb0 (diff) | |
download | sandcrawler-768e7ef0d127cf55119543be6e656751704ca5b2.tar.gz sandcrawler-768e7ef0d127cf55119543be6e656751704ca5b2.zip |
Tests pass. Still have changes to do but made huge progress.
Diffstat (limited to 'scalding/src/main')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/CrossrefScorable.scala | 38 | ||||
-rw-r--r-- | scalding/src/main/scala/sandcrawler/ScoreJob.scala | 44 |
2 files changed, 27 insertions, 55 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index 817bee5..b2f6537 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -9,6 +9,7 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource import TDsl._ +import scala.util.parsing.json.JSONObject import java.text.Normalizer import java.util.Arrays @@ -31,7 +32,7 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class CrossrefScorable extends Scorable with HBasePipeConversions { - // TODO: Generalize args so there can be multiple Grobid pipes in one job. + // TODO: Generalize args so there can be multiple Crossref pipes in one job. def getSource(args : Args) : Source = { TextLine(args("crossref-input")) } @@ -39,26 +40,31 @@ class CrossrefScorable extends Scorable with HBasePipeConversions { def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { getSource(args).read .toTypedPipe[String](new Fields("line")) - .map{ json : String => - CrossrefScorable.crossrefToSlug(json) match { - case Some(slug) => new MapFeatures(slug, json) + .map{ json : String => + CrossrefScorable.simplifyJson(json) match { case None => new MapFeatures(Scorable.NoSlug, json) + case Some(map) => new MapFeatures( + Scorable.titleToSlug(map("title").asInstanceOf[String]), + JSONObject(map).toString) } } } -} -object CrossrefScorable { - def crossrefToSlug(json : String) : Option[String] = { - Scorable.jsonToMap(json) match { - case None => None - case Some(map) => { - if (map contains "title") { - // TODO: Don't ignore titles after the first. - val title = map("title").asInstanceOf[List[String]](0) - Some(Scorable.titleToSlug(title)) - } else { - None + object CrossrefScorable { + def simplifyJson(json : String) : Option[Map[String, Any]] = { + Scorable.jsonToMap(json) match { + case None => None + case Some(map) => { + if (map contains "title") { + val titles = map("title").asInstanceOf[List[String]] + if (titles.isEmpty) { + None + } else { + Some(Map("title" -> titles(0))) + } + } else { + None + } } } } diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index bc5bf87..386b367 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -3,7 +3,7 @@ package sandcrawler import cascading.flow.FlowDef import cascading.tuple.Fields import com.twitter.scalding.{Args,Source,TextLine,TypedPipe, TypedTsv} -//import com.twitter.scalding.typed.TDsl._ +//import com.twitter.scalding.source.TypedText import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource @@ -13,7 +13,7 @@ import cascading.pipe.Pipe class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { // TODO: Instantiate any subclass of Scorable specified in args. val sc1 : Scorable = new GrobidScorable() - val sc2 : Scorable = new GrobidScorable() + val sc2 : Scorable = new CrossrefScorable() val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args) val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args) @@ -25,44 +25,10 @@ class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { features1.json, features2.json) } - .write(TypedTsv[ReduceOutput](args("output"))) - - /* - val grobidSource = HBaseCrossrefScore.getHBaseSource( - args("hbase-table"), - args("zookeeper-hosts")) - - val source0 : Source = TextLine("foo") - val pipe0 : cascading.pipe.Pipe = source0.read - // This compiles: - val pipe00 : TypedPipe[String] = getFeaturesPipe0(pipe0) - - // Calling a method within ScoreJob compiles fine. - def getFeaturesPipe0(pipe : cascading.pipe.Pipe) : TypedPipe[String] = { - pipe - // This compiles: - .toTypedPipe[String](new Fields("line")) - } - - // Calling a function in a ScoreJob object leads to a compiler error. - val source1 : Source = TextLine("foo") - val pipe1 : cascading.pipe.Pipe = source1.read - // This leads to a compile error: - val pipe11 : TypedPipe[String] = ScoreJob.getFeaturesPipe1(pipe0) - - val pipe : cascading.pipe.Pipe = grobidSource - .read - val grobidPipe : TypedPipe[(String, String)] = pipe - .fromBytesWritable(new Fields("key", "tei_json")) - // Here I CAN call Pipe.toTypedPipe() - .toTypedPipe[(String, String)]('key, 'tei_json) - .write(TypedTsv[(String, String)](args("output"))) - - // Let's try making a method call. -// ScoreJob.etFeaturesPipe(pipe) - - */ + //TypedTsv doesn't work over case classes. + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) } // Ugly hack to get non-String information into ScoreJob above. |