aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala')
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala2
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala12
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala62
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala37
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala32
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala10
-rw-r--r--scalding/src/main/scala/sandcrawler/ScorableFeatures.scala51
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala13
-rw-r--r--scalding/src/main/scala/sandcrawler/StringUtilities.scala2
9 files changed, 179 insertions, 42 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index ff8201a..5d1eaf5 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -35,7 +35,7 @@ object CrossrefScorable {
new MapFeatures(Scorable.NoSlug, json)
} else {
// bnewbold: not checking that titles(0) is non-null/non-empty; case would be, in JSON, "title": [ null ]
- val sf : ScorableFeatures = new ScorableFeatures(title=titles(0), doi=doi)
+ val sf : ScorableFeatures = ScorableFeatures.create(title=titles(0), doi=doi)
new MapFeatures(sf.toSlug, sf.toString)
}
} else {
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 9a09e05..e510f75 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -24,10 +24,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
getSource(args)
.read
// Can't just "fromBytesWritable" because we have multiple types?
- .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "tei_json", "status_code"))
- .filter { case (_, tei_json, status_code) => tei_json != null && status_code != null }
- .map { case (key, tei_json, status_code) =>
- (Bytes.toString(key.copyBytes()), Bytes.toString(tei_json.copyBytes()), Bytes.toLong(status_code.copyBytes()))
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code"))
+ .filter { case (_, metadata, status_code) => metadata != null && status_code != null }
+ .map { case (key, metadata, status_code) =>
+ (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes()))
}
// TODO: Should I combine next two stages for efficiency?
.collect { case (key, json, StatusOK) => (key, json) }
@@ -37,7 +37,7 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
object GrobidScorable {
def getHBaseSource(table : String, host : String) : HBaseSource = {
- HBaseBuilder.build(table, host, List("grobid0:tei_json", "grobid0:status_code"), SourceMode.SCAN_ALL)
+ HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL)
}
def jsonToMapFeatures(key : String, json : String) : MapFeatures = {
@@ -45,7 +45,7 @@ object GrobidScorable {
case None => MapFeatures(Scorable.NoSlug, json)
case Some(map) => {
if (map contains "title") {
- new ScorableFeatures(Scorable.getString(map, "title"), sha1=key).toMapFeatures
+ ScorableFeatures.create(title=Scorable.getString(map, "title"), sha1=key).toMapFeatures
} else {
MapFeatures(Scorable.NoSlug, json)
}
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
new file mode 100644
index 0000000..468b68e
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
@@ -0,0 +1,62 @@
+
+package sandcrawler
+
+import cascading.flow.FlowDef
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class GrobidScorableDumpJob(args: Args) extends JobBase(args) {
+
+ val grobidHbaseRows = Stat("hbase-rows-scanned", "hbase-grobid-dump")
+ val filteredGrobidRows = Stat("grobid-rows-filtered", "hbase-grobid-dump")
+ val parsedGrobidRows = Stat("grobid-rows-parsed", "hbase-grobid-dump")
+ val validGrobidRows = Stat("grobid-rows-valid-slug", "hbase-grobid-dump")
+
+ val pipe = GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+ .read
+ // Can't just "fromBytesWritable" because we have multiple types?
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code"))
+ .filter { case (_, metadata, status_code) =>
+ grobidHbaseRows.inc
+ metadata != null && status_code != null
+ }
+ .map { case (key, metadata, status_code) =>
+ (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes()))
+ }
+ // TODO: Should I combine next two stages for efficiency?
+ .collect { case (key, json, 200) =>
+ filteredGrobidRows.inc
+ (key, json)
+ }
+ .map { entry : (String, String) =>
+ parsedGrobidRows.inc
+ GrobidScorable.jsonToMapFeatures(entry._1, entry._2)
+ }
+ .filter { entry => Scorable.isValidSlug(entry.slug) }
+ .map { entry =>
+ validGrobidRows.inc
+ entry
+ }
+ // XXX: this groupBy after the map?
+ .groupBy { case MapFeatures(slug, json) => slug }
+ .map { tuple =>
+ val (slug : String, features : MapFeatures) = tuple
+ (slug, ReduceFeatures(features.json))
+ }
+
+ pipe
+ .map { case (slug, features) =>
+ (slug, features.json)
+ }
+ .write(TypedTsv[(String, String)](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala
new file mode 100644
index 0000000..20cc7a1
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala
@@ -0,0 +1,37 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class HBaseColCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val output = args("output")
+
+ HBaseColCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ args("column"))
+ .read
+ .debug
+ .groupAll { _.size('count) }
+ .write(Tsv(output))
+}
+
+object HBaseColCountJob {
+
+ // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181"
+ def getHBaseSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbaseTable,
+ zookeeperHosts,
+ List(col),
+ SourceMode.SCAN_ALL)
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala
new file mode 100644
index 0000000..4d9880f
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala
@@ -0,0 +1,32 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class HBaseStatusCodeCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val source = HBaseCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ "grobid0:status_code")
+
+ val statusPipe : TypedPipe[Long] = source
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)
+ .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) }
+
+ statusPipe.groupBy { identity }
+ .size
+ .debug
+ .write(TypedTsv[(Long,Long)](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index fd0b4e2..f79d672 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -18,15 +18,15 @@ class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConver
val source = HBaseCountJob.getHBaseSource(
args("hbase-table"),
args("zookeeper-hosts"),
- "grobid0:status_code")
+ "grobid0:status")
- val statusPipe : TypedPipe[Long] = source
+ val statusPipe : TypedPipe[String] = source
.read
- .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)
- .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) }
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status)
+ .map { case (key, raw_status) => Bytes.toString(raw_status.copyBytes()) }
statusPipe.groupBy { identity }
.size
.debug
- .write(TypedTsv[(Long,Long)](args("output")))
+ .write(TypedTsv[(String,Long)](args("output")))
}
diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
index 8ed3369..0b9868a 100644
--- a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
+++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
@@ -1,31 +1,35 @@
package sandcrawler
+import java.io.InputStream
+
+import scala.io.Source
import scala.util.parsing.json.JSONObject
+object ScorableFeatures {
+ // TODO: Add exception handling.
+ val fileStream : InputStream = getClass.getResourceAsStream("/slug-blacklist.txt")
+ val SlugBlacklist : Set[String] = Source.fromInputStream(fileStream).getLines.toSet
+ fileStream.close
+
+ // Static factory method
+ def create(title : String, year : Int = 0, doi : String = "", sha1 : String = "") : ScorableFeatures = {
+ new ScorableFeatures(
+ title=if (title == null) "" else title,
+ year=year,
+ doi=if (doi == null) "" else doi,
+ sha1=if (sha1 == null) "" else sha1)
+ }
+}
// Contains features needed to make slug and to score (in combination
-// with a second ScorableFeatures).
-class ScorableFeatures(title : String, year: Int = 0, doi : String = "", sha1: String = "") {
-
- val slugBlacklist = Set( "abbreviations", "abstract", "acknowledgements",
- "article", "authorreply", "authorsreply", "bookreview", "bookreviews",
- "casereport", "commentary", "commentaryon", "commenton", "commentto",
- "contents", "correspondence", "dedication", "editorialadvisoryboard",
- "focus", "hypothesis", "inbrief", "introduction", "introductiontotheissue",
- "lettertotheeditor", "listofabbreviations", "note", "overview", "preface",
- "references", "results", "review", "reviewarticle", "summary", "title",
- "name")
-
- def toMap() : Map[String, Any] = {
- Map("title" -> (if (title == null) "" else title),
- "year" -> year,
- "doi" -> (if (doi == null) "" else doi),
- "sha1" -> (if (sha1 == null) "" else sha1))
- }
+// with a second ScorableFeatures). Create with above static factory method.
+class ScorableFeatures private(title : String, year: Int = 0, doi : String = "", sha1: String = "") {
- override def toString() : String = {
- JSONObject(toMap()).toString
- }
+ def toMap() : Map[String, Any] =
+ Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1)
+
+ override def toString() : String =
+ JSONObject(toMap).toString
def toSlug() : String = {
if (title == null) {
@@ -34,11 +38,10 @@ class ScorableFeatures(title : String, year: Int = 0, doi : String = "", sha1: S
val unaccented = StringUtilities.removeAccents(title)
// Remove punctuation
val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "")
- if (slug.isEmpty || slug == null || (slugBlacklist contains slug)) Scorable.NoSlug else slug
+ if (slug.isEmpty || slug == null || (ScorableFeatures.SlugBlacklist contains slug)) Scorable.NoSlug else slug
}
}
- def toMapFeatures = {
+ def toMapFeatures : MapFeatures =
MapFeatures(toSlug, toString)
- }
}
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 75d45e9..28e9132 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -13,15 +13,18 @@ class ScoreJob(args: Args) extends JobBase(args) {
val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
- pipe1.join(pipe2).map { entry =>
- val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
- new ReduceOutput(
+ pipe1
+ .addTrap(TypedTsv(args("output") + ".trapped"))
+ .join(pipe2)
+ .map { entry =>
+ val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
+ new ReduceOutput(
slug,
Scorable.computeSimilarity(features1, features2),
features1.json,
features2.json)
- }
- //TypedTsv doesn't work over case classes.
+ }
+ //TypedTsv doesn't work over case classes.
.map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
.write(TypedTsv[(String, Int, String, String)](args("output")))
}
diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
index 2745875..e03b60d 100644
--- a/scalding/src/main/scala/sandcrawler/StringUtilities.scala
+++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
@@ -36,7 +36,7 @@ object StringUtilities {
// Source: https://stackoverflow.com/a/30076541/631051
def removePunctuation(s: String) : String = {
- s.replaceAll("""[\p{Punct}]""", "")
+ s.replaceAll("""[\p{Punct}’·“”‘’“”«»「」]""", "")
}
// Adapted from: https://stackoverflow.com/a/16018452/631051