aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala
diff options
context:
space:
mode:
authorEllen Spertus <ellen.spertus@gmail.com>2018-08-12 19:12:32 -0700
committerEllen Spertus <ellen.spertus@gmail.com>2018-08-12 19:12:32 -0700
commit5615428921a45ba6a2fb005b255a28dcbb83b13f (patch)
tree4c085076194ecdbad63c62194711d5baf657f60f /scalding/src/main/scala
parent05c0213547f29842bbae6faaf77e983a364d4a2e (diff)
downloadsandcrawler-5615428921a45ba6a2fb005b255a28dcbb83b13f.tar.gz
sandcrawler-5615428921a45ba6a2fb005b255a28dcbb83b13f.zip
Snapshot before changing Scorable to find bug.
Diffstat (limited to 'scalding/src/main/scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala41
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala1
2 files changed, 23 insertions, 19 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index 667a5cc..e257152 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -41,26 +41,31 @@ class CrossrefScorable extends Scorable with HBasePipeConversions {
def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
getSource(args).read
.toTypedPipe[String](new Fields("line"))
- .map{ json : String =>
- Scorable.jsonToMap(json) match {
- case None => MapFeatures(Scorable.NoSlug, json)
- case Some(map) => {
- if ((map contains "title") && (map contains "DOI")) {
- val titles = map("title").asInstanceOf[List[String]]
- if (titles.isEmpty) {
- new MapFeatures(Scorable.NoSlug, json)
- } else {
- val title = titles(0)
- val map2 = Scorable.toScorableMap(title=titles(0), doi=map("DOI").asInstanceOf[String])
- new MapFeatures(
- Scorable.mapToSlug(map2),
- JSONObject(map2).toString)
- }
- } else {
- new MapFeatures(Scorable.NoSlug, json)
- }
+ .map { CrossrefScorable.jsonToMapFeatures(_) }
+ }
+}
+
+object CrossrefScorable {
+ def jsonToMapFeatures(json : String) : MapFeatures = {
+ Scorable.jsonToMap(json) match {
+ case None => MapFeatures(Scorable.NoSlug, json)
+ case Some(map) => {
+ if ((map contains "titles") && (map contains "DOI")) {
+ val titles = map("titles").asInstanceOf[List[String]]
+ val doi = Scorable.getString(map, "DOI")
+ if (titles.isEmpty || titles == null || doi.isEmpty || doi == null) {
+ new MapFeatures(Scorable.NoSlug, json)
+ } else {
+ val title = titles(0)
+ val map2 = Scorable.toScorableMap(title=title, doi=doi)
+ new MapFeatures(
+ Scorable.mapToSlug(map2),
+ JSONObject(map2).toString)
}
+ } else {
+ new MapFeatures(Scorable.NoSlug, json)
}
}
+ }
}
}
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 929461b..a256fa4 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -7,7 +7,6 @@ import scala.util.parsing.json.JSONObject
import cascading.flow.FlowDef
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
-//import TDsl._
case class MapFeatures(slug : String, json : String)
case class ReduceFeatures(json : String)