aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src')
-rw-r--r--scalding/src/main/scala/sandcrawler/BibjsonScorable.scala15
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala15
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala8
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala7
-rw-r--r--scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala3
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala17
-rw-r--r--scalding/src/main/scala/sandcrawler/ScorableFeatures.scala13
7 files changed, 41 insertions, 37 deletions
diff --git a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala
index 0d26d75..abf9220 100644
--- a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala
@@ -15,7 +15,7 @@ class BibjsonScorable extends Scorable {
TextLine(args("bibjson-input"))
}
- def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
getSource(args).read
.toTypedPipe[String](new Fields("line"))
.map { BibjsonScorable.bibjsonToMapFeatures(_) }
@@ -23,9 +23,9 @@ class BibjsonScorable extends Scorable {
}
object BibjsonScorable {
- def bibjsonToMapFeatures(json : String) : MapFeatures = {
+ def bibjsonToMapFeatures(json : String) : Option[MapFeatures] = {
Scorable.jsonToMap(json) match {
- case None => MapFeatures(Scorable.NoSlug, json)
+ case None => None
case Some(map) => {
if (map contains "title") {
val title = Scorable.getString(map, "title")
@@ -33,13 +33,16 @@ object BibjsonScorable {
val sha1 = Scorable.getString(map, "sha")
// TODO: year, authors (if available)
if (title == null || title.isEmpty) {
- new MapFeatures(Scorable.NoSlug, json)
+ None
} else {
val sf : ScorableFeatures = ScorableFeatures.create(title=title, doi=doi, sha1=sha1)
- new MapFeatures(sf.toSlug, sf.toString)
+ sf.toSlug match {
+ case None => None
+ case Some(slug) => Some(MapFeatures(slug, sf.toString))
+ }
}
} else {
- new MapFeatures(Scorable.NoSlug, json)
+ None
}
}
}
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index f51c210..c13945f 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -17,7 +17,7 @@ class CrossrefScorable extends Scorable with HBasePipeConversions {
TextLine(args("crossref-input"))
}
- def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
getSource(args).read
.toTypedPipe[String](new Fields("line"))
.filter { CrossrefScorable.keepRecord(_) }
@@ -116,22 +116,25 @@ object CrossrefScorable {
}
}
- def jsonToMapFeatures(json : String) : MapFeatures = {
+ def jsonToMapFeatures(json : String) : Option[MapFeatures] = {
Scorable.jsonToMap(json) match {
- case None => MapFeatures(Scorable.NoSlug, json)
+ case None => None
case Some(map) =>
mapToTitle(map) match {
- case None => MapFeatures(Scorable.NoSlug, json)
+ case None => None
case Some(title) => {
val doi = Scorable.getString(map, "DOI")
val authors: List[String] = mapToAuthorList(map)
val year: Int = mapToYear(map).getOrElse(0)
val contentType: String = map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE")
if (doi.isEmpty || doi == null || authors.length == 0 || !(ContentTypeWhitelist contains contentType)) {
- MapFeatures(Scorable.NoSlug, json)
+ None
} else {
val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi.toLowerCase(), year=year)
- MapFeatures(sf.toSlug, sf.toString)
+ sf.toSlug match {
+ case None => None
+ case Some(slug) => Some(MapFeatures(slug, sf.toString))
+ }
}
}
}
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 899ce66..f4ed129 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -20,7 +20,7 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
}
- def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
getSource(args)
.read
// Can't just "fromBytesWritable" because we have multiple types
@@ -65,16 +65,16 @@ object GrobidScorable {
HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL)
}
- def jsonToMapFeatures(key : String, json : String) : MapFeatures = {
+ def jsonToMapFeatures(key : String, json : String) : Option[MapFeatures] = {
Scorable.jsonToMap(json) match {
- case None => MapFeatures(Scorable.NoSlug, json)
+ case None => None
case Some(map) => {
if (map contains "title") {
val authors: List[String] = mapToAuthorList(map)
val title = Scorable.getString(map, "title")
ScorableFeatures.create(title=title, authors=authors, sha1=key).toMapFeatures
} else {
- MapFeatures(Scorable.NoSlug, json)
+ None
}
}
}
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
index 19b257f..d40410b 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
@@ -40,11 +40,8 @@ class GrobidScorableDumpJob(args: Args) extends JobBase(args) {
parsedGrobidRows.inc
GrobidScorable.jsonToMapFeatures(entry._1, entry._2)
}
- .filter { entry => Scorable.isValidSlug(entry.slug) }
- .map { entry =>
- validGrobidRows.inc
- entry
- }
+ .filterNot { entry => entry.isEmpty }
+ .map { entry => entry.get }
.groupBy { case MapFeatures(slug, json) => slug }
.map { tuple =>
val (slug : String, features : MapFeatures) = tuple
diff --git a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala
index 1578258..292de75 100644
--- a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala
+++ b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala
@@ -15,7 +15,8 @@ class MatchBenchmarkJob(args: Args) extends JobBase(args) {
val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(leftArgs)
val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(rightArgs)
- pipe1.join(pipe2).map { entry =>
+ pipe1.join(pipe2)
+ .map { entry =>
val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
new ReduceOutput(
slug,
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index f7eb95d..5d67044 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -13,10 +13,12 @@ case class ReduceFeatures(json : String)
case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
abstract class Scorable {
- def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] =
- {
- getFeaturesPipe(args)
- .filter { entry => Scorable.isValidSlug(entry.slug) }
+ def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = {
+ val validFeatures : TypedPipe[MapFeatures] = getFeaturesPipe(args)
+ .filterNot { entry => entry.isEmpty }
+ .map { entry => entry.get }
+
+ validFeatures
.groupBy { case MapFeatures(slug, json) => slug }
.map { tuple =>
val (slug : String, features : MapFeatures) = tuple
@@ -26,16 +28,11 @@ abstract class Scorable {
// abstract methods
def getSource(args : Args) : Source
- def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures]
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]]
}
object Scorable {
val MaxTitleLength = 1023
- val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable
-
- def isValidSlug(slug : String) : Boolean = {
- slug != NoSlug
- }
def jsonToMap(json : String) : Option[Map[String, Any]] = {
// https://stackoverflow.com/a/32717262/631051
diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
index 241db79..b56f102 100644
--- a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
+++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
@@ -35,9 +35,9 @@ class ScorableFeatures private(title : String, authors : List[Any] = List(), yea
JSONObject(toMap).toString
}
- def toSlug() : String = {
+ def toSlug() : Option[String] = {
if (title == null) {
- Scorable.NoSlug
+ None
} else {
val unaccented = StringUtilities.removeAccents(title)
// Remove punctuation
@@ -45,10 +45,13 @@ class ScorableFeatures private(title : String, authors : List[Any] = List(), yea
if (slug.isEmpty
|| slug == null
|| (ScorableFeatures.SlugBlacklist contains slug)
- || (slug.length < ScorableFeatures.MinSlugLength)) Scorable.NoSlug else slug
+ || (slug.length < ScorableFeatures.MinSlugLength)) None else Some(slug)
}
}
- def toMapFeatures : MapFeatures =
- MapFeatures(toSlug, toString)
+ def toMapFeatures : Option[MapFeatures] =
+ toSlug match {
+ case None => None
+ case Some(slug) => Some(MapFeatures(slug, toString))
+ }
}