diff options
author | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-09 22:13:46 -0700 |
---|---|---|
committer | Ellen Spertus <ellen.spertus@gmail.com> | 2018-08-09 22:13:46 -0700 |
commit | 5ce5e5dc98cdbb5a84c79313df93d670111e6a1d (patch) | |
tree | 869f6989f8f10961e901b61e45c82c3b531b5321 /scalding/src/main/scala/sandcrawler/ScoreJob.scala | |
parent | 2528dd4afdf2e1a3419dbf354011f1ecc25c77a5 (diff) | |
download | sandcrawler-5ce5e5dc98cdbb5a84c79313df93d670111e6a1d.tar.gz sandcrawler-5ce5e5dc98cdbb5a84c79313df93d670111e6a1d.zip |
Broken code to share with Bryan.
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/ScoreJob.scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/ScoreJob.scala | 65 |
1 files changed, 63 insertions, 2 deletions
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 7891596..0dbe64d 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -1,13 +1,50 @@ package sandcrawler import cascading.flow.FlowDef +import cascading.tuple.Fields import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource -class ScoreJob(args: Args) extends JobBase(args) with - HBasePipeConversions { +//case class MapFeatures(slug : String, json : String) + +class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions { + + val grobidSource = HBaseCrossrefScore.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts")) + + val source0 : Source = TextLine("foo") + val pipe0 : cascading.pipe.Pipe = source0.read + // This compiles: + val pipe00 : TypedPipe[String] = getFeaturesPipe0(pipe0) + + // Calling a method within ScoreJob compiles fine. + def getFeaturesPipe0(pipe : cascading.pipe.Pipe) : TypedPipe[String] = { + pipe + // This compiles: + .toTypedPipe[String](new Fields("line")) + } + + // Calling a function in a ScoreJob object leads to a compiler error. + val source1 : Source = TextLine("foo") + val pipe1 : cascading.pipe.Pipe = source1.read + // This leads to a compile error: + val pipe11 : TypedPipe[String] = ScoreJob.getFeaturesPipe1(pipe0) + + /* + val pipe : cascading.pipe.Pipe = grobidSource + .read + val grobidPipe : TypedPipe[(String, String)] = pipe + .fromBytesWritable(new Fields("key", "tei_json")) + // Here I CAN call Pipe.toTypedPipe() + .toTypedPipe[(String, String)]('key, 'tei_json) + .write(TypedTsv[(String, String)](args("output"))) + + // Let's try making a method call. +// ScoreJob.etFeaturesPipe(pipe) // TODO: Instantiate any subclass of Scorable specified in args. Scorable sc1 = new GrobidScorable() @@ -15,6 +52,7 @@ class ScoreJob(args: Args) extends JobBase(args) with val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource().read) val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource().read) + pipe1.join(pipe2).map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry new ReduceOutput( @@ -24,6 +62,8 @@ class ScoreJob(args: Args) extends JobBase(args) with features2.json) } .write(TypedTsv[ReduceOutput](args("output"))) + */ + } // Ugly hack to get non-String information into ScoreJob above. @@ -52,4 +92,25 @@ object ScoreJob { case None => null } } + + def getFeaturesPipe1(pipe : cascading.pipe.Pipe) : TypedPipe[String] = { + pipe + // The next line gives an error: value toTypedPipe is not a member of cascading.pipe.Pipe + .toTypedPipe[String](new Fields("line")) + } +/* + def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = { + pipe + .fromBytesWritable(new Fields("key", "tei_json")) + // I needed to change symbols to strings when I pulled this out of ScoreJob. + .toTypedPipe[(String, String)](new Fields("key", "tei_json")) + .map { entry => + val (key : String, json : String) = (entry._1, entry._2) + GrobidScorable.grobidToSlug(json) match { + case Some(slug) => new MapFeatures(slug, json) + case None => new MapFeatures(Scorable.NoSlug, json) + } + } + } + */ } |