aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main
diff options
context:
space:
mode:
authorEllen Spertus <ellen.spertus@gmail.com>2018-08-10 20:49:44 -0700
committerEllen Spertus <ellen.spertus@gmail.com>2018-08-10 20:49:44 -0700
commit768e7ef0d127cf55119543be6e656751704ca5b2 (patch)
tree27df4f067ebe693275f4995ac271660f5ac676d9 /scalding/src/main
parentb7f77f6337b450406ae0a90b81faeba27394afb0 (diff)
downloadsandcrawler-768e7ef0d127cf55119543be6e656751704ca5b2.tar.gz
sandcrawler-768e7ef0d127cf55119543be6e656751704ca5b2.zip
Tests pass. Still have changes to do but made huge progress.
Diffstat (limited to 'scalding/src/main')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala38
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala44
2 files changed, 27 insertions, 55 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index 817bee5..b2f6537 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -9,6 +9,7 @@ import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
import TDsl._
+import scala.util.parsing.json.JSONObject
import java.text.Normalizer
import java.util.Arrays
@@ -31,7 +32,7 @@ import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
class CrossrefScorable extends Scorable with HBasePipeConversions {
- // TODO: Generalize args so there can be multiple Grobid pipes in one job.
+ // TODO: Generalize args so there can be multiple Crossref pipes in one job.
def getSource(args : Args) : Source = {
TextLine(args("crossref-input"))
}
@@ -39,26 +40,31 @@ class CrossrefScorable extends Scorable with HBasePipeConversions {
def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
getSource(args).read
.toTypedPipe[String](new Fields("line"))
- .map{ json : String =>
- CrossrefScorable.crossrefToSlug(json) match {
- case Some(slug) => new MapFeatures(slug, json)
+ .map{ json : String =>
+ CrossrefScorable.simplifyJson(json) match {
case None => new MapFeatures(Scorable.NoSlug, json)
+ case Some(map) => new MapFeatures(
+ Scorable.titleToSlug(map("title").asInstanceOf[String]),
+ JSONObject(map).toString)
}
}
}
-}
-object CrossrefScorable {
- def crossrefToSlug(json : String) : Option[String] = {
- Scorable.jsonToMap(json) match {
- case None => None
- case Some(map) => {
- if (map contains "title") {
- // TODO: Don't ignore titles after the first.
- val title = map("title").asInstanceOf[List[String]](0)
- Some(Scorable.titleToSlug(title))
- } else {
- None
+ object CrossrefScorable {
+ def simplifyJson(json : String) : Option[Map[String, Any]] = {
+ Scorable.jsonToMap(json) match {
+ case None => None
+ case Some(map) => {
+ if (map contains "title") {
+ val titles = map("title").asInstanceOf[List[String]]
+ if (titles.isEmpty) {
+ None
+ } else {
+ Some(Map("title" -> titles(0)))
+ }
+ } else {
+ None
+ }
}
}
}
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index bc5bf87..386b367 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -3,7 +3,7 @@ package sandcrawler
import cascading.flow.FlowDef
import cascading.tuple.Fields
import com.twitter.scalding.{Args,Source,TextLine,TypedPipe, TypedTsv}
-//import com.twitter.scalding.typed.TDsl._
+//import com.twitter.scalding.source.TypedText
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
@@ -13,7 +13,7 @@ import cascading.pipe.Pipe
class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions {
// TODO: Instantiate any subclass of Scorable specified in args.
val sc1 : Scorable = new GrobidScorable()
- val sc2 : Scorable = new GrobidScorable()
+ val sc2 : Scorable = new CrossrefScorable()
val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
@@ -25,44 +25,10 @@ class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions {
features1.json,
features2.json)
}
- .write(TypedTsv[ReduceOutput](args("output")))
-
- /*
- val grobidSource = HBaseCrossrefScore.getHBaseSource(
- args("hbase-table"),
- args("zookeeper-hosts"))
-
- val source0 : Source = TextLine("foo")
- val pipe0 : cascading.pipe.Pipe = source0.read
- // This compiles:
- val pipe00 : TypedPipe[String] = getFeaturesPipe0(pipe0)
-
- // Calling a method within ScoreJob compiles fine.
- def getFeaturesPipe0(pipe : cascading.pipe.Pipe) : TypedPipe[String] = {
- pipe
- // This compiles:
- .toTypedPipe[String](new Fields("line"))
- }
-
- // Calling a function in a ScoreJob object leads to a compiler error.
- val source1 : Source = TextLine("foo")
- val pipe1 : cascading.pipe.Pipe = source1.read
- // This leads to a compile error:
- val pipe11 : TypedPipe[String] = ScoreJob.getFeaturesPipe1(pipe0)
-
- val pipe : cascading.pipe.Pipe = grobidSource
- .read
- val grobidPipe : TypedPipe[(String, String)] = pipe
- .fromBytesWritable(new Fields("key", "tei_json"))
- // Here I CAN call Pipe.toTypedPipe()
- .toTypedPipe[(String, String)]('key, 'tei_json)
- .write(TypedTsv[(String, String)](args("output")))
-
- // Let's try making a method call.
-// ScoreJob.etFeaturesPipe(pipe)
-
- */
+ //TypedTsv doesn't work over case classes.
+ .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
+ .write(TypedTsv[(String, Int, String, String)](args("output")))
}
// Ugly hack to get non-String information into ScoreJob above.