aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main')
-rw-r--r--scalding/src/main/scala/sandcrawler/BibjsonScorable.scala15
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala105
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala8
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala8
-rw-r--r--scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala3
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala17
-rw-r--r--scalding/src/main/scala/sandcrawler/ScorableFeatures.scala21
7 files changed, 99 insertions, 78 deletions
diff --git a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala
index 0d26d75..abf9220 100644
--- a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala
@@ -15,7 +15,7 @@ class BibjsonScorable extends Scorable {
TextLine(args("bibjson-input"))
}
- def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
getSource(args).read
.toTypedPipe[String](new Fields("line"))
.map { BibjsonScorable.bibjsonToMapFeatures(_) }
@@ -23,9 +23,9 @@ class BibjsonScorable extends Scorable {
}
object BibjsonScorable {
- def bibjsonToMapFeatures(json : String) : MapFeatures = {
+ def bibjsonToMapFeatures(json : String) : Option[MapFeatures] = {
Scorable.jsonToMap(json) match {
- case None => MapFeatures(Scorable.NoSlug, json)
+ case None => None
case Some(map) => {
if (map contains "title") {
val title = Scorable.getString(map, "title")
@@ -33,13 +33,16 @@ object BibjsonScorable {
val sha1 = Scorable.getString(map, "sha")
// TODO: year, authors (if available)
if (title == null || title.isEmpty) {
- new MapFeatures(Scorable.NoSlug, json)
+ None
} else {
val sf : ScorableFeatures = ScorableFeatures.create(title=title, doi=doi, sha1=sha1)
- new MapFeatures(sf.toSlug, sf.toString)
+ sf.toSlug match {
+ case None => None
+ case Some(slug) => Some(MapFeatures(slug, sf.toString))
+ }
}
} else {
- new MapFeatures(Scorable.NoSlug, json)
+ None
}
}
}
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index f51c210..bb6413f 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -17,7 +17,7 @@ class CrossrefScorable extends Scorable with HBasePipeConversions {
TextLine(args("crossref-input"))
}
- def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
getSource(args).read
.toTypedPipe[String](new Fields("line"))
.filter { CrossrefScorable.keepRecord(_) }
@@ -55,37 +55,45 @@ object CrossrefScorable {
// Returns None if title is null, empty, or too long.
def mapToTitle(map : Map[String, Any]) : Option[String] = {
- if (map contains "title") {
- val titles = map("title").asInstanceOf[List[String]]
- if (titles.isEmpty || titles == null) {
- None
+ def getTitle : Option[String] = {
+ if (map contains "title") {
+ val titles = map("title").asInstanceOf[List[String]]
+ if (titles.isEmpty || titles == null) None else Some(titles(0))
} else {
- val baseTitle: String = titles(0)
- // TODO(bnewbold): this code block is horrible
- val baseSubtitle: String = if (map contains "subtitle") {
- val subtitles = map("subtitle").asInstanceOf[List[String]]
- if (!subtitles.isEmpty && subtitles != null) {
- val sub = subtitles(0)
- if (sub != null && !sub.isEmpty && baseTitle != null) {
- sub
- } else {
- ""
- }
+ None
+ }
+ }
+
+ def getSubtitle : Option[String] = {
+ if (map contains "subtitle") {
+ val subtitles = map("subtitle").asInstanceOf[List[String]]
+ if (subtitles.isEmpty || subtitles == null) {
+ None
+ } else {
+ val sub = subtitles(0)
+ if (sub == null || sub.isEmpty) {
+ None
} else {
- ""
+ Some(sub)
}
- } else {
- ""
}
- val title = if (baseSubtitle.isEmpty) {
- baseTitle
+ } else {
+ None
+ }
+ }
+
+ getTitle match {
+ case None => None
+ case Some(baseTitle) => {
+ if (baseTitle == null) {
+ None
} else {
- baseTitle.concat(": ".concat(baseSubtitle))
+ getSubtitle match {
+ case None => Some(baseTitle)
+ case Some(baseSubtitle) => Some(baseTitle.concat(":".concat(baseSubtitle)))
+ }
}
- if (title == null || title.isEmpty || title.length > Scorable.MaxTitleLength) None else Some(title)
}
- } else {
- None
}
}
@@ -106,34 +114,39 @@ object CrossrefScorable {
case None => None
case Some(created) => {
Some(created.asInstanceOf[Map[String,Any]]
- .get("date-parts")
- .get
- .asInstanceOf[List[Any]](0)
- .asInstanceOf[List[Any]](0)
- .asInstanceOf[Double]
- .toInt)
+ .get("date-parts")
+ .get
+ .asInstanceOf[List[Any]](0)
+ .asInstanceOf[List[Any]](0)
+ .asInstanceOf[Double]
+ .toInt)
}
}
}
- def jsonToMapFeatures(json : String) : MapFeatures = {
+ def jsonToMapFeatures(json : String) : Option[MapFeatures] = {
+ def makeMapFeatures(title : String, doi : String, authors : List[String], year : Int, contentType : String) : Option[MapFeatures] = {
+ if (doi.isEmpty || doi == null || authors.length == 0 || !(ContentTypeWhitelist contains contentType)) {
+ None
+ } else {
+ val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi.toLowerCase(), year=year)
+ sf.toSlug match {
+ case None => None
+ case Some(slug) => Some(MapFeatures(slug, sf.toString))
+ }
+ }
+ }
Scorable.jsonToMap(json) match {
- case None => MapFeatures(Scorable.NoSlug, json)
+ case None => None
case Some(map) =>
mapToTitle(map) match {
- case None => MapFeatures(Scorable.NoSlug, json)
- case Some(title) => {
- val doi = Scorable.getString(map, "DOI")
- val authors: List[String] = mapToAuthorList(map)
- val year: Int = mapToYear(map).getOrElse(0)
- val contentType: String = map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE")
- if (doi.isEmpty || doi == null || authors.length == 0 || !(ContentTypeWhitelist contains contentType)) {
- MapFeatures(Scorable.NoSlug, json)
- } else {
- val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi.toLowerCase(), year=year)
- MapFeatures(sf.toSlug, sf.toString)
- }
- }
+ case None => None
+ case Some(title) => makeMapFeatures(
+ title=title,
+ doi=Scorable.getString(map, "DOI"),
+ authors=mapToAuthorList(map),
+ year=mapToYear(map).getOrElse(0),
+ contentType=map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE"))
}
}
}
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 899ce66..f4ed129 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -20,7 +20,7 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
}
- def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
getSource(args)
.read
// Can't just "fromBytesWritable" because we have multiple types
@@ -65,16 +65,16 @@ object GrobidScorable {
HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL)
}
- def jsonToMapFeatures(key : String, json : String) : MapFeatures = {
+ def jsonToMapFeatures(key : String, json : String) : Option[MapFeatures] = {
Scorable.jsonToMap(json) match {
- case None => MapFeatures(Scorable.NoSlug, json)
+ case None => None
case Some(map) => {
if (map contains "title") {
val authors: List[String] = mapToAuthorList(map)
val title = Scorable.getString(map, "title")
ScorableFeatures.create(title=title, authors=authors, sha1=key).toMapFeatures
} else {
- MapFeatures(Scorable.NoSlug, json)
+ None
}
}
}
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
index 19b257f..3146a6c 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
@@ -40,11 +40,11 @@ class GrobidScorableDumpJob(args: Args) extends JobBase(args) {
parsedGrobidRows.inc
GrobidScorable.jsonToMapFeatures(entry._1, entry._2)
}
- .filter { entry => Scorable.isValidSlug(entry.slug) }
- .map { entry =>
+ .filterNot { entry => entry.isEmpty }
+ .map { entry => {
validGrobidRows.inc
- entry
- }
+ entry.get
+ }}
.groupBy { case MapFeatures(slug, json) => slug }
.map { tuple =>
val (slug : String, features : MapFeatures) = tuple
diff --git a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala
index 1578258..292de75 100644
--- a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala
+++ b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala
@@ -15,7 +15,8 @@ class MatchBenchmarkJob(args: Args) extends JobBase(args) {
val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(leftArgs)
val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(rightArgs)
- pipe1.join(pipe2).map { entry =>
+ pipe1.join(pipe2)
+ .map { entry =>
val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
new ReduceOutput(
slug,
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index f7eb95d..5d67044 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -13,10 +13,12 @@ case class ReduceFeatures(json : String)
case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
abstract class Scorable {
- def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] =
- {
- getFeaturesPipe(args)
- .filter { entry => Scorable.isValidSlug(entry.slug) }
+ def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = {
+ val validFeatures : TypedPipe[MapFeatures] = getFeaturesPipe(args)
+ .filterNot { entry => entry.isEmpty }
+ .map { entry => entry.get }
+
+ validFeatures
.groupBy { case MapFeatures(slug, json) => slug }
.map { tuple =>
val (slug : String, features : MapFeatures) = tuple
@@ -26,16 +28,11 @@ abstract class Scorable {
// abstract methods
def getSource(args : Args) : Source
- def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures]
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]]
}
object Scorable {
val MaxTitleLength = 1023
- val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable
-
- def isValidSlug(slug : String) : Boolean = {
- slug != NoSlug
- }
def jsonToMap(json : String) : Option[Map[String, Any]] = {
// https://stackoverflow.com/a/32717262/631051
diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
index 241db79..be2b495 100644
--- a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
+++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
@@ -35,20 +35,27 @@ class ScorableFeatures private(title : String, authors : List[Any] = List(), yea
JSONObject(toMap).toString
}
- def toSlug() : String = {
+ def toSlug() : Option[String] = {
if (title == null) {
- Scorable.NoSlug
+ None
} else {
val unaccented = StringUtilities.removeAccents(title)
// Remove punctuation
val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "")
if (slug.isEmpty
- || slug == null
- || (ScorableFeatures.SlugBlacklist contains slug)
- || (slug.length < ScorableFeatures.MinSlugLength)) Scorable.NoSlug else slug
+ || slug == null
+ || (ScorableFeatures.SlugBlacklist contains slug)
+ || (slug.length < ScorableFeatures.MinSlugLength)) {
+ None
+ } else {
+ Some(slug)
+ }
}
}
- def toMapFeatures : MapFeatures =
- MapFeatures(toSlug, toString)
+ def toMapFeatures : Option[MapFeatures] =
+ toSlug match {
+ case None => None
+ case Some(slug) => Some(MapFeatures(slug, toString))
+ }
}