aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala
diff options
context:
space:
mode:
authorEllen Spertus <ellen.spertus@gmail.com>2018-08-07 09:51:18 -0700
committerEllen Spertus <ellen.spertus@gmail.com>2018-08-07 09:51:18 -0700
commitc71b2da70ff7d3b77082db25672f6f3669f2238c (patch)
tree30f1cad98c781fdd5d1de18ffda232cd4286c72f /scalding/src/main/scala
parent308b33d889d804380427d2aa112efec77b3e1770 (diff)
downloadsandcrawler-c71b2da70ff7d3b77082db25672f6f3669f2238c.tar.gz
sandcrawler-c71b2da70ff7d3b77082db25672f6f3669f2238c.zip
Added CrossrefScorable.scala. All code compiles.
Diffstat (limited to 'scalding/src/main/scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala27
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala13
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala4
3 files changed, 34 insertions, 10 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
new file mode 100644
index 0000000..a603e2d
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -0,0 +1,27 @@
+package sandcrawler
+
+import cascading.flow.FlowDef
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class CrossrefScorable extends Scorable {
+ def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) = {
+// val crossrefSource = TextLine(args("crossref-input"))
+// val crossrefPipe : TypedPipe[MapFeatures] = crossrefSource
+ TextLine(args("crossref-input"))
+ .read
+ .toTypedPipe[String](new Fields("line"))
+ .map{ json : String =>
+ HBaseCrossrefScore.crossrefToSlug(json) match {
+ case Some(slug) => new MapFeatures(slug, json)
+ case None => new MapFeatures(Scorable.NoSlug, json)
+ }
+ }
+// crossrefPipe
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 5dac64c..8da7708 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -16,8 +16,9 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
args("hbase-table"),
args("zookeeper-hosts"))
- val pipe0 : Pipe = grobidSource.read
- val grobidPipe : TypedPipe[MapFeatures] = pipe0
+// val pipe0 : Pipe = grobidSource.read
+// val grobidPipe : TypedPipe[MapFeatures] = pipe0
+ grobidSource.read
.fromBytesWritable(new Fields("key", "tei_json"))
// .debug // Should be 4 tuples for mocked data
// TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala)
@@ -26,14 +27,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
.map { entry =>
val (key : String, json : String) = (entry._1, entry._2)
HBaseCrossrefScore.grobidToSlug(json) match {
- case Some(slug) => new MapFeatures(slug, key, json)
- case None => new MapFeatures(Scorable.NoSlug, key, json)
+ case Some(slug) => new MapFeatures(slug, json)
+ case None => new MapFeatures(Scorable.NoSlug, json)
}
}
- .filter {
- _.slug != Scorable.NoSlug
- }
- grobidPipe
}
/*
def fromBytesWritableLocal(f: Fields): Pipe = {
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 89dc835..950a6d4 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -7,7 +7,7 @@ import cascading.flow.FlowDef
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
-case class MapFeatures(val key : String, slug : String, json : String)
+case class MapFeatures(slug : String, json : String)
case class ReduceFeatures(json : String)
case class ReduceOutput(val score : Int, json1 : String, json2 : String)
@@ -16,7 +16,7 @@ abstract class Scorable {
{
getFeaturesPipe(args)(flowDef, mode)
.filter { entry => Scorable.isValidSlug(entry.slug) }
- .groupBy { case MapFeatures(key, slug, json) => slug }
+ .groupBy { case MapFeatures(slug, json) => slug }
.map { tuple =>
val (slug : String, features : MapFeatures) = tuple
(slug, ReduceFeatures(features.json))