aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
authorEllen Spertus <ellen.spertus@gmail.com>2018-08-06 16:38:46 -0700
committerEllen Spertus <ellen.spertus@gmail.com>2018-08-06 16:38:46 -0700
commit308b33d889d804380427d2aa112efec77b3e1770 (patch)
treee0181fd4eb2d311bf45827447afe5d93291f931f /scalding
parentb1d8a72a5cc469b5139d9a976ccfa9b4b3eea61d (diff)
downloadsandcrawler-308b33d889d804380427d2aa112efec77b3e1770.tar.gz
sandcrawler-308b33d889d804380427d2aa112efec77b3e1770.zip
New code compiles. Old tests pass. New tests not yet written.
Diffstat (limited to 'scalding')
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala48
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala6
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala9
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala9
-rw-r--r--scalding/src/main/scala/sandcrawler/StringUtilities.scala2
5 files changed, 65 insertions, 9 deletions
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
new file mode 100644
index 0000000..5dac64c
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -0,0 +1,48 @@
+package sandcrawler
+
+import cascading.flow.FlowDef
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class GrobidScorable extends Scorable with HBasePipeConversions {
+ def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) = {
+ // TODO: Clean up code after debugging.
+ val grobidSource = HBaseCrossrefScore.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
+
+ val pipe0 : Pipe = grobidSource.read
+ val grobidPipe : TypedPipe[MapFeatures] = pipe0
+ .fromBytesWritable(new Fields("key", "tei_json"))
+ // .debug // Should be 4 tuples for mocked data
+ // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala)
+ // didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json)
+ .toTypedPipe[(String, String)](new Fields("key", "tei_json"))
+ .map { entry =>
+ val (key : String, json : String) = (entry._1, entry._2)
+ HBaseCrossrefScore.grobidToSlug(json) match {
+ case Some(slug) => new MapFeatures(slug, key, json)
+ case None => new MapFeatures(Scorable.NoSlug, key, json)
+ }
+ }
+ .filter {
+ _.slug != Scorable.NoSlug
+ }
+ grobidPipe
+ }
+/*
+ def fromBytesWritableLocal(f: Fields): Pipe = {
+ asList(f)
+ .foldLeft(pipe) { (p, fld) => {
+ p.map(fld.toString -> fld.toString) { from: org.apache.hadoop.hbase.io.ImmutableBytesWritable =>
+ Option(from).map(x => Bytes.toString(x.get)).getOrElse(null)
+ }
+ }}
+ }
+ */
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
index 01d852e..2fbb19f 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala
@@ -27,8 +27,9 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv
val grobidSource = HBaseCrossrefScore.getHBaseSource(
args("hbase-table"),
args("zookeeper-hosts"))
- val grobidPipe : TypedPipe[(String, String, String)] = grobidSource
- .read
+
+ val pipe0 : cascading.pipe.Pipe = grobidSource.read
+ val grobidPipe : TypedPipe[(String, String, String)] = pipe0
.fromBytesWritable(new Fields("key", "tei_json"))
// .debug // Should be 4 tuples for mocked data
.toTypedPipe[(String, String)]('key, 'tei_json)
@@ -78,7 +79,6 @@ class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConv
HBaseCrossrefScore.computeOutput(sha1, grobidJson, crossrefJson)}
// Output: score, sha1, doi, grobid title, crossref title
.write(TypedTsv[(Int, String, String, String, String)](args("output")))
-
}
object HBaseCrossrefScore {
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 8e0c560..89dc835 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -1,6 +1,9 @@
+package sandcrawler
+
import scala.math
import scala.util.parsing.json.JSON
+import cascading.flow.FlowDef
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
@@ -9,9 +12,9 @@ case class ReduceFeatures(json : String)
case class ReduceOutput(val score : Int, json1 : String, json2 : String)
abstract class Scorable {
- def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] =
+ def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] =
{
- getFeaturesPipe(args)
+ getFeaturesPipe(args)(flowDef, mode)
.filter { entry => Scorable.isValidSlug(entry.slug) }
.groupBy { case MapFeatures(key, slug, json) => slug }
.map { tuple =>
@@ -21,7 +24,7 @@ abstract class Scorable {
}
// abstract method
- def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures]
+ def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures]
}
object Scorable {
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 8d4d957..22cc9e9 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -1,16 +1,19 @@
+package sandcrawler
+
import java.text.Normalizer
import scala.math
import scala.util.parsing.json.JSON
+import cascading.flow.FlowDef
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBasePipeConversions
-class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable) extends JobBase(args) with HBasePipeConversions {
- val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
- val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
+class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with HBasePipeConversions {
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args, flowDef, mode)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args, flowDef, mode)
pipe1.join(pipe2).map { entry =>
val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
index 290b03f..1ae6db3 100644
--- a/scalding/src/main/scala/sandcrawler/StringUtilities.scala
+++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
@@ -1,3 +1,5 @@
+package sandcrawler
+
import java.text.Normalizer
import java.util.regex.Pattern