author     Ellen Spertus <ellen.spertus@gmail.com>  2018-08-09 20:26:31 -0700
committer  Ellen Spertus <ellen.spertus@gmail.com>  2018-08-09 20:26:31 -0700
commit     28c0518379d226ac25597c2840c5c81bd8551487 (patch)
tree       3414260a4e7f02cf6142f84484a62460ab3c2753 /scalding
parent     818ad070626d6af7c490017e0bd9b53f30f20150 (diff)
download   sandcrawler-28c0518379d226ac25597c2840c5c81bd8551487.tar.gz
           sandcrawler-28c0518379d226ac25597c2840c5c81bd8551487.zip
WIP
Diffstat (limited to 'scalding')
-rw-r--r--  scalding/src/main/scala/sandcrawler/CrossrefScorable.scala | 9
-rw-r--r--  scalding/src/main/scala/sandcrawler/GrobidScorable.scala   | 9
-rw-r--r--  scalding/src/main/scala/sandcrawler/Scorable.scala         | 9
-rw-r--r--  scalding/src/main/scala/sandcrawler/ScoreJob.scala         | 7
4 files changed, 21 insertions, 13 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index b221718..249c9ab 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -10,10 +10,13 @@ import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
class CrossrefScorable extends Scorable {
- def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = {
- // TODO: Generalize args so there can be multiple Grobid pipes in one job.
+ // TODO: Generalize args so there can be multiple Grobid pipes in one job.
+ def getSource(args : Args) : Source = {
TextLine(args("crossref-input"))
- .read
+ }
+
+ def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = {
+ pipe
.toTypedPipe[String](new Fields("line"))
.map{ json : String =>
CrossrefScorable.crossrefToSlug(json) match {
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 6229718..5c6b140 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -10,13 +10,14 @@ import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource
class GrobidScorable extends Scorable with HBasePipeConversions {
- def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = {
+ def getSource(args : Args) : Source = {
// TODO: Generalize args so there can be multiple grobid pipes in one job.
GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
- .read
+ }
+
+ def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = {
+ pipe
.fromBytesWritable(new Fields("key", "tei_json"))
- // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala)
- // didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json)
.toTypedPipe[(String, String)](new Fields("key", "tei_json"))
.map { entry =>
val (key : String, json : String) = (entry._1, entry._2)
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 2d2345b..92b61bc 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -12,9 +12,9 @@ case class ReduceFeatures(json : String)
case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
abstract class Scorable {
- def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] =
+ def getInputPipe(pipe : Pipe) : TypedPipe[(String, ReduceFeatures)] =
{
- getFeaturesPipe(args)
+ getFeaturesPipe(pipe)
.filter { entry => Scorable.isValidSlug(entry.slug) }
.groupBy { case MapFeatures(slug, json) => slug }
.map { tuple =>
@@ -23,8 +23,9 @@ abstract class Scorable {
}
}
- // abstract method
- def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures]
+ // abstract methods
+ def getSource(args : Args) : Source
+ def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures]
}
object Scorable {
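
Note on the refactored Scorable contract: a subclass now only supplies a Source (getSource) and a transformation from a raw Pipe to MapFeatures (getFeaturesPipe); the caller decides when to call .read. A rough, hypothetical sketch of a third implementation (the TsvScorable name, the "tsv-input" argument, and the placeholder slug are illustrative only; imports and any implicit Mode/FlowDef handling are assumed to follow the existing subclasses):

    // Illustrative only -- not part of this commit.
    class TsvScorable extends Scorable {
      // Describe where the records come from; ScoreJob calls .read on this.
      def getSource(args : Args) : Source = TextLine(args("tsv-input"))

      // Describe how the raw pipe becomes (slug, json) features, using the
      // same Pipe -> TypedPipe conversion pattern as CrossrefScorable above.
      def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = {
        pipe
          .toTypedPipe[String](new Fields("line"))
          .map { line : String => MapFeatures("placeholder-slug", line) }
      }
    }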
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 66ba29e..7891596 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -9,8 +9,11 @@ import parallelai.spyglass.hbase.HBasePipeConversions
class ScoreJob(args: Args) extends JobBase(args) with
HBasePipeConversions {
- val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args)
- val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args)
+ // TODO: Instantiate any subclass of Scorable specified in args.
+ val sc1 : Scorable = new GrobidScorable()
+ val sc2 : Scorable = new CrossrefScorable()
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource(args).read)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource(args).read)
pipe1.join(pipe2).map { entry =>
val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
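
Note on the TODO above: the hard-coded GrobidScorable/CrossrefScorable pair could later be chosen from job arguments instead. A hypothetical sketch (the buildScorable factory and the "scorable1"/"scorable2" argument names are illustrative, not part of this commit):

    // Illustrative only -- not part of this commit.
    def buildScorable(name : String) : Scorable = name match {
      case "grobid"   => new GrobidScorable()
      case "crossref" => new CrossrefScorable()
      case other      => throw new IllegalArgumentException("unknown scorable: " + other)
    }

    val sc1 : Scorable = buildScorable(args("scorable1"))
    val sc2 : Scorable = buildScorable(args("scorable2"))
    val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource(args).read)
    val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource(args).read)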