aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/ScoreJob.scala
diff options
context:
space:
mode:
author Ellen Spertus <ellen.spertus@gmail.com> 2018-08-09 22:13:46 -0700
committer Ellen Spertus <ellen.spertus@gmail.com> 2018-08-09 22:13:46 -0700
commit 5ce5e5dc98cdbb5a84c79313df93d670111e6a1d (patch)
tree 869f6989f8f10961e901b61e45c82c3b531b5321 /scalding/src/main/scala/sandcrawler/ScoreJob.scala
parent 2528dd4afdf2e1a3419dbf354011f1ecc25c77a5 (diff)
download sandcrawler-5ce5e5dc98cdbb5a84c79313df93d670111e6a1d.tar.gz
sandcrawler-5ce5e5dc98cdbb5a84c79313df93d670111e6a1d.zip
Broken code to share with Bryan.
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/ScoreJob.scala')
-rw-r--r-- scalding/src/main/scala/sandcrawler/ScoreJob.scala 65
1 files changed, 63 insertions, 2 deletions
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 7891596..0dbe64d 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -1,13 +1,50 @@
package sandcrawler
import cascading.flow.FlowDef
+import cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
-class ScoreJob(args: Args) extends JobBase(args) with
- HBasePipeConversions {
+//case class MapFeatures(slug : String, json : String)
+
+class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions {
+
+ val grobidSource = HBaseCrossrefScore.getHBaseSource( // source built from the two job arguments below
+ args("hbase-table"),
+ args("zookeeper-hosts"))
+
+ val source0 : Source = TextLine("foo") // NOTE(review): "foo" looks like a placeholder path - confirm
+ val pipe0 : cascading.pipe.Pipe = source0.read
+ // Experiment 1: convert the untyped pipe through a method of this class; this variant compiles.
+ val pipe00 : TypedPipe[String] = getFeaturesPipe0(pipe0)
+
+ // A method defined inside the ScoreJob class compiles without trouble.
+ def getFeaturesPipe0(pipe : cascading.pipe.Pipe) : TypedPipe[String] = {
+   // Name the field selector first, then convert; this conversion compiles here.
+   val lineField = new Fields("line")
+   pipe.toTypedPipe[String](lineField)
+ }
+
+ // Experiment 2: the same conversion, routed through a function on the ScoreJob companion object.
+ val source1 : Source = TextLine("foo")
+ val pipe1 : cascading.pipe.Pipe = source1.read
+ // Feed the companion-object variant from pipe1 so this experiment uses its own source instead of reusing pipe0.
+ val pipe11 : TypedPipe[String] = ScoreJob.getFeaturesPipe1(pipe1)
+
+ /*
+ val pipe : cascading.pipe.Pipe = grobidSource
+ .read
+ val grobidPipe : TypedPipe[(String, String)] = pipe
+ .fromBytesWritable(new Fields("key", "tei_json"))
+ // Here I CAN call Pipe.toTypedPipe()
+ .toTypedPipe[(String, String)]('key, 'tei_json)
+ .write(TypedTsv[(String, String)](args("output")))
+
+ // Let's try making a method call.
+// ScoreJob.getFeaturesPipe(pipe)
// TODO: Instantiate any subclass of Scorable specified in args.
Scorable sc1 = new GrobidScorable()
@@ -15,6 +52,7 @@ class ScoreJob(args: Args) extends JobBase(args) with
val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource().read)
val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource().read)
+
pipe1.join(pipe2).map { entry =>
val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
new ReduceOutput(
@@ -24,6 +62,8 @@ class ScoreJob(args: Args) extends JobBase(args) with
features2.json)
}
.write(TypedTsv[ReduceOutput](args("output")))
+ */
+
}
// Ugly hack to get non-String information into ScoreJob above.
@@ -52,4 +92,25 @@ object ScoreJob {
case None => null
}
}
+
+ def getFeaturesPipe1(pipe : cascading.pipe.Pipe)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[String] = {
+   pipe
+     // TDsl's Pipe-to-typed view needs an implicit FlowDef and Mode; a Job provides both, a bare object must take them as parameters.
+     .toTypedPipe[String](new Fields("line"))
+ }
+/*
+ def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = {
+ pipe
+ .fromBytesWritable(new Fields("key", "tei_json"))
+ // I needed to change symbols to strings when I pulled this out of ScoreJob.
+ .toTypedPipe[(String, String)](new Fields("key", "tei_json"))
+ .map { entry =>
+ val (key : String, json : String) = (entry._1, entry._2)
+ GrobidScorable.grobidToSlug(json) match {
+ case Some(slug) => new MapFeatures(slug, json)
+ case None => new MapFeatures(Scorable.NoSlug, json)
+ }
+ }
+ }
+ */
}