Diffstat (limited to 'scalding/src/main')
-rw-r--r--  scalding/src/main/resources/slug-blacklist.txt                   426
-rw-r--r--  scalding/src/main/scala/sandcrawler/BibjsonScorable.scala         50
-rw-r--r--  scalding/src/main/scala/sandcrawler/CrossrefScorable.scala        98
-rw-r--r--  scalding/src/main/scala/sandcrawler/GrobidScorable.scala          30
-rw-r--r--  scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala    2
-rw-r--r--  scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala       29
-rw-r--r--  scalding/src/main/scala/sandcrawler/Scorable.scala                 1
-rw-r--r--  scalding/src/main/scala/sandcrawler/ScorableFeatures.scala        17
-rw-r--r--  scalding/src/main/scala/sandcrawler/ScoreJob.scala                 92
-rw-r--r--  scalding/src/main/scala/sandcrawler/StringUtilities.scala          2
10 files changed, 680 insertions, 67 deletions
diff --git a/scalding/src/main/resources/slug-blacklist.txt b/scalding/src/main/resources/slug-blacklist.txt
index 7dc701f..ad3dc1d 100644
--- a/scalding/src/main/resources/slug-blacklist.txt
+++ b/scalding/src/main/resources/slug-blacklist.txt
@@ -1,34 +1,458 @@
abbreviations
+abbreviationsandacronyms
+aboutauthors
+abouttheauthor
+abouttheauthors
+aboutthecover
+abouttheeditors
+abreviations
abstract
+abstractnotsubmittedforonlinepublication
+abstracts
+abstractsofaapaposterandpodiumpresentations
+abstractsofcommunications
+abstractsofthesesfromthescandinaviancountries
+abstractwithdrawn
+acknowledgement
acknowledgements
+acknowledgementsvii
+acknowledgementtoreferees
+acknowledgment
+acknowledgmentofreferees
+acknowledgments
+addendum
+additionalresources
+address
+advertisersindex
+affect
+affiliation
+agenda
+agradecimientos
+aimsandscope
+annexa
+annualacknowledgementofmanuscriptreviewers
+appendices
+appendix1
+appendixa
+appendixb
+appointmentsandstaffchanges
+approximation
+apresentacao
article
+articles
+articlesofsignificantinterestselectedfromthisissuebytheeditors
+associationnews
+ataglance
+atribute
+authorguidelines
+authorindex
+authorindexforvolume81
authorreply
+authors
authorsreply
+authorsresponse
+avantpropos
+award
+awardsappointmentsannouncements
+backcover
+background
+bibliografia
+bibliography
+bigdata
+blankpage
+blood
+boardoftrustees
+bookofabstracts
bookreview
bookreviews
+bookreviewsandnotices
+bookreviewssection
+booksreceived
+buchbesprechungen
+bulletin
+calendar
+calendarofevents
+calendarofmeetings
+callforarticles
+callforpapers
casereport
+casereports
+chairmansopeningremarks
+changes
+chaos
+chapter1
+chapter10
+chapter1introduction
+chapter7
+chapteri
+chapterone
+chapteroneintroduction
+chaptertwo
+citation
+classes
+classified
+classifieds
+collaborateurs
+comment
+commentaries
commentary
commentaryon
commenton
+comments
commentto
+committee
+communication
+communications
+communicationstotheeditor
+communiquedepresse
+community
+components
+comptesrendus
+computerscience
+conclusion
+conclusions
+conferencereport
+congratulations
+congresscalendar
+conservation
+content
contents
+context
+continuingeducation
+continuingmedicaleducation
+contributors
+copyright
+copyrightform
+copyrightnotice
+correction
+corrections
correspondence
+corrigenda
+corrigendum
+councilminutes
+cover
+coverimage
+currentresearch
+curriculumvitae
+danksagung
+dearreaders
+decisionmaking
dedication
+dedicatoria
+definition
+description
+discussion
+distribution
+documents
+ear
+editorial
editorialadvisoryboard
+editorialboard
+editorialcomment
+editorialcomments
+editorialconsultants
+editoriale
+editorialeditorial
+editorialinformation
+editorialintroduction
+editorialnote
+editorials
+editorialsoftwaresurveysection
+editorialstaff
+editorialstatement
+editorinchief
+editors
+editorschoice
+editorscomment
+editorscorner
+editorscorrespondence
+editorsintroduction
+editorsnote
+editorspicks
+editorspreface
+education
+einfuhrung
+einleitung
+electrophoresis
+employment
+endnotes
+entrevista
+entscheidungsverzeichnis
+epilogue
+equipment
+errata
+erratum
+essay
+essays
+executivesummary
+exercises
+extendedabstracts
+feature
+features
+fichatecnica
+figure3
+finalexam
+finalreport
focus
+foreward
+foreword
+forthcomingarticles
+forthcomingevents
+fortherecord
+forum
+frequentlyaskedquestions
+fromtheeditor
+fromtheeditors
+fromthepresident
+frontmatter
+furtherreadings
+genealogy
+generaldiscussion
+generalinformation
+generalintroduction
+germany
+glosario
+glossary
+glossaryofterms
+guesteditorial
+guideforauthors
+guidelinesforcontributors
+health
+heartfailure
+highlights
+highlightsfromthisissue
+history
+home
+homework
hypothesis
+iii
+importantnotice
+impressum
inbrief
+index
+indexofauthors
+indexofauthorsandtitles
+indice
+indicegeneral
+informationforauthors
+informationtoauthors
+inhalt
+inhaltsverzeichnis
+inmemoriam
+inreply
+insidethisissue
+institutenews
+instructionsforauthors
+instructionstoauthors
+interview
+inthestudy
+inthisissue
+introduccion
introduction
+introductiongenerale
introductiontotheissue
+introductorycomments
+inventions
+invitedcommentary
+issuesandevents
+jobdescription
+keywords
+languageteaching
+lecture
+letter
+letterfromtheeditor
+letters
+letterstotheeditor
lettertotheeditor
+lettertotheeditors
+linearalgebra
+linearregression
+links
+listedestableaux
listofabbreviations
+listofcontributors
+listoffigures
+listofparticipants
+listofpublications
+listofreferees
+listofreviewers
+listoftables
+literacy
+literature
+literaturecited
+literaturrundschau
+litteraturverzeichniss
+livresrecus
+lucina
+lungcancer
+magazin
+maintenance
+materials
+materialsafetydatasheet
+materialsandmethods
+medicinalchemistry
+meetingabstracts
+meetings
+meetingsandconferences
+meetingsofinterest
+membershipapplication
+memoranda
+memorandum
+messagefromgeneralcochairs
+messagefromthechairs
+messagefromtheprogramchairs
+messagefromtheprogramcochairs
+metaanalysis
+missionstatement
+motivation
+mrsnews
+name
+newbooks
+newlyelectedmembersofthecollege
+newproducts
+news
+newsandnotes
+newsandreviews
+newsandviews
+newsviews
+noii
note
+notes
+notesandcomments
+notesandnews
+notesforcontributors
+notesoncontributors
+notice
+noticeboard
+notitle
+notitleavailable
+obituaries
+obituary
+online
+openaccess
+oralabstracts
+oralpresentations
+organizingcommittee
+originalarticle
+originalarticles
+other
+outline
overview
+panorama
+papers
+paperstoappearinforthcomingissues
+partone
+personalandmiscellaneous
+perspective
+perspectives
+place
+positionsavailable
+poster
+posterpresentations
+postscript
preface
+preliminarymaterial
+presentacio
+presentacion
+presentation
+pressrelease
+print
+printing
+proceedings
+profile
+programcommittee
+projectmanagement
+publication
+publichealth
+publishersnote
+question
+radiology
+readersforum
+recensions
+recentpublications
+redaktorensforord
references
+referenciasbibliograficas
+regression
+rehabilitation
+rejoinder
+remerciements
+reply
+replybyauthors
+researchresearchers
+resenas
+resources
+response
+responsetothelettertotheeditor
results
+resume
+resumen
+resumes
+resumo
review
reviewarticle
+revieweracknowledgement
+revieweracknowledgement2013
+reviewers
+reviewessay
+reviews
+reviewsanddescriptionsoftablesandbooks
+rezension
+safety
+section
+security
+selectedbibliography
+shortcommunication
+shorternotices
+socialengineering
+sommaire
+sommario
+specialsection
+specifications
+subjectindex
+subscriptions
+suggestedreadings
+sumario
+summaries
+summariesofkeyjournalarticles
summary
+summaryofproceedings
+summer
+sun
+supplementarymaterial
+symposium
+symptom
+synthese
+tabledesmatieres
+tableofcontents
+tableofcontentsandprologue
+technicalreport
+theauthors
+thebasics
+theeditorsdesk
+thefirstauthorreplies
+thelancet
+theoreticalbackground
+thetimes
+theworldbank
+theyearinreview
+thismonthin
+timemanagement
+titeleiinhaltsverzeichnis
title
-name
+titlepage
+titlepagei
+tocorrespondents
+totheeditor
+unitedkingdom
+unitednations
+unitedstates
+upcomingevents
+vorwort
+website
+welcome
+whatshappening
+whatsnew
+workscited
+yourquestionsanswered
+zusammenfassung
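
The expanded blacklist above ships as a jar resource and is loaded once per JVM by ScorableFeatures (see that file's diff below). A minimal sketch of the intended effect, not part of the diff, assuming the create/toSlug API shown further down:

// Any title whose normalized slug appears in slug-blacklist.txt is discarded.
ScorableFeatures.create(title = "From the Editors").toSlug   // "fromtheeditors" is blacklisted => Scorable.NoSlug
ScorableFeatures.create(title = "Editorial Board").toSlug    // "editorialboard" is blacklisted => Scorable.NoSlug
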
diff --git a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala
new file mode 100644
index 0000000..cdd598f
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala
@@ -0,0 +1,50 @@
+package sandcrawler
+
+import scala.math
+import scala.util.parsing.json.JSON
+import scala.util.parsing.json.JSONObject
+
+import cascading.flow.FlowDef
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+// XXX: import parallelai.spyglass.hbase.HBasePipeConversions
+
+// XXX: class BibjsonScorable extends Scorable with HBasePipeConversions {
+
+class BibjsonScorable extends Scorable {
+
+ def getSource(args : Args) : Source = {
+ TextLine(args("bibjson-input"))
+ }
+
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
+ getSource(args).read
+ .toTypedPipe[String](new Fields("line"))
+ .map { BibjsonScorable.bibjsonToMapFeatures(_) }
+ }
+}
+
+object BibjsonScorable {
+ def bibjsonToMapFeatures(json : String) : MapFeatures = {
+ Scorable.jsonToMap(json) match {
+ case None => MapFeatures(Scorable.NoSlug, json)
+ case Some(map) => {
+ if (map contains "title") {
+ val title = Scorable.getString(map, "title")
+ val doi = Scorable.getString(map, "doi")
+ val sha1 = Scorable.getString(map, "sha")
+ // TODO: year, authors (if available)
+ if (title == null || title.isEmpty) {
+ new MapFeatures(Scorable.NoSlug, json)
+ } else {
+ val sf : ScorableFeatures = ScorableFeatures.create(title=title, doi=doi, sha1=sha1)
+ new MapFeatures(sf.toSlug, sf.toString)
+ }
+ } else {
+ new MapFeatures(Scorable.NoSlug, json)
+ }
+ }
+ }
+ }
+}
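
For context, a rough sketch (not part of the diff) of what the new mapper produces for a single bibjson line; the sample record is invented and assumes the Scorable/ScorableFeatures helpers shown elsewhere in this change:

// The field names read by bibjsonToMapFeatures are "title", "doi", and "sha".
val sample = """{"title": "Dark Matter Halos in Dwarf Galaxies", "doi": "10.1234/example", "sha": "deadbeef"}"""
val mf = BibjsonScorable.bibjsonToMapFeatures(sample)
// mf carries the normalized slug ("darkmatterhalosindwarfgalaxies") plus the
// ScorableFeatures JSON used on the reduce side; records without a usable
// title keep Scorable.NoSlug and never join.
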
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index 5d1eaf5..039fa85 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -2,6 +2,7 @@ package sandcrawler
import scala.math
import scala.util.parsing.json.JSON
+import scala.util.parsing.json.JSONArray
import scala.util.parsing.json.JSONObject
import cascading.flow.FlowDef
@@ -19,29 +20,100 @@ class CrossrefScorable extends Scorable with HBasePipeConversions {
def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
getSource(args).read
.toTypedPipe[String](new Fields("line"))
+ .filter { CrossrefScorable.keepRecord(_) }
.map { CrossrefScorable.jsonToMapFeatures(_) }
}
}
object CrossrefScorable {
+
+ val ContentTypeWhitelist: Set[String] = Set(
+ "book",
+ "book-chapter",
+ "dataset",
+ "dissertation",
+ "journal-article",
+ "letter",
+ "monograph",
+ "posted-content",
+ "pre-print",
+ "proceedings-article",
+ "report",
+ "working-paper")
+
+ def keepRecord(json : String) : Boolean = {
+ Scorable.jsonToMap(json) match {
+ case None => false
+ case Some(map) => {
+ mapToTitle(map) match {
+ case None => false
+ case Some(title) => title.length <= Scorable.MaxTitleLength
+ }
+ }
+ }
+ }
+
+ // Returns None if title is null, empty, or too long.
+ def mapToTitle(map : Map[String, Any]) : Option[String] = {
+ if (map contains "title") {
+ val titles = map("title").asInstanceOf[List[String]]
+      if (titles == null || titles.isEmpty) {
+ None
+ } else {
+ val title = titles(0)
+ if (title == null || title.isEmpty || title.length > Scorable.MaxTitleLength) None else Some(title)
+ }
+ } else {
+ None
+ }
+ }
+
+ def mapToAuthorList(map : Map[String, Any]) : List[String] = {
+ if (map contains "author") {
+ val objArray = map("author").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]])
+ // TODO(bnewbold): combine given and family names?
+ objArray
+ .filter(e => e contains "family")
+ .map(e => e.get("family").get.asInstanceOf[String])
+ } else {
+ List()
+ }
+ }
+
+ def mapToYear(map : Map[String, Any]) : Option[Int] = {
+ map.get("created") match {
+ case None => None
+ case Some(created) => {
+ Some(created.asInstanceOf[Map[String,Any]]
+ .get("date-parts")
+ .get
+ .asInstanceOf[List[Any]](0)
+ .asInstanceOf[List[Any]](0)
+ .asInstanceOf[Double]
+ .toInt)
+ }
+ }
+ }
+
def jsonToMapFeatures(json : String) : MapFeatures = {
Scorable.jsonToMap(json) match {
case None => MapFeatures(Scorable.NoSlug, json)
- case Some(map) => {
- if ((map contains "title") && (map contains "DOI")) {
- val titles = map("title").asInstanceOf[List[String]]
- val doi = Scorable.getString(map, "DOI")
- if (titles.isEmpty || titles == null || doi.isEmpty || doi == null) {
- new MapFeatures(Scorable.NoSlug, json)
- } else {
- // bnewbold: not checking that titles(0) is non-null/non-empty; case would be, in JSON, "title": [ null ]
- val sf : ScorableFeatures = ScorableFeatures.create(title=titles(0), doi=doi)
- new MapFeatures(sf.toSlug, sf.toString)
+ case Some(map) =>
+ mapToTitle(map) match {
+ case None => MapFeatures(Scorable.NoSlug, json)
+ case Some(title) => {
+ val doi = Scorable.getString(map, "DOI")
+ val authors: List[String] = mapToAuthorList(map)
+ val year: Int = mapToYear(map).getOrElse(0)
+ val contentType: String = map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE")
+          if (doi == null || doi.isEmpty || authors.isEmpty || !(ContentTypeWhitelist contains contentType)) {
+ MapFeatures(Scorable.NoSlug, json)
+ } else {
+ val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi.toLowerCase(), year=year)
+ MapFeatures(sf.toSlug, sf.toString)
+ }
}
- } else {
- new MapFeatures(Scorable.NoSlug, json)
}
- }
}
}
}
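
A sketch (not part of the diff) of the records the new Crossref filtering keeps: a parseable title of sane length, at least one author with a "family" name, a DOI, and a whitelisted "type". Everything else maps to Scorable.NoSlug and drops out of the join. The sample record below is invented:

val kept = """{"title": ["Deep Learning for Metadata Matching"], "DOI": "10.1234/ABC",
  "type": "journal-article", "author": [{"family": "Ng"}],
  "created": {"date-parts": [[2018, 7, 30]]}}"""
assert(CrossrefScorable.keepRecord(kept))
CrossrefScorable.jsonToMapFeatures(kept)   // slug from the title; the DOI is lower-cased in the features
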
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index e510f75..c55cb40 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -31,11 +31,37 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
}
// TODO: Should I combine next two stages for efficiency?
.collect { case (key, json, StatusOK) => (key, json) }
+ .filter { case (key, json) => GrobidScorable.keepRecord(json) }
.map { entry : (String, String) => GrobidScorable.jsonToMapFeatures(entry._1, entry._2) }
}
}
object GrobidScorable {
+ def keepRecord(json : String) : Boolean = {
+ Scorable.jsonToMap(json) match {
+ case None => false
+ case Some(map) => {
+ if (map contains "title") {
+ val title = Scorable.getString(map, "title")
+ title != null && title.length <= Scorable.MaxTitleLength
+ } else {
+ false
+ }
+ }
+ }
+ }
+
+ def mapToAuthorList(map : Map[String, Any]) : List[String] = {
+ if (map contains "authors") {
+ val objArray = map("authors").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]])
+ objArray
+ .filter(e => e contains "name")
+ .map(e => e.get("name").get.asInstanceOf[String])
+ } else {
+ List()
+ }
+ }
+
def getHBaseSource(table : String, host : String) : HBaseSource = {
HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL)
}
@@ -45,7 +71,9 @@ object GrobidScorable {
case None => MapFeatures(Scorable.NoSlug, json)
case Some(map) => {
if (map contains "title") {
- ScorableFeatures.create(title=Scorable.getString(map, "title"), sha1=key).toMapFeatures
+ val authors: List[String] = mapToAuthorList(map)
+ val title = Scorable.getString(map, "title")
+ ScorableFeatures.create(title=title, authors=authors, sha1=key).toMapFeatures
} else {
MapFeatures(Scorable.NoSlug, json)
}
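
Correspondingly, a sketch (not part of the diff) of the grobid0:metadata shape this code expects; keepRecord drops rows with a missing or over-long title before author extraction. The sample record is invented:

val grobidJson = """{"title": "Linking Crawled PDFs to Crossref Works",
  "authors": [{"name": "J. Smith"}, {"name": "L. Garcia"}]}"""
assert(GrobidScorable.keepRecord(grobidJson))
GrobidScorable.jsonToMapFeatures("sha1:ABCDEF", grobidJson)   // the HBase row key becomes the sha1 feature
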
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
index 468b68e..f4e84fe 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
@@ -5,8 +5,6 @@ import cascading.flow.FlowDef
import cascading.pipe.Pipe
import cascading.tuple.Fields
import com.twitter.scalding._
-import com.twitter.scalding._
-import com.twitter.scalding.typed.TDsl._
import com.twitter.scalding.typed.TDsl._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
diff --git a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala
new file mode 100644
index 0000000..1578258
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala
@@ -0,0 +1,29 @@
+package sandcrawler
+
+import cascading.pipe.Pipe
+import com.twitter.scalding.Args
+import com.twitter.scalding.TypedPipe
+import com.twitter.scalding.TypedTsv
+import parallelai.spyglass.base.JobBase
+
+class MatchBenchmarkJob(args: Args) extends JobBase(args) {
+ // TODO: Instantiate any subclass of Scorable specified in args.
+ val sc1 : Scorable = new BibjsonScorable()
+ val sc2 : Scorable = new BibjsonScorable()
+ val leftArgs = args + ("bibjson-input" -> List(args("left-bibjson")))
+ val rightArgs = args + ("bibjson-input" -> List(args("right-bibjson")))
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(leftArgs)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(rightArgs)
+
+ pipe1.join(pipe2).map { entry =>
+ val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
+ new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(features1, features2),
+ features1.json,
+ features2.json)
+ }
+ //TypedTsv doesn't work over case classes.
+ .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
+ .write(TypedTsv[(String, Int, String, String)](args("output")))
+}
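
One way to smoke-test the new benchmark job is the JobTest harness the rest of the repo's tests use; this is only a sketch, with invented file names and records, and the exact test wiring may differ:

import com.twitter.scalding.{JobTest, TextLine, TypedTsv}

JobTest(new MatchBenchmarkJob(_))
  .arg("left-bibjson", "left.json")
  .arg("right-bibjson", "right.json")
  .arg("output", "out")
  .source(TextLine("left.json"), List((0, """{"title": "Slug Matching at Scale", "doi": "10.1/a", "sha": "aaa"}""")))
  .source(TextLine("right.json"), List((0, """{"title": "Slug Matching at Scale", "doi": "10.1/b", "sha": "bbb"}""")))
  .sink[(String, Int, String, String)](TypedTsv[(String, Int, String, String)]("out")) { out =>
    assert(out.size == 1)   // both sides share the slug "slugmatchingatscale", so they join once
  }
  .run
  .finish
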
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 9b9c633..5aac032 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -30,6 +30,7 @@ abstract class Scorable {
}
object Scorable {
+ val MaxTitleLength = 1023
val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable
def isValidSlug(slug : String) : Boolean = {
diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
index 0b9868a..241db79 100644
--- a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
+++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
@@ -3,6 +3,7 @@ package sandcrawler
import java.io.InputStream
import scala.io.Source
+import scala.util.parsing.json.JSONArray
import scala.util.parsing.json.JSONObject
object ScorableFeatures {
@@ -10,11 +11,13 @@ object ScorableFeatures {
val fileStream : InputStream = getClass.getResourceAsStream("/slug-blacklist.txt")
val SlugBlacklist : Set[String] = Source.fromInputStream(fileStream).getLines.toSet
fileStream.close
+ val MinSlugLength = 8
// Static factory method
- def create(title : String, year : Int = 0, doi : String = "", sha1 : String = "") : ScorableFeatures = {
+ def create(title : String, authors : List[Any] = List(), year : Int = 0, doi : String = "", sha1 : String = "") : ScorableFeatures = {
new ScorableFeatures(
title=if (title == null) "" else title,
+ authors=if (authors == null) List() else authors.map(a => if (a == null) "" else a),
year=year,
doi=if (doi == null) "" else doi,
sha1=if (sha1 == null) "" else sha1)
@@ -23,13 +26,14 @@ object ScorableFeatures {
// Contains features needed to make slug and to score (in combination
// with a second ScorableFeatures). Create with above static factory method.
-class ScorableFeatures private(title : String, year: Int = 0, doi : String = "", sha1: String = "") {
+class ScorableFeatures private(title : String, authors : List[Any] = List(), year: Int = 0, doi : String = "", sha1: String = "") {
def toMap() : Map[String, Any] =
- Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1)
+ Map("title" -> title, "authors" -> JSONArray(authors), "year" -> year, "doi" -> doi, "sha1" -> sha1)
- override def toString() : String =
+ override def toString() : String = {
JSONObject(toMap).toString
+ }
def toSlug() : String = {
if (title == null) {
@@ -38,7 +42,10 @@ class ScorableFeatures private(title : String, year: Int = 0, doi : String = "",
val unaccented = StringUtilities.removeAccents(title)
// Remove punctuation
val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "")
- if (slug.isEmpty || slug == null || (ScorableFeatures.SlugBlacklist contains slug)) Scorable.NoSlug else slug
+      if (slug == null
+          || slug.isEmpty
+          || (ScorableFeatures.SlugBlacklist contains slug)
+          || (slug.length < ScorableFeatures.MinSlugLength)) Scorable.NoSlug else slug
}
}
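
A sketch (not part of the diff) of how the tightened slug rules behave; titles that are blacklisted, or shorter than MinSlugLength once normalized, no longer produce joinable slugs:

ScorableFeatures.create(title = "Measuring Scholarly Impact").toSlug   // => "measuringscholarlyimpact"
ScorableFeatures.create(title = "Introduction").toSlug                 // blacklisted              => Scorable.NoSlug
ScorableFeatures.create(title = "Big Cats").toSlug                     // only 7 chars normalized  => Scorable.NoSlug
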
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 28e9132..107f504 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -2,59 +2,63 @@ package sandcrawler
import cascading.pipe.Pipe
import com.twitter.scalding.Args
+import com.twitter.scalding.Stat
import com.twitter.scalding.TypedPipe
import com.twitter.scalding.TypedTsv
import parallelai.spyglass.base.JobBase
class ScoreJob(args: Args) extends JobBase(args) {
- // TODO: Instantiate any subclass of Scorable specified in args.
- val sc1 : Scorable = new GrobidScorable()
- val sc2 : Scorable = new CrossrefScorable()
- val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
- val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
- pipe1
- .addTrap(TypedTsv(args("output") + ".trapped"))
- .join(pipe2)
- .map { entry =>
- val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
- new ReduceOutput(
- slug,
- Scorable.computeSimilarity(features1, features2),
- features1.json,
- features2.json)
- }
- //TypedTsv doesn't work over case classes.
- .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
- .write(TypedTsv[(String, Int, String, String)](args("output")))
-}
-
-/*
-// Ugly hack to get non-String information into ScoreJob above.
-object ScoreJob {
- var scorable1 : Option[Scorable] = None
- var scorable2 : Option[Scorable] = None
+ val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler")
+ val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler")
+ val joinedRowCount = Stat("joined-rows", "sandcrawler")
+ /* TODO:
+ val uniqueDoiCount = Stat("unique-doi-count", "sandcrawler")
+ val uniqueSha1Count = Stat("unique-sha1-count", "sandcrawler")
+ */
- def setScorable1(s : Scorable) {
- scorable1 = Some(s)
- }
-
- def getScorable1() : Scorable = {
- scorable1 match {
- case Some(s) => s
- case None => null
+ val grobidScorable : Scorable = new GrobidScorable()
+ val crossrefScorable : Scorable = new CrossrefScorable()
+ val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable
+ .getInputPipe(args)
+ .map { r =>
+ grobidRowCount.inc
+ r
+ }
+ val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable
+ .getInputPipe(args)
+ .map { r =>
+ crossrefRowCount.inc
+ r
}
- }
- def setScorable2(s: Scorable) {
- scorable2 = Some(s)
- }
+ val joinedPipe = grobidPipe
+ .addTrap(TypedTsv(args("output") + ".trapped"))
+ .join(crossrefPipe)
+
+ /* TODO:
+ // Reduces to count unique SHA1 and DOI
+ joinedPipe
+ .map { case (_, (grobidFeatures, _)) => grobidFeatures.sha }
+ .distinct
+ .map { _ => uniqueSha1Count.inc }
+ joinedPipe
+ .map { case (_, (_, crossrefFeatures)) => crossrefFeatures.doi }
+ .distinct
+ .map { _ => uniqueDoiCount.inc }
+ */
- def getScorable2() : Scorable = {
- scorable2 match {
- case Some(s) => s
- case None => null
+ // TypedTsv doesn't work over case classes.
+ joinedPipe
+ .map { case (slug, (grobidFeatures, crossrefFeatures)) =>
+ joinedRowCount.inc
+ //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry
+ new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(grobidFeatures, crossrefFeatures),
+ grobidFeatures.json,
+ crossrefFeatures.json)
}
- }
+ .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
+ .write(TypedTsv[(String, Int, String, String)](args("output")))
}
- */
diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
index 2745875..e03b60d 100644
--- a/scalding/src/main/scala/sandcrawler/StringUtilities.scala
+++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
@@ -36,7 +36,7 @@ object StringUtilities {
// Source: https://stackoverflow.com/a/30076541/631051
def removePunctuation(s: String) : String = {
- s.replaceAll("""[\p{Punct}]""", "")
+ s.replaceAll("""[\p{Punct}’·“”‘’“”«»「」]""", "")
}
// Adapted from: https://stackoverflow.com/a/16018452/631051
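
The widened character class now strips typographic quotes, the middle dot, guillemets, and CJK corner brackets in addition to ASCII punctuation, so titles that differ only in quoting style normalize to the same slug. A sketch, not part of the diff:

StringUtilities.removePunctuation("“Smart quotes” and «guillemets»")   // => "Smart quotes and guillemets"
StringUtilities.removePunctuation("「データ」·test")                    // => "データtest"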