diff options
Diffstat (limited to 'scalding/src/main')
7 files changed, 99 insertions, 78 deletions
diff --git a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala index 0d26d75..abf9220 100644 --- a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala +++ b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala @@ -15,7 +15,7 @@ class BibjsonScorable extends Scorable { TextLine(args("bibjson-input")) } - def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { getSource(args).read .toTypedPipe[String](new Fields("line")) .map { BibjsonScorable.bibjsonToMapFeatures(_) } @@ -23,9 +23,9 @@ class BibjsonScorable extends Scorable { } object BibjsonScorable { - def bibjsonToMapFeatures(json : String) : MapFeatures = { + def bibjsonToMapFeatures(json : String) : Option[MapFeatures] = { Scorable.jsonToMap(json) match { - case None => MapFeatures(Scorable.NoSlug, json) + case None => None case Some(map) => { if (map contains "title") { val title = Scorable.getString(map, "title") @@ -33,13 +33,16 @@ object BibjsonScorable { val sha1 = Scorable.getString(map, "sha") // TODO: year, authors (if available) if (title == null || title.isEmpty) { - new MapFeatures(Scorable.NoSlug, json) + None } else { val sf : ScorableFeatures = ScorableFeatures.create(title=title, doi=doi, sha1=sha1) - new MapFeatures(sf.toSlug, sf.toString) + sf.toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, sf.toString)) + } } } else { - new MapFeatures(Scorable.NoSlug, json) + None } } } diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index f51c210..bb6413f 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -17,7 +17,7 @@ class CrossrefScorable extends Scorable with HBasePipeConversions { TextLine(args("crossref-input")) } - def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { getSource(args).read .toTypedPipe[String](new Fields("line")) .filter { CrossrefScorable.keepRecord(_) } @@ -55,37 +55,45 @@ object CrossrefScorable { // Returns None if title is null, empty, or too long. def mapToTitle(map : Map[String, Any]) : Option[String] = { - if (map contains "title") { - val titles = map("title").asInstanceOf[List[String]] - if (titles.isEmpty || titles == null) { - None + def getTitle : Option[String] = { + if (map contains "title") { + val titles = map("title").asInstanceOf[List[String]] + if (titles.isEmpty || titles == null) None else Some(titles(0)) } else { - val baseTitle: String = titles(0) - // TODO(bnewbold): this code block is horrible - val baseSubtitle: String = if (map contains "subtitle") { - val subtitles = map("subtitle").asInstanceOf[List[String]] - if (!subtitles.isEmpty && subtitles != null) { - val sub = subtitles(0) - if (sub != null && !sub.isEmpty && baseTitle != null) { - sub - } else { - "" - } + None + } + } + + def getSubtitle : Option[String] = { + if (map contains "subtitle") { + val subtitles = map("subtitle").asInstanceOf[List[String]] + if (subtitles.isEmpty || subtitles == null) { + None + } else { + val sub = subtitles(0) + if (sub == null || sub.isEmpty) { + None } else { - "" + Some(sub) } - } else { - "" } - val title = if (baseSubtitle.isEmpty) { - baseTitle + } else { + None + } + } + + getTitle match { + case None => None + case Some(baseTitle) => { + if (baseTitle == null) { + None } else { - baseTitle.concat(": ".concat(baseSubtitle)) + getSubtitle match { + case None => Some(baseTitle) + case Some(baseSubtitle) => Some(baseTitle.concat(":".concat(baseSubtitle))) + } } - if (title == null || title.isEmpty || title.length > Scorable.MaxTitleLength) None else Some(title) } - } else { - None } } @@ -106,34 +114,39 @@ object CrossrefScorable { case None => None case Some(created) => { Some(created.asInstanceOf[Map[String,Any]] - .get("date-parts") - .get - .asInstanceOf[List[Any]](0) - .asInstanceOf[List[Any]](0) - .asInstanceOf[Double] - .toInt) + .get("date-parts") + .get + .asInstanceOf[List[Any]](0) + .asInstanceOf[List[Any]](0) + .asInstanceOf[Double] + .toInt) } } } - def jsonToMapFeatures(json : String) : MapFeatures = { + def jsonToMapFeatures(json : String) : Option[MapFeatures] = { + def makeMapFeatures(title : String, doi : String, authors : List[String], year : Int, contentType : String) : Option[MapFeatures] = { + if (doi.isEmpty || doi == null || authors.length == 0 || !(ContentTypeWhitelist contains contentType)) { + None + } else { + val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi.toLowerCase(), year=year) + sf.toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, sf.toString)) + } + } + } Scorable.jsonToMap(json) match { - case None => MapFeatures(Scorable.NoSlug, json) + case None => None case Some(map) => mapToTitle(map) match { - case None => MapFeatures(Scorable.NoSlug, json) - case Some(title) => { - val doi = Scorable.getString(map, "DOI") - val authors: List[String] = mapToAuthorList(map) - val year: Int = mapToYear(map).getOrElse(0) - val contentType: String = map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE") - if (doi.isEmpty || doi == null || authors.length == 0 || !(ContentTypeWhitelist contains contentType)) { - MapFeatures(Scorable.NoSlug, json) - } else { - val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi.toLowerCase(), year=year) - MapFeatures(sf.toSlug, sf.toString) - } - } + case None => None + case Some(title) => makeMapFeatures( + title=title, + doi=Scorable.getString(map, "DOI"), + authors=mapToAuthorList(map), + year=mapToYear(map).getOrElse(0), + contentType=map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE")) } } } diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 899ce66..f4ed129 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -20,7 +20,7 @@ class GrobidScorable extends Scorable with HBasePipeConversions { GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) } - def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { getSource(args) .read // Can't just "fromBytesWritable" because we have multiple types @@ -65,16 +65,16 @@ object GrobidScorable { HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL) } - def jsonToMapFeatures(key : String, json : String) : MapFeatures = { + def jsonToMapFeatures(key : String, json : String) : Option[MapFeatures] = { Scorable.jsonToMap(json) match { - case None => MapFeatures(Scorable.NoSlug, json) + case None => None case Some(map) => { if (map contains "title") { val authors: List[String] = mapToAuthorList(map) val title = Scorable.getString(map, "title") ScorableFeatures.create(title=title, authors=authors, sha1=key).toMapFeatures } else { - MapFeatures(Scorable.NoSlug, json) + None } } } diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala index 19b257f..3146a6c 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala @@ -40,11 +40,11 @@ class GrobidScorableDumpJob(args: Args) extends JobBase(args) { parsedGrobidRows.inc GrobidScorable.jsonToMapFeatures(entry._1, entry._2) } - .filter { entry => Scorable.isValidSlug(entry.slug) } - .map { entry => + .filterNot { entry => entry.isEmpty } + .map { entry => { validGrobidRows.inc - entry - } + entry.get + }} .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => val (slug : String, features : MapFeatures) = tuple diff --git a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala index 1578258..292de75 100644 --- a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala +++ b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala @@ -15,7 +15,8 @@ class MatchBenchmarkJob(args: Args) extends JobBase(args) { val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(leftArgs) val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(rightArgs) - pipe1.join(pipe2).map { entry => + pipe1.join(pipe2) + .map { entry => val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry new ReduceOutput( slug, diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala index f7eb95d..5d67044 100644 --- a/scalding/src/main/scala/sandcrawler/Scorable.scala +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -13,10 +13,12 @@ case class ReduceFeatures(json : String) case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) abstract class Scorable { - def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = - { - getFeaturesPipe(args) - .filter { entry => Scorable.isValidSlug(entry.slug) } + def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = { + val validFeatures : TypedPipe[MapFeatures] = getFeaturesPipe(args) + .filterNot { entry => entry.isEmpty } + .map { entry => entry.get } + + validFeatures .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => val (slug : String, features : MapFeatures) = tuple @@ -26,16 +28,11 @@ abstract class Scorable { // abstract methods def getSource(args : Args) : Source - def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] } object Scorable { val MaxTitleLength = 1023 - val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable - - def isValidSlug(slug : String) : Boolean = { - slug != NoSlug - } def jsonToMap(json : String) : Option[Map[String, Any]] = { // https://stackoverflow.com/a/32717262/631051 diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala index 241db79..be2b495 100644 --- a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala +++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala @@ -35,20 +35,27 @@ class ScorableFeatures private(title : String, authors : List[Any] = List(), yea JSONObject(toMap).toString } - def toSlug() : String = { + def toSlug() : Option[String] = { if (title == null) { - Scorable.NoSlug + None } else { val unaccented = StringUtilities.removeAccents(title) // Remove punctuation val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "") if (slug.isEmpty - || slug == null - || (ScorableFeatures.SlugBlacklist contains slug) - || (slug.length < ScorableFeatures.MinSlugLength)) Scorable.NoSlug else slug + || slug == null + || (ScorableFeatures.SlugBlacklist contains slug) + || (slug.length < ScorableFeatures.MinSlugLength)) { + None + } else { + Some(slug) + } } } - def toMapFeatures : MapFeatures = - MapFeatures(toSlug, toString) + def toMapFeatures : Option[MapFeatures] = + toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, toString)) + } } |