aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala1
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala15
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala2
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala46
4 files changed, 46 insertions, 18 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index ee4cc54..d5da845 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -11,6 +11,7 @@ import parallelai.spyglass.hbase.HBaseSource
class CrossrefScorable extends Scorable {
def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = {
+ // TODO: Generalize args so there can be multiple Grobid pipes in one job.
TextLine(args("crossref-input"))
.read
.toTypedPipe[String](new Fields("line"))
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 95d6dae..4c67074 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -11,14 +11,9 @@ import parallelai.spyglass.hbase.HBaseSource
class GrobidScorable extends Scorable with HBasePipeConversions {
def getFeaturesPipe(args : Args)(implicit flowDef : FlowDef, mode : Mode) : TypedPipe[MapFeatures] = {
- // TODO: Clean up code after debugging.
- val grobidSource = HBaseBuilder.build(
- args("hbase-table"),
- args("zookeeper-hosts"),
- List("grobid0:tei_json"),
- SourceMode.SCAN_ALL)
-
- grobidSource.read
+ // TODO: Generalize args so there can be multiple grobid pipes in one job.
+ GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+ .read
.fromBytesWritable(new Fields("key", "tei_json"))
// TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala)
// didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json)
@@ -34,6 +29,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
}
object GrobidScorable {
+ def getHBaseSource(table : String, host : String) : HBaseSource = {
+ HBaseBuilder.build(table, host, List("grobid0:tei_json"), SourceMode.SCAN_ALL)
+ }
+
def grobidToSlug(json : String) : Option[String] = {
Scorable.jsonToMap(json) match {
case None => None
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 86336cb..cfdc192 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -9,7 +9,7 @@ import com.twitter.scalding.typed.TDsl._
case class MapFeatures(slug : String, json : String)
case class ReduceFeatures(json : String)
-case class ReduceOutput(val score : Int, json1 : String, json2 : String)
+case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
abstract class Scorable {
def getInputPipe(args : Args, flowDef : FlowDef, mode : Mode) : TypedPipe[(String, ReduceFeatures)] =
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index e6a5dc1..aa20d0f 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -1,25 +1,53 @@
package sandcrawler
-import java.text.Normalizer
-
-import scala.math
-import scala.util.parsing.json.JSON
-
import cascading.flow.FlowDef
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBasePipeConversions
-class ScoreJob(args: Args, sc1 : Scorable, sc2 : Scorable)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with HBasePipeConversions {
- val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args, flowDef, mode)
- val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args, flowDef, mode)
+class ScoreJob(args: Args)(implicit flowDef : FlowDef, mode: Mode) extends JobBase(args) with
+ HBasePipeConversions {
+ /*
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args, flowDef, mode)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args, flowDef, mode)
pipe1.join(pipe2).map { entry =>
val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
- new ReduceOutput(Scorable.computeSimilarity(features1, features2),
+ new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(features1, features2),
features1.json,
features2.json)
}
.write(TypedTsv[ReduceOutput](args("output")))
+ */
+}
+
+// Ugly hack to get non-String information into ScoreJob above.
+object ScoreJob {
+ var scorable1 : Option[Scorable] = None
+ var scorable2 : Option[Scorable] = None
+
+ def setScorable1(s : Scorable) {
+ scorable1 = Some(s)
+ }
+
+ def getScorable1() : Scorable = {
+ scorable1 match {
+ case Some(s) => s
+ case None => null
+ }
+ }
+
+ def setScorable2(s: Scorable) {
+ scorable2 = Some(s)
+ }
+
+ def getScorable2() : Scorable = {
+ scorable2 match {
+ case Some(s) => s
+ case None => null
+ }
+ }
}