about summary refs log tree commit diff stats
path: root/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/CrossrefScorable.scala')
-rw-r--r-- scalding/src/main/scala/sandcrawler/CrossrefScorable.scala 27
1 files changed, 27 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
new file mode 100644
index 0000000..a603e2d
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -0,0 +1,27 @@
+package sandcrawler
+
+import cascading.flow.FlowDef
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class CrossrefScorable extends Scorable {
+  /**
+   * Reads the text source named by the "crossref-input" argument and maps each
+   * line (presumably one Crossref JSON record) to a MapFeatures carrying the
+   * slug from HBaseCrossrefScore.crossrefToSlug, or Scorable.NoSlug if none.
+   */
+  def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) = {
+    val lines = TextLine(args("crossref-input"))
+      .read
+      .toTypedPipe[String](new Fields("line"))
+    lines.map { record : String =>
+      val slug = HBaseCrossrefScore.crossrefToSlug(record).getOrElse(Scorable.NoSlug)
+      new MapFeatures(slug, record)
+    }
+  }
+}