aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src
diff options
context:
space:
mode:
authorEllen Spertus <ellen.spertus@gmail.com>2018-08-09 22:13:46 -0700
committerEllen Spertus <ellen.spertus@gmail.com>2018-08-09 22:13:46 -0700
commit5ce5e5dc98cdbb5a84c79313df93d670111e6a1d (patch)
tree869f6989f8f10961e901b61e45c82c3b531b5321 /scalding/src
parent2528dd4afdf2e1a3419dbf354011f1ecc25c77a5 (diff)
downloadsandcrawler-5ce5e5dc98cdbb5a84c79313df93d670111e6a1d.tar.gz
sandcrawler-5ce5e5dc98cdbb5a84c79313df93d670111e6a1d.zip
Broken code to share with Bryan.
Diffstat (limited to 'scalding/src')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala21
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala2
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala8
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala2
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala65
5 files changed, 90 insertions, 8 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index 9842122..146feec 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -10,6 +10,26 @@ import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
import TDsl._
+import java.text.Normalizer
+import java.util.Arrays
+import java.util.Properties
+import java.util.regex.Pattern
+
+import scala.math
+import scala.util.parsing.json.JSON
+
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.CoGrouped
+import com.twitter.scalding.typed.Grouped
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
class CrossrefScorable extends Scorable with HBasePipeConversions {
// TODO: Generalize args so there can be multiple Grobid pipes in one job.
def getSource(args : Args) : Source = {
@@ -17,6 +37,7 @@ class CrossrefScorable extends Scorable with HBasePipeConversions {
}
def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = {
+ // Here I CANNOT call Pipe.toTypedPipe()
pipe
.toTypedPipe[String](new Fields("line"))
.map{ json : String =>
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 51e40f9..ba15f22 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -8,7 +8,7 @@ import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
-import TDsl._
+//import TDsl._
class GrobidScorable extends Scorable with HBasePipeConversions {
def getSource(args : Args) : Source = {
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index 725474d..018a74b 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -19,6 +19,7 @@ import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
+import TDsl._
class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions {
val NoTitle = "NO TITLE" // Used for slug if title is empty or unparseable
@@ -30,13 +31,13 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv
val temp : cascading.pipe.Pipe = grobidSource
.read
- .fromBytesWritable(new Fields("key", "tei_json"))
+
+ // Here I CAN call Pipe.toTypedPipe()
val grobidPipe : TypedPipe[(String, String, String)] = temp
- // .debug // Should be 4 tuples for mocked data
+ .fromBytesWritable(new Fields("key", "tei_json"))
.toTypedPipe[(String, String)]('key, 'tei_json)
.map { entry =>
val (key, json) = (entry._1, entry._2)
- // TODO: Consider passing forward only a subset of JSON.
HBaseCrossrefScore.grobidToSlug(json) match {
case Some(slug) => (slug, key, json)
case None => (NoTitle, key, json)
@@ -46,7 +47,6 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv
val (slug, _, _) = entry
slug != NoTitle
}
-// .debug // SHould be 3 tuples for mocked data
val grobidGroup = grobidPipe
.groupBy { case (slug, key, json) => slug }
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index bd03d57..65d9b41 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -6,7 +6,7 @@ import scala.util.parsing.json.JSON
import cascading.flow.FlowDef
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
-import TDsl._
+//import TDsl._
case class MapFeatures(slug : String, json : String)
case class ReduceFeatures(json : String)
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 7891596..0dbe64d 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -1,13 +1,50 @@
package sandcrawler
import cascading.flow.FlowDef
+import cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
-class ScoreJob(args: Args) extends JobBase(args) with
- HBasePipeConversions {
+//case class MapFeatures(slug : String, json : String)
+
+class ScoreJob(args: Args) extends JobBase(args) { //with HBasePipeConversions {
+
+ val grobidSource = HBaseCrossrefScore.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
+
+ val source0 : Source = TextLine("foo")
+ val pipe0 : cascading.pipe.Pipe = source0.read
+ // This compiles:
+ val pipe00 : TypedPipe[String] = getFeaturesPipe0(pipe0)
+
+ // Calling a method within ScoreJob compiles fine.
+ def getFeaturesPipe0(pipe : cascading.pipe.Pipe) : TypedPipe[String] = {
+ pipe
+ // This compiles:
+ .toTypedPipe[String](new Fields("line"))
+ }
+
+ // Calling a function in a ScoreJob object leads to a compiler error.
+ val source1 : Source = TextLine("foo")
+ val pipe1 : cascading.pipe.Pipe = source1.read
+ // This leads to a compile error:
+ val pipe11 : TypedPipe[String] = ScoreJob.getFeaturesPipe1(pipe0)
+
+ /*
+ val pipe : cascading.pipe.Pipe = grobidSource
+ .read
+ val grobidPipe : TypedPipe[(String, String)] = pipe
+ .fromBytesWritable(new Fields("key", "tei_json"))
+ // Here I CAN call Pipe.toTypedPipe()
+ .toTypedPipe[(String, String)]('key, 'tei_json)
+ .write(TypedTsv[(String, String)](args("output")))
+
+ // Let's try making a method call.
+// ScoreJob.etFeaturesPipe(pipe)
// TODO: Instantiate any subclass of Scorable specified in args.
Scorable sc1 = new GrobidScorable()
@@ -15,6 +52,7 @@ class ScoreJob(args: Args) extends JobBase(args) with
val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource().read)
val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource().read)
+
pipe1.join(pipe2).map { entry =>
val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
new ReduceOutput(
@@ -24,6 +62,8 @@ class ScoreJob(args: Args) extends JobBase(args) with
features2.json)
}
.write(TypedTsv[ReduceOutput](args("output")))
+ */
+
}
// Ugly hack to get non-String information into ScoreJob above.
@@ -52,4 +92,25 @@ object ScoreJob {
case None => null
}
}
+
+ def getFeaturesPipe1(pipe : cascading.pipe.Pipe) : TypedPipe[String] = {
+ pipe
+ // The next line gives an error: value toTypedPipe is not a member of cascading.pipe.Pipe
+ .toTypedPipe[String](new Fields("line"))
+ }
+/*
+ def getFeaturesPipe(pipe : cascading.pipe.Pipe) : TypedPipe[MapFeatures] = {
+ pipe
+ .fromBytesWritable(new Fields("key", "tei_json"))
+ // I needed to change symbols to strings when I pulled this out of ScoreJob.
+ .toTypedPipe[(String, String)](new Fields("key", "tei_json"))
+ .map { entry =>
+ val (key : String, json : String) = (entry._1, entry._2)
+ GrobidScorable.grobidToSlug(json) match {
+ case Some(slug) => new MapFeatures(slug, json)
+ case None => new MapFeatures(Scorable.NoSlug, json)
+ }
+ }
+ }
+ */
}