diff options
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/CrossrefScorable.scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/CrossrefScorable.scala | 27 |
1 files changed, 27 insertions, 0 deletions
// Reconstructed from the diff of
// scalding/src/main/scala/sandcrawler/CrossrefScorable.scala (new file, blob a603e2d).
package sandcrawler

import cascading.flow.FlowDef
import cascading.pipe.Pipe
import cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource

/**
 * [[Scorable]] implementation whose features come from a text input of
 * Crossref records.
 */
class CrossrefScorable extends Scorable {

  /**
   * Builds the features pipe from the text source named by the
   * `crossref-input` argument.
   *
   * Every input line is passed to `HBaseCrossrefScore.crossrefToSlug`. When
   * that yields a slug, the line is paired with it; otherwise the line is
   * paired with `Scorable.NoSlug`, so no input line is dropped here.
   *
   * NOTE(review): each line is presumably one Crossref JSON record (the
   * original code names it `json`) -- confirm against the job's input format.
   *
   * @param args    job arguments; must contain `crossref-input`
   * @param flowDef implicit Cascading flow definition
   * @param mode    implicit Scalding execution mode
   * @return one `MapFeatures(slug, line)` per input line
   */
  def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = {
    TextLine(args("crossref-input"))
      .read
      .toTypedPipe[String](new Fields("line"))
      .map { json : String =>
        // Fall back to the NoSlug sentinel rather than discarding the record.
        val slug = HBaseCrossrefScore.crossrefToSlug(json).getOrElse(Scorable.NoSlug)
        new MapFeatures(slug, json)
      }
  }
}