From 28c0518379d226ac25597c2840c5c81bd8551487 Mon Sep 17 00:00:00 2001 From: Ellen Spertus Date: Thu, 9 Aug 2018 20:26:31 -0700 Subject: WIP --- scalding/src/main/scala/sandcrawler/CrossrefScorable.scala | 9 ++++++--- scalding/src/main/scala/sandcrawler/GrobidScorable.scala | 9 +++++---- scalding/src/main/scala/sandcrawler/Scorable.scala | 9 +++++---- scalding/src/main/scala/sandcrawler/ScoreJob.scala | 7 +++++-- 4 files changed, 21 insertions(+), 13 deletions(-) (limited to 'scalding/src/main/scala') diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index b221718..249c9ab 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -10,10 +10,13 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class CrossrefScorable extends Scorable { - def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = { - // TODO: Generalize args so there can be multiple Grobid pipes in one job. + // TODO: Generalize args so there can be multiple Grobid pipes in one job. + def getSource(args : Args) : Source = { TextLine(args("crossref-input")) - .read + } + + def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = { + pipe .toTypedPipe[String](new Fields("line")) .map{ json : String => CrossrefScorable.crossrefToSlug(json) match { diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 6229718..5c6b140 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -10,13 +10,14 @@ import parallelai.spyglass.hbase.HBasePipeConversions import parallelai.spyglass.hbase.HBaseSource class GrobidScorable extends Scorable with HBasePipeConversions { - def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] = { + def getSource(args : Args) : Source = { // TODO: Generalize args so there can be multiple grobid pipes in one job. GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) - .read + } + + def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] = { + pipe .fromBytesWritable(new Fields("key", "tei_json")) - // TODO: Figure out why this line (used in HBaseCrossrefScoreJob.scala) - // didn't work here: .toTypedPipe[(String, String)]('key, 'tei_json) .toTypedPipe[(String, String)](new Fields("key", "tei_json")) .map { entry => val (key : String, json : String) = (entry._1, entry._2) diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index 2d2345b..92b61bc 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -12,9 +12,9 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(args : Args) : TypedPipe[(String, ReduceFeatures)] = + def getInputPipe(pipe : Pipe) : TypedPipe[(String, ReduceFeatures)] = { - getFeaturesPipe(args) + getFeaturesPipe(pipe) .filter { entry => Scorable.isValidSlug(entry.slug) } .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => @@ -23,8 +23,9 @@ abstract class Scorable { } } - // abstract method - def getFeaturesPipe(args : Args) : TypedPipe[MapFeatures] + // abstract methods + def getSource(args : Args) : Source + def getFeaturesPipe(pipe : Pipe) : TypedPipe[MapFeatures] } object Scorable { diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 66ba29e..7891596 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -9,8 +9,11 @@ import parallelai.spyglass.hbase.HBasePipeConversions class ScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions { - val pipe1 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable1().getInputPipe(args) - val pipe2 : TypedPipe[(String, ReduceFeatures)] = ScoreJob.getScorable2().getInputPipe(args) + // TODO: Instantiate any subclass of Scorable specified in args. + Scorable sc1 = new GrobidScorable() + Scorable sc2 = new CrossrefScorable() + val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(sc1.getSource().read) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(sc2.getSource().read) pipe1.join(pipe2).map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry -- cgit v1.2.3