diff options
Diffstat (limited to 'scalding/src')
7 files changed, 41 insertions, 37 deletions
diff --git a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala index 0d26d75..abf9220 100644 --- a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala +++ b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala @@ -15,7 +15,7 @@ class BibjsonScorable extends Scorable { TextLine(args("bibjson-input")) } - def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { getSource(args).read .toTypedPipe[String](new Fields("line")) .map { BibjsonScorable.bibjsonToMapFeatures(_) } @@ -23,9 +23,9 @@ class BibjsonScorable extends Scorable { } object BibjsonScorable { - def bibjsonToMapFeatures(json : String) : MapFeatures = { + def bibjsonToMapFeatures(json : String) : Option[MapFeatures] = { Scorable.jsonToMap(json) match { - case None => MapFeatures(Scorable.NoSlug, json) + case None => None case Some(map) => { if (map contains "title") { val title = Scorable.getString(map, "title") @@ -33,13 +33,16 @@ object BibjsonScorable { val sha1 = Scorable.getString(map, "sha") // TODO: year, authors (if available) if (title == null || title.isEmpty) { - new MapFeatures(Scorable.NoSlug, json) + None } else { val sf : ScorableFeatures = ScorableFeatures.create(title=title, doi=doi, sha1=sha1) - new MapFeatures(sf.toSlug, sf.toString) + sf.toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, sf.toString)) + } } } else { - new MapFeatures(Scorable.NoSlug, json) + None } } } diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index f51c210..c13945f 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -17,7 +17,7 @@ class CrossrefScorable extends Scorable with HBasePipeConversions { TextLine(args("crossref-input")) } - def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { getSource(args).read .toTypedPipe[String](new Fields("line")) .filter { CrossrefScorable.keepRecord(_) } @@ -116,22 +116,25 @@ object CrossrefScorable { } } - def jsonToMapFeatures(json : String) : MapFeatures = { + def jsonToMapFeatures(json : String) : Option[MapFeatures] = { Scorable.jsonToMap(json) match { - case None => MapFeatures(Scorable.NoSlug, json) + case None => None case Some(map) => mapToTitle(map) match { - case None => MapFeatures(Scorable.NoSlug, json) + case None => None case Some(title) => { val doi = Scorable.getString(map, "DOI") val authors: List[String] = mapToAuthorList(map) val year: Int = mapToYear(map).getOrElse(0) val contentType: String = map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE") if (doi.isEmpty || doi == null || authors.length == 0 || !(ContentTypeWhitelist contains contentType)) { - MapFeatures(Scorable.NoSlug, json) + None } else { val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi.toLowerCase(), year=year) - MapFeatures(sf.toSlug, sf.toString) + sf.toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, sf.toString)) + } } } } diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 899ce66..f4ed129 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -20,7 +20,7 @@ class GrobidScorable extends Scorable with HBasePipeConversions { GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) } - def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { getSource(args) .read // Can't just "fromBytesWritable" because we have multiple types @@ -65,16 +65,16 @@ object GrobidScorable { HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL) } - def jsonToMapFeatures(key : String, json : String) : MapFeatures = { + def jsonToMapFeatures(key : String, json : String) : Option[MapFeatures] = { Scorable.jsonToMap(json) match { - case None => MapFeatures(Scorable.NoSlug, json) + case None => None case Some(map) => { if (map contains "title") { val authors: List[String] = mapToAuthorList(map) val title = Scorable.getString(map, "title") ScorableFeatures.create(title=title, authors=authors, sha1=key).toMapFeatures } else { - MapFeatures(Scorable.NoSlug, json) + None } } } diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala index 19b257f..d40410b 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala @@ -40,11 +40,8 @@ class GrobidScorableDumpJob(args: Args) extends JobBase(args) { parsedGrobidRows.inc GrobidScorable.jsonToMapFeatures(entry._1, entry._2) } - .filter { entry => Scorable.isValidSlug(entry.slug) } - .map { entry => - validGrobidRows.inc - entry - } + .filterNot { entry => entry.isEmpty } + .map { entry => entry.get } .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => val (slug : String, features : MapFeatures) = tuple diff --git a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala index 1578258..292de75 100644 --- a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala +++ b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala @@ -15,7 +15,8 @@ class MatchBenchmarkJob(args: Args) extends JobBase(args) { val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(leftArgs) val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(rightArgs) - pipe1.join(pipe2).map { entry => + pipe1.join(pipe2) + .map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry new ReduceOutput( slug, diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index f7eb95d..5d67044 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -13,10 +13,12 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = - { - getFeaturesPipe(args) - .filter { entry => Scorable.isValidSlug(entry.slug) } + def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = { + val validFeatures : TypedPipe[MapFeatures] = getFeaturesPipe(args) + .filterNot { entry => entry.isEmpty } + .map { entry => entry.get } + + validFeatures .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => val (slug : String, features : MapFeatures) = tuple @@ -26,16 +28,11 @@ abstract class Scorable { // abstract methods def getSource(args : Args) : Source - def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] } object Scorable { val MaxTitleLength = 1023 - val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable - - def isValidSlug(slug : String) : Boolean = { - slug != NoSlug - } def jsonToMap(json : String) : Option[Map[String, Any]] = { // https://stackoverflow.com/a/32717262/631051 diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala index 241db79..b56f102 100644 --- a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala +++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala @@ -35,9 +35,9 @@ class ScorableFeatures private(title : String, authors : List[Any] = List(), yea JSONObject(toMap).toString } - def toSlug() : String = { + def toSlug() : Option[String] = { if (title == null) { - Scorable.NoSlug + None } else { val unaccented = StringUtilities.removeAccents(title) // Remove punctuation @@ -45,10 +45,13 @@ class ScorableFeatures private(title : String, authors : List[Any] = List(), yea if (slug.isEmpty || slug == null || (ScorableFeatures.SlugBlacklist contains slug) - || (slug.length < ScorableFeatures.MinSlugLength)) Scorable.NoSlug else slug + || (slug.length < ScorableFeatures.MinSlugLength)) None else Some(slug) } } - def toMapFeatures : MapFeatures = - MapFeatures(toSlug, toString) + def toMapFeatures : Option[MapFeatures] = + toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, toString)) + } } |