aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/CrossrefScorable.scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala47
1 files changed, 47 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
new file mode 100644
index 0000000..ff8201a
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -0,0 +1,47 @@
+package sandcrawler
+
+import scala.math
+import scala.util.parsing.json.JSON
+import scala.util.parsing.json.JSONObject
+
+import cascading.flow.FlowDef
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.hbase.HBasePipeConversions
+
+class CrossrefScorable extends Scorable with HBasePipeConversions {
+ // TODO: Generalize args so there can be multiple Crossref pipes in one job.
+ def getSource(args : Args) : Source = {
+ TextLine(args("crossref-input"))
+ }
+
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ getSource(args).read
+ .toTypedPipe[String](new Fields("line"))
+ .map { CrossrefScorable.jsonToMapFeatures(_) }
+ }
+}
+
+object CrossrefScorable {
+ def jsonToMapFeatures(json : String) : MapFeatures = {
+ Scorable.jsonToMap(json) match {
+ case None => MapFeatures(Scorable.NoSlug, json)
+ case Some(map) => {
+ if ((map contains "title") && (map contains "DOI")) {
+ val titles = map("title").asInstanceOf[List[String]]
+ val doi = Scorable.getString(map, "DOI")
+ if (titles.isEmpty || titles == null || doi.isEmpty || doi == null) {
+ new MapFeatures(Scorable.NoSlug, json)
+ } else {
+ // bnewbold: not checking that titles(0) is non-null/non-empty; case would be, in JSON, "title": [ null ]
+ val sf : ScorableFeatures = new ScorableFeatures(title=titles(0), doi=doi)
+ new MapFeatures(sf.toSlug, sf.toString)
+ }
+ } else {
+ new MapFeatures(Scorable.NoSlug, json)
+ }
+ }
+ }
+ }
+}