aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
Diffstat (limited to 'scalding')
-rw-r--r--scalding/README.md36
-rw-r--r--scalding/build.sbt10
-rw-r--r--scalding/scalastyle-config.xml2
-rw-r--r--scalding/scalding-debugging.md10
-rw-r--r--scalding/src/main/resources/slug-denylist.txt554
-rw-r--r--scalding/src/main/scala/sandcrawler/BibjsonScorable.scala50
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala153
-rw-r--r--scalding/src/main/scala/sandcrawler/DumpFileMetaJob.scala36
-rw-r--r--scalding/src/main/scala/sandcrawler/DumpGrobidMetaInsertableJob.scala38
-rw-r--r--scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala34
-rw-r--r--scalding/src/main/scala/sandcrawler/DumpGrobidXmlJob.scala41
-rw-r--r--scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala67
-rw-r--r--scalding/src/main/scala/sandcrawler/FatcatScorable.scala146
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala83
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala59
-rw-r--r--scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala43
-rw-r--r--scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala52
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala37
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala2
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala32
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala10
-rw-r--r--scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala30
-rw-r--r--scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala67
-rw-r--r--scalding/src/main/scala/sandcrawler/Scorable.scala96
-rw-r--r--scalding/src/main/scala/sandcrawler/ScorableFeatures.scala63
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreInsertable.scala86
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala48
-rw-r--r--scalding/src/main/scala/sandcrawler/StringUtilities.scala76
-rw-r--r--scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala172
-rw-r--r--scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala72
-rw-r--r--scalding/src/test/scala/sandcrawler/FatcatScorableTest.scala160
-rw-r--r--scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala124
-rw-r--r--scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala122
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala1
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala9
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala11
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala71
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala50
-rw-r--r--scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala64
-rw-r--r--scalding/src/test/scala/sandcrawler/ScorableTest.scala81
-rw-r--r--scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala262
-rw-r--r--scalding/src/test/scala/sandcrawler/ScoreJobTest.scala248
-rw-r--r--scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala85
43 files changed, 3437 insertions, 56 deletions
diff --git a/scalding/README.md b/scalding/README.md
index 45b62d0..b09e0e8 100644
--- a/scalding/README.md
+++ b/scalding/README.md
@@ -1,12 +1,13 @@
This directory contains Hadoop map/reduce jobs written in Scala (compiled to
-the JVM) using the Scalding framework.
+the JVM) using the Scalding framework. Scalding builds on the Java Cascading
+library, which itself builds on the Java Hadoop libraries.
See the other markdown files in this directory for more background and tips.
## Dependencies
-Locally, you need to have the JVM (eg, OpenJDK 1.8), `sbt` build tool, and
-might need (exactly) Scala version 2.11.8.
+To develop locally, you need to have the JVM (eg, OpenJDK 1.8), `sbt` build
+tool, and might need (exactly) Scala version 2.11.8.
On a debian/ubuntu machine:
@@ -15,24 +16,32 @@ On a debian/ubuntu machine:
sudo apt-get update
sudo apt install scala sbt
+It's also helpful to have a local copy of the `hadoop` binary for running
+benchmarks. The `fetch_hadoop.sh` script in the top level directory will fetch
+an appropriate version.
+
## Building and Running
-Run tests:
+You can run `sbt` commands individually:
+    # run all tests
sbt test
-Build a jar and upload to a cluster machine (from which to run in production):
-
+ # build a jar (also runs tests)
sbt assembly
- scp target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar devbox:
-Run on cluster:
+Or you can start a session and run commands within that, which is *much*
+faster:
+
+ sbt -mem 2048
+
+ sbt> test
+ sbt> assembly
+ sbt> testOnly sandcrawler.SomeTestClassName
- devbox$ touch thing.conf
- devbox$ hadoop jar sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
- com.twitter.scalding.Tool sandcrawler.HBaseRowCountJob --hdfs \
- --app.conf.path thing.conf \
- --output hdfs:///user/bnewbold/spyglass_out_test
+On the cluster, you usually use the `please` script to kick off jobs. Be sure
+to build the jars first, or pass `--rebuild` to do it automatically. You need
+`hadoop` on your path for this.
## Troubleshooting
@@ -42,3 +51,4 @@ If your `sbt` task fails with this error:
try restarting `sbt` with more memory (e.g., `sbt -mem 2048`).
+See `scalding-debugging.md` or maybe `../notes/` for more.
diff --git a/scalding/build.sbt b/scalding/build.sbt
index 980418c..01f55ca 100644
--- a/scalding/build.sbt
+++ b/scalding/build.sbt
@@ -20,6 +20,13 @@ lazy val root = (project in file(".")).
scalaSourceFiles.filterNot(_.getAbsolutePath.contains(dirNameToExclude))
},
+ (scalastyleSources in Test) := {
+ // all .scala files in "src/test/scala"
+ val scalaSourceFiles = ((scalaSource in Test).value ** "*.scala").get
+ val dirNameToExclude = "/example/"
+ scalaSourceFiles.filterNot(_.getAbsolutePath.contains(dirNameToExclude))
+ },
+
name := "sandcrawler",
resolvers += "conjars.org" at "http://conjars.org/repo",
@@ -35,7 +42,7 @@ lazy val root = (project in file(".")).
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
libraryDependencies += "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion classifier "tests",
libraryDependencies += "org.apache.hbase" % "hbase-common" % hbaseVersion,
- libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.11_0.17.2_cdh5.3.1",
+ libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.11_0.17.2_cdh5.3.1-p1",
// cargo-culted from twitter/scalding's build.sbt
// hint via https://stackoverflow.com/questions/23280494/sbt-assembly-error-deduplicate-different-file-contents-found-in-the-following#23280952
@@ -55,4 +62,5 @@ lazy val root = (project in file(".")).
case x => (assemblyMergeStrategy in assembly).value(x)
},
+ testOptions in Test += Tests.Argument("-oF")
)
diff --git a/scalding/scalastyle-config.xml b/scalding/scalastyle-config.xml
index 86d8fca..47d0feb 100644
--- a/scalding/scalastyle-config.xml
+++ b/scalding/scalastyle-config.xml
@@ -35,7 +35,7 @@
<check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
+ <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="false">
<parameters>
<parameter name="maxLineLength"><![CDATA[160]]></parameter>
<parameter name="tabSize"><![CDATA[4]]></parameter>
diff --git a/scalding/scalding-debugging.md b/scalding/scalding-debugging.md
index bd9dd36..5a54742 100644
--- a/scalding/scalding-debugging.md
+++ b/scalding/scalding-debugging.md
@@ -83,3 +83,13 @@ Values of type `List[Fields]` are not printed in the expected way:
scala> allFields.length
res0: Int = 2
+
+## SpyGlass Column Selection
+
+Two equivalent ways to specify `columns`/`column_families`:
+
+ List("f", "file"),
+ List(new Fields("c"), new Fields("size", "mimetype")),
+
+ List("f", "file", "file")
+ List(new Fields("c"), new Fields("size"), new Fields("mimetype")),
diff --git a/scalding/src/main/resources/slug-denylist.txt b/scalding/src/main/resources/slug-denylist.txt
new file mode 100644
index 0000000..926dbd5
--- /dev/null
+++ b/scalding/src/main/resources/slug-denylist.txt
@@ -0,0 +1,554 @@
+abbreviations
+abbreviationsandacronyms
+aboutauthors
+abouttheauthor
+abouttheauthors
+aboutthecover
+abouttheeditors
+abreviations
+abstract
+abstractnotsubmittedforonlinepublication
+abstracts
+abstractsofaapaposterandpodiumpresentations
+abstractsofcommunications
+abstractsofthesesfromthescandinaviancountries
+abstractwithdrawn
+acknowledgement
+acknowledgements
+acknowledgementsvii
+acknowledgementtoreferees
+acknowledgementtoreviewers
+acknowledgment
+acknowledgmentofreferees
+acknowledgments
+addendum
+additionalresources
+address
+advertisersindex
+affect
+affiliation
+afterword
+agenda
+agradecimentos
+agradecimientos
+aimsandscope
+analysis
+annexa
+announcement
+announcements
+annualacknowledgementofmanuscriptreviewers
+anotefromtheeditor
+appendices
+appendix
+appendix1
+appendixa
+appendixb
+appointmentsandstaffchanges
+approximation
+apresentacao
+article
+articles
+articlesofsignificantinterestselectedfromthisissuebytheeditors
+associationnews
+ataglance
+atribute
+attention
+authorguidelines
+authorindex
+authorindexforvolume81
+authorreply
+authors
+authorsreply
+authorsresponse
+avantpropos
+award
+awardsappointmentsannouncements
+backcover
+background
+backmatter
+berichtigung
+besprechungen
+bibliografia
+bibliographie
+bibliography
+bigdata
+blankpage
+blood
+boardoftrustees
+booknotes
+booknotices
+bookofabstracts
+bookreview
+bookreviews
+bookreviewsandnotices
+bookreviewssection
+booksreceived
+buchbesprechung
+buchbesprechungen
+bulletin
+calendar
+calendarofevents
+calendarofmeetings
+callforarticles
+callforpapers
+casereport
+casereports
+casestudy
+chairmansopeningremarks
+changes
+chaos
+chapter1
+chapter10
+chapter1introduction
+chapter2
+chapter7
+chapteri
+chapterone
+chapteroneintroduction
+chaptertwo
+chapterx
+citation
+classes
+classified
+classifieds
+closingremarks
+collaborateurs
+comment
+commentaries
+commentary
+commentaryon
+commenton
+comments
+commentto
+committee
+communication
+communications
+communicationstotheeditor
+communiquedepresse
+community
+components
+comptesrendus
+computerscience
+concludingremarks
+conclusion
+conclusions
+conferencereport
+congratulations
+congresscalendar
+conservation
+content
+contents
+context
+continuingeducation
+continuingmedicaleducation
+contributors
+copyright
+copyrightform
+copyrightnotice
+correction
+corrections
+correspondence
+corrigenda
+corrigendum
+councilminutes
+cover
+coverimage
+currentresearch
+curriculumvitae
+danksagung
+dearreaders
+decisionmaking
+dedication
+dedicatoria
+definition
+description
+discussion
+diskussion
+distribution
+documents
+ear
+economics
+editorial
+editorialadvisoryboard
+editorialannouncement
+editorialboard
+editorialcomment
+editorialcomments
+editorialconsultants
+editoriale
+editorialeditorial
+editorialforeword
+editorialinformation
+editorialintroduction
+editorialintroductions
+editorialnote
+editorialnotes
+editorialpreface
+editorials
+editorialsoftwaresurveysection
+editorialstaff
+editorialstatement
+editorinchief
+editors
+editorschoice
+editorscomment
+editorscomments
+editorscorner
+editorscorrespondence
+editorsforeword
+editorsintroduction
+editorsletter
+editorsnote
+editorsnotes
+editorspage
+editorspicks
+editorspreface
+education
+einfuhrung
+einleitung
+electrophoresis
+employment
+endnotes
+entrevista
+entscheidungsverzeichnis
+epilogue
+equipment
+errata
+erratum
+essay
+essays
+executivesummary
+exercises
+expediente
+extendedabstracts
+feature
+features
+fichatecnica
+figure3
+finalexam
+finalreport
+focus
+foreward
+foreword
+forthcomingarticles
+forthcomingevents
+fortherecord
+forum
+frequentlyaskedquestions
+fromtheeditor
+fromtheeditorinchief
+fromtheeditors
+fromtheeditorsdesk
+fromthepresident
+frontmatter
+furtherreadings
+genealogy
+generaldiscussion
+generalinformation
+generalintroduction
+germany
+gettingstarted
+glosario
+glossary
+glossaryofterms
+guesteditorial
+guesteditorsforeword
+guesteditorsintroduction
+guideforauthors
+guidelinesforcontributors
+health
+heartfailure
+highlights
+highlightsfromthisissue
+highlightsofthisissue
+history
+home
+homework
+hypothesis
+iii
+imageofthemonth
+importantnotice
+impressum
+inbrief
+index
+indexofauthors
+indexofauthorsandtitles
+indice
+indicegeneral
+informationforauthors
+informationtoauthors
+inhalt
+inhaltsverzeichnis
+inleiding
+inmemoriam
+inreply
+inresponse
+insidethisissue
+institutenews
+instructionsforauthors
+instructionstoauthors
+interview
+inthestudy
+inthisissue
+introducao
+introduccion
+introduction
+introductionandoverview
+introductiongenerale
+introductiontotheissue
+introductiontothespecialissue
+introductorycomments
+introductoryremarks
+introduzione
+inventions
+invitedcommentary
+issuesandevents
+jobdescription
+journalclub
+journalscan
+keywords
+kurzkommentiert
+languageteaching
+lecture
+letter
+letterfromtheeditor
+letterfromtheeditorinchief
+letterfromtheeditors
+letterfromthepresident
+letters
+letterstotheeditor
+letterstotheeditors
+lettertotheeditor
+lettertotheeditors
+liminaire
+linearalgebra
+linearregression
+links
+listedestableaux
+listofabbreviations
+listofcontributors
+listoffigures
+listofparticipants
+listofpublications
+listofreferees
+listofreviewers
+listoftables
+literacy
+literatur
+literature
+literaturecited
+literaturereview
+literaturrundschau
+literaturverzeichnis
+litteraturverzeichniss
+livresrecus
+lucina
+lungcancer
+magazin
+maintenance
+materials
+materialsafetydatasheet
+materialsandmethods
+medicinalchemistry
+meetingabstracts
+meetingreport
+meetings
+meetingsandconferences
+meetingsofinterest
+membershipapplication
+memoranda
+memorandum
+messagefromgeneralcochairs
+messagefromthechairs
+messagefromtheeditor
+messagefromtheeditorinchief
+messagefromthepresident
+messagefromtheprogramchairs
+messagefromtheprogramcochairs
+metaanalysis
+miscellanea
+miscellaneous
+miscellany
+missionstatement
+motivation
+mrsnews
+name
+newbooks
+newlyelectedmembersofthecollege
+newproducts
+news
+newsandnotes
+newsandreviews
+newsandviews
+newsbriefs
+newsinbrief
+newsnotes
+newsviews
+noii
+note
+notefromtheeditor
+notes
+notesandcomments
+notesandnews
+notesdelecture
+notesforcontributors
+notesoncontributors
+notice
+noticeboard
+notitle
+notitleavailable
+obituaries
+obituary
+online
+openaccess
+openingaddress
+openingremarks
+oralabstracts
+oralpresentations
+organizingcommittee
+originalarticle
+originalarticles
+other
+outline
+overview
+panorama
+papers
+paperstoappearinforthcomingissues
+partone
+personalandmiscellaneous
+perspective
+perspectives
+philosophy
+pictureofthemonth
+place
+pointofview
+positionsavailable
+poster
+posterpresentations
+postscript
+preface
+prefaceandacknowledgements
+prefacetothesecondedition
+preliminarymaterial
+presentacio
+presentacion
+presentation
+presidentialaddress
+presidentsmessage
+presidentsreport
+pressrelease
+print
+printing
+proceedings
+proceedingsofthenationalacademyofsciences
+profile
+programcommittee
+projectmanagement
+prologue
+publication
+publichealth
+publishersnote
+question
+questionsandanswers
+radiology
+readersforum
+recensiones
+recensions
+recentpublications
+redaktorensforord
+referate
+references
+referenciasbibliograficas
+regression
+rehabilitation
+rejoinder
+remerciements
+reply
+replybyauthors
+researchresearchers
+resenas
+resources
+response
+responsetothelettertotheeditor
+results
+resume
+resumen
+resumes
+resumo
+retraction
+review
+reviewarticle
+revieweracknowledgement
+revieweracknowledgement2013
+reviewers
+reviewessay
+reviews
+reviewsanddescriptionsoftablesandbooks
+reviewsofbooks
+rezension
+rezensionen
+safety
+section
+security
+selectedbibliography
+shortcommunication
+shorternotices
+shortnotices
+socialengineering
+sociology
+sommaire
+sommario
+specialreport
+specialsection
+specifications
+spistresci
+subjectindex
+subscriptions
+suggestedreadings
+sumario
+summaries
+summariesofkeyjournalarticles
+summary
+summaryofproceedings
+summer
+sun
+supplementarymaterial
+symposium
+symptom
+synthese
+tabledesmatieres
+tableofcontents
+tableofcontentsandprologue
+technicalreport
+theauthors
+theauthorsreply
+thebasics
+theeditorsdesk
+thefirstauthorreplies
+thelancet
+theoreticalbackground
+thetimes
+theworldbank
+theyearinreview
+thismonthin
+thismonthinthejournal
+timemanagement
+titeleiinhaltsverzeichnis
+title
+titlepage
+titlepagei
+tocorrespondents
+totheeditor
+unitedkingdom
+unitednations
+unitedstates
+upcomingevents
+vorwort
+website
+welcome
+whatshappening
+whatsnew
+workscited
+yourquestionsanswered
+zudiesemheft
+zusammenfassung
diff --git a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala
new file mode 100644
index 0000000..abf9220
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala
@@ -0,0 +1,50 @@
+package sandcrawler
+
+import scala.math
+import scala.util.parsing.json.JSON
+import scala.util.parsing.json.JSONObject
+
+import cascading.flow.FlowDef
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+
+class BibjsonScorable extends Scorable {
+
+ def getSource(args : Args) : Source = {
+ TextLine(args("bibjson-input"))
+ }
+
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
+ getSource(args).read
+ .toTypedPipe[String](new Fields("line"))
+ .map { BibjsonScorable.bibjsonToMapFeatures(_) }
+ }
+}
+
+object BibjsonScorable {
+ def bibjsonToMapFeatures(json : String) : Option[MapFeatures] = {
+ Scorable.jsonToMap(json) match {
+ case None => None
+ case Some(map) => {
+ if (map contains "title") {
+ val title = Scorable.getString(map, "title")
+ val doi = Scorable.getString(map, "doi")
+ val sha1 = Scorable.getString(map, "sha")
+ // TODO: year, authors (if available)
+ if (title == null || title.isEmpty) {
+ None
+ } else {
+ val sf : ScorableFeatures = ScorableFeatures.create(title=title, doi=doi, sha1=sha1)
+ sf.toSlug match {
+ case None => None
+ case Some(slug) => Some(MapFeatures(slug, sf.toString))
+ }
+ }
+ } else {
+ None
+ }
+ }
+ }
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
new file mode 100644
index 0000000..bb6413f
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -0,0 +1,153 @@
+package sandcrawler
+
+import scala.math
+import scala.util.parsing.json.JSON
+import scala.util.parsing.json.JSONArray
+import scala.util.parsing.json.JSONObject
+
+import cascading.flow.FlowDef
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.hbase.HBasePipeConversions
+
+class CrossrefScorable extends Scorable with HBasePipeConversions {
+ // TODO: Generalize args so there can be multiple Crossref pipes in one job.
+ def getSource(args : Args) : Source = {
+ TextLine(args("crossref-input"))
+ }
+
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
+ getSource(args).read
+ .toTypedPipe[String](new Fields("line"))
+ .filter { CrossrefScorable.keepRecord(_) }
+ .map { CrossrefScorable.jsonToMapFeatures(_) }
+ }
+}
+
+object CrossrefScorable {
+
+ val ContentTypeWhitelist: Set[String] = Set(
+ "book",
+ "book-chapter",
+ "dataset",
+ "dissertation",
+ "journal-article",
+ "letter",
+ "monograph",
+ "posted-content",
+ "pre-print",
+ "proceedings-article",
+ "report",
+ "working-paper")
+
+ def keepRecord(json : String) : Boolean = {
+ Scorable.jsonToMap(json) match {
+ case None => false
+ case Some(map) => {
+ mapToTitle(map) match {
+ case None => false
+ case Some(title) => title.length <= Scorable.MaxTitleLength
+ }
+ }
+ }
+ }
+
+  // Returns None if title is null or empty. (The length limit is enforced
+  // separately, in keepRecord.)
+ def mapToTitle(map : Map[String, Any]) : Option[String] = {
+ def getTitle : Option[String] = {
+ if (map contains "title") {
+ val titles = map("title").asInstanceOf[List[String]]
+ if (titles.isEmpty || titles == null) None else Some(titles(0))
+ } else {
+ None
+ }
+ }
+
+ def getSubtitle : Option[String] = {
+ if (map contains "subtitle") {
+ val subtitles = map("subtitle").asInstanceOf[List[String]]
+ if (subtitles.isEmpty || subtitles == null) {
+ None
+ } else {
+ val sub = subtitles(0)
+ if (sub == null || sub.isEmpty) {
+ None
+ } else {
+ Some(sub)
+ }
+ }
+ } else {
+ None
+ }
+ }
+
+ getTitle match {
+ case None => None
+ case Some(baseTitle) => {
+ if (baseTitle == null) {
+ None
+ } else {
+ getSubtitle match {
+ case None => Some(baseTitle)
+ case Some(baseSubtitle) => Some(baseTitle.concat(":".concat(baseSubtitle)))
+ }
+ }
+ }
+ }
+ }
+
+ def mapToAuthorList(map : Map[String, Any]) : List[String] = {
+ if (map contains "author") {
+ val objArray = map("author").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]])
+ // TODO(bnewbold): combine given and family names?
+ objArray
+ .filter(e => e contains "family")
+ .map(e => e.get("family").get.asInstanceOf[String])
+ } else {
+ List()
+ }
+ }
+
+ def mapToYear(map : Map[String, Any]) : Option[Int] = {
+ map.get("created") match {
+ case None => None
+ case Some(created) => {
+ Some(created.asInstanceOf[Map[String,Any]]
+ .get("date-parts")
+ .get
+ .asInstanceOf[List[Any]](0)
+ .asInstanceOf[List[Any]](0)
+ .asInstanceOf[Double]
+ .toInt)
+ }
+ }
+ }
+
+ def jsonToMapFeatures(json : String) : Option[MapFeatures] = {
+ def makeMapFeatures(title : String, doi : String, authors : List[String], year : Int, contentType : String) : Option[MapFeatures] = {
+ if (doi.isEmpty || doi == null || authors.length == 0 || !(ContentTypeWhitelist contains contentType)) {
+ None
+ } else {
+ val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi.toLowerCase(), year=year)
+ sf.toSlug match {
+ case None => None
+ case Some(slug) => Some(MapFeatures(slug, sf.toString))
+ }
+ }
+ }
+ Scorable.jsonToMap(json) match {
+ case None => None
+ case Some(map) =>
+ mapToTitle(map) match {
+ case None => None
+ case Some(title) => makeMapFeatures(
+ title=title,
+ doi=Scorable.getString(map, "DOI"),
+ authors=mapToAuthorList(map),
+ year=mapToYear(map).getOrElse(0),
+ contentType=map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE"))
+ }
+ }
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/DumpFileMetaJob.scala b/scalding/src/main/scala/sandcrawler/DumpFileMetaJob.scala
new file mode 100644
index 0000000..b3734f0
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/DumpFileMetaJob.scala
@@ -0,0 +1,36 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Dumps all the info needed to insert a file entity in Fatcat. Useful for
+// joining.
+class DumpFileMetaJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val metaPipe : TypedPipe[(String, String, String, Long)] = HBaseBuilder.build(args("hbase-table"),
+ args("zookeeper-hosts"),
+ List("file:cdx", "file:mime", "file:size"),
+ SourceMode.SCAN_ALL)
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size"))
+ .filter { case (_, cdx, mime, size) => cdx != null && mime != null && size != null }
+ .map { case (key, cdx, mime, size) =>
+ (Bytes.toString(key.copyBytes()),
+ Bytes.toString(cdx.copyBytes()),
+ Bytes.toString(mime.copyBytes()),
+ Bytes.toLong(size.copyBytes()))
+ };
+
+ metaPipe.write(TypedTsv[(String,String,String,Long)](args("output")))
+
+}
diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidMetaInsertableJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidMetaInsertableJob.scala
new file mode 100644
index 0000000..ee2b7c2
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/DumpGrobidMetaInsertableJob.scala
@@ -0,0 +1,38 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Dumps the SHA1 key and grobid0:metadata columns, plus file metadata needed
+// to insert into fatcat. Used, eg, as part of long-tail mellon pipeline.
+class DumpGrobidMetaInsertableJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val metaPipe : TypedPipe[(String, String, String, Long, String)] = HBaseBuilder.build(args("hbase-table"),
+ args("zookeeper-hosts"),
+ List("file:cdx", "file:mime", "file:size", "grobid0:metadata"),
+ SourceMode.SCAN_ALL)
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size", "metadata"))
+ .filter { case (_, cdx, mime, size, metadata) => cdx != null && mime != null && size != null && metadata != null }
+ .map { case (key, cdx, mime, size, metadata) =>
+ (Bytes.toString(key.copyBytes()),
+ Bytes.toString(cdx.copyBytes()),
+ Bytes.toString(mime.copyBytes()),
+ Bytes.toLong(size.copyBytes()),
+ Bytes.toString(metadata.copyBytes())
+ )
+ };
+
+ metaPipe.write(TypedTsv[(String,String,String,Long,String)](args("output")))
+
+}
diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala
new file mode 100644
index 0000000..42b3464
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala
@@ -0,0 +1,34 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Dumps status code for each GROBID-processed file. Good for crawl/corpus
+// analytics, if we consider GROBID status a rough "is this a paper" metric.
+class DumpGrobidStatusCodeJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val metaPipe : TypedPipe[(String, Long)] = HBaseBuilder.build(args("hbase-table"),
+ args("zookeeper-hosts"),
+ List("grobid0:status_code"),
+ SourceMode.SCAN_ALL)
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "status_code"))
+ .filter { case (_, status_code) => status_code != null }
+ .map { case (key, status_code) =>
+ (Bytes.toString(key.copyBytes()),
+ Bytes.toLong(status_code.copyBytes()))
+ };
+
+ metaPipe.write(TypedTsv[(String,Long)](args("output")))
+
+}
diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidXmlJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidXmlJob.scala
new file mode 100644
index 0000000..953610d
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/DumpGrobidXmlJob.scala
@@ -0,0 +1,41 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+import scala.util.parsing.json.JSONObject
+
+// Dumps the SHA1 key and grobid0:tei_xml columns, as TSV/JSON (two TSV
+// columns: one is key, second is JSON). Used for partner delivery/sharing
+class DumpGrobidXmlJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val metaPipe : TypedPipe[(String, String)] = HBaseBuilder.build(args("hbase-table"),
+ args("zookeeper-hosts"),
+ List("file:cdx", "grobid0:tei_xml"),
+ SourceMode.SCAN_ALL)
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "tei_xml"))
+ .filter { case (_, cdx, tei_xml) => cdx != null && tei_xml != null }
+ .map { case (key, cdx, tei_xml) =>
+ (Bytes.toString(key.copyBytes()),
+ JSONObject(
+ Map(
+ "pdf_hash" -> Bytes.toString(key.copyBytes()),
+ "cdx_metadata" -> Bytes.toString(cdx.copyBytes()),
+ "tei_xml" -> Bytes.toString(tei_xml.copyBytes())
+ )).toString
+ )
+ };
+
+ metaPipe.write(TypedTsv[(String,String)](args("output")))
+
+}
diff --git a/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala b/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala
new file mode 100644
index 0000000..7fd3ce0
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala
@@ -0,0 +1,67 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Filters for HBase rows which have not had GROBID run on them, but do have
+// full CDX metadata, and dumps to a TSV for later extraction by the
+// "extraction-ungrobided" job.
+//
+// Uses the same horrible leftJoin-on-key approach as the other dump jobs.
+class DumpUnGrobidedJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val output = args("output")
+
+ val allKeys : TypedPipe[(String,String,String,String)] = DumpUnGrobidedJob.getHBaseKeySource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
+ .read
+ .fromBytesWritable('key, 'c, 'mime, 'cdx)
+ .toTypedPipe[(String,String,String,String)]('key, 'c, 'mime, 'cdx)
+
+ val existingKeys : TypedPipe[(String,Boolean)] = DumpUnGrobidedJob.getHBaseColSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
+ .read
+ .fromBytesWritable('key)
+ .toTypedPipe[String]('key)
+ .map{ key => (key, true) }
+
+ val missingKeys : TypedPipe[(String,String,String,String)] = allKeys
+ .groupBy(_._1)
+ .leftJoin(existingKeys.groupBy(_._1))
+ .toTypedPipe
+ .collect { case (key, ((_, c, mime, cdx), None)) => (key, c, mime, cdx) }
+
+ missingKeys
+ .write(TypedTsv[(String,String,String,String)](output))
+
+}
+
+object DumpUnGrobidedJob {
+
+  // eg, "wbgrp-journal-extract-0-qa", "mtrcs-zk1.us.archive.org:2181"
+ def getHBaseColSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbaseTable,
+ zookeeperHosts,
+ List("grobid0:status_code"),
+ SourceMode.SCAN_ALL)
+ }
+
+ def getHBaseKeySource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbaseTable,
+ zookeeperHosts,
+ List("f:c", "file:mime", "file:cdx"),
+ SourceMode.SCAN_ALL)
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/FatcatScorable.scala b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala
new file mode 100644
index 0000000..2090e84
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala
@@ -0,0 +1,146 @@
+package sandcrawler
+
+import scala.math
+import scala.util.parsing.json.JSON
+import scala.util.parsing.json.JSONArray
+import scala.util.parsing.json.JSONObject
+
+import cascading.flow.FlowDef
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.hbase.HBasePipeConversions
+
+
+// Variant of FatcatScorable that reads the "right-hand" release dump (a
+// separate input file); all filtering and JSON parsing is delegated to the
+// FatcatScorable companion object.
+class FatcatScorableRight extends Scorable {
+
+ // One fatcat release JSON record per line
+ def getSource(args : Args) : Source = {
+ TextLine(args("fatcat-release-input-right"))
+ }
+
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
+ getSource(args).read
+ .toTypedPipe[String](new Fields("line"))
+ .filter { FatcatScorable.keepRecord(_) }
+ .map { FatcatScorable.jsonToMapFeatures(_) }
+ }
+}
+
+// Scorable over a fatcat release dump: one JSON release per line, keyed by
+// title slug for later matching.
+class FatcatScorable extends Scorable with HBasePipeConversions {
+
+ def getSource(args : Args) : Source = {
+ TextLine(args("fatcat-release-input"))
+ }
+
+ // Drop records with missing/over-long titles, then parse each line into
+ // Option[MapFeatures] (None when no usable slug can be made)
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
+ getSource(args).read
+ .toTypedPipe[String](new Fields("line"))
+ .filter { FatcatScorable.keepRecord(_) }
+ .map { FatcatScorable.jsonToMapFeatures(_) }
+ }
+}
+
+object FatcatScorable {
+
+ // Note; removed ReleaseType filtering
+
+ // True when the line parses as JSON and has a title no longer than
+ // Scorable.MaxTitleLength.
+ def keepRecord(json : String) : Boolean = {
+ Scorable.jsonToMap(json) match {
+ case None => false
+ case Some(map) => {
+ mapToTitle(map) match {
+ case None => false
+ case Some(title) => title.length <= Scorable.MaxTitleLength
+ }
+ }
+ }
+ }
+
+ // Returns None if title is missing, null, or empty; otherwise the title,
+ // with any non-empty subtitle appended after a ":" separator.
+ def mapToTitle(map : Map[String, Any]) : Option[String] = {
+ def getTitle : Option[String] = {
+ if (map contains "title") {
+ val title = map("title").asInstanceOf[String]
+ if (title == null || title.isEmpty) None else Some(title)
+ } else {
+ None
+ }
+ }
+
+ def getSubtitle : Option[String] = {
+ if (map contains "subtitle") {
+ val subtitle = map("subtitle").asInstanceOf[String]
+ if (subtitle == null || subtitle.isEmpty) {
+ None
+ } else {
+ Some(subtitle)
+ }
+ } else {
+ None
+ }
+ }
+
+ getTitle match {
+ case None => None
+ case Some(baseTitle) => {
+ if (baseTitle == null) {
+ None
+ } else {
+ getSubtitle match {
+ case None => Some(baseTitle)
+ case Some(baseSubtitle) => Some(baseTitle.concat(":".concat(baseSubtitle)))
+ }
+ }
+ }
+ }
+ }
+
+ // Extracts contributor raw_name strings; empty list when "contribs" is absent.
+ def mapToAuthorList(map : Map[String, Any]) : List[String] = {
+ if (map contains "contribs") {
+ val objArray = map("contribs").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]])
+ // TODO(bnewbold): better name stuff... contrib.surname, creator.surname,
+ // or raw_name split to last
+ objArray
+ .filter(e => e contains "raw_name")
+ .map(e => e.get("raw_name").get.asInstanceOf[String])
+ } else {
+ List()
+ }
+ }
+
+ // release_year arrives as a JSON number (parsed as Double) and is
+ // truncated to Int.
+ def mapToYear(map : Map[String, Any]) : Option[Int] = {
+ map.get("release_year") match {
+ case None => None
+ case Some(year) => {
+ Some(year.asInstanceOf[Double].toInt)
+ }
+ }
+ }
+
+ // Parses a fatcat release JSON line into slug + features JSON; None when
+ // the JSON is invalid, the title is unusable, or no slug can be derived.
+ // NOTE(review): contentType is accepted but never used by makeMapFeatures.
+ def jsonToMapFeatures(json : String) : Option[MapFeatures] = {
+ def makeMapFeatures(title : String, doi : String, fatcat_release: String, fatcat_work : String, authors : List[String], year : Int, contentType : String) : Option[MapFeatures] = {
+ // NOTE: not doing any filtering here!
+ val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi, fatcat_release=fatcat_release, fatcat_work=fatcat_work, year=year)
+ sf.toSlug match {
+ case None => None
+ case Some(slug) => Some(MapFeatures(slug, sf.toString))
+ }
+ }
+ Scorable.jsonToMap(json) match {
+ case None => None
+ case Some(map) =>
+ mapToTitle(map) match {
+ case None => None
+ case Some(title) => makeMapFeatures(
+ title=title,
+ // TODO: doi=Scorable.getString(map, "doi"),
+ doi=null,
+ fatcat_release=Scorable.getString(map, "ident"),
+ fatcat_work=Scorable.getString(map, "work_id"),
+ authors=mapToAuthorList(map),
+ year=mapToYear(map).getOrElse(0),
+ contentType=map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE"))
+ }
+ }
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
new file mode 100644
index 0000000..f4ed129
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -0,0 +1,83 @@
+package sandcrawler
+
+import scala.util.parsing.json.JSONObject
+
+import cascading.flow.FlowDef
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Scorable over GROBID-extracted metadata stored in HBase, keyed by title slug.
+class GrobidScorable extends Scorable with HBasePipeConversions {
+ // Only rows whose GROBID run returned HTTP-style status 200 are kept
+ val StatusOK = 200
+
+ def getSource(args : Args) : Source = {
+ // TODO: Generalize args so there can be multiple grobid pipes in one job
+ GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+ }
+
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = {
+ getSource(args)
+ .read
+ // Can't just "fromBytesWritable" because we have multiple types
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code"))
+ .filter { case (_, metadata, status_code) => metadata != null && status_code != null }
+ // Decode: key and metadata are strings; status_code is a long
+ .map { case (key, metadata, status_code) =>
+ (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes()))
+ }
+ // StatusOK is a stable identifier, so this pattern-matches status == 200
+ .collect { case (key, json, StatusOK) => (key, json) }
+ .filter { case (key, json) => GrobidScorable.keepRecord(json) }
+ .map { entry : (String, String) => GrobidScorable.jsonToMapFeatures(entry._1, entry._2) }
+ }
+}
+
+object GrobidScorable {
+ // True when the JSON parses and contains a non-null title no longer than
+ // Scorable.MaxTitleLength.
+ def keepRecord(json : String) : Boolean = {
+ Scorable.jsonToMap(json) match {
+ case None => false
+ case Some(map) => {
+ if (map contains "title") {
+ val title = Scorable.getString(map, "title")
+ title != null && title.length <= Scorable.MaxTitleLength
+ } else {
+ false
+ }
+ }
+ }
+ }
+
+ // Extracts author "name" strings from the GROBID metadata; empty list
+ // when "authors" is absent.
+ def mapToAuthorList(map : Map[String, Any]) : List[String] = {
+ if (map contains "authors") {
+ val objArray = map("authors").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]])
+ objArray
+ .filter(e => e contains "name")
+ .map(e => e.get("name").get.asInstanceOf[String])
+ } else {
+ List()
+ }
+ }
+
+ // Full-table scan over the GROBID metadata and status-code columns.
+ def getHBaseSource(table : String, host : String) : HBaseSource = {
+ HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL)
+ }
+
+ // key is the HBase row key (sha1); carried into the features so matches
+ // can later be traced back to the file.
+ def jsonToMapFeatures(key : String, json : String) : Option[MapFeatures] = {
+ Scorable.jsonToMap(json) match {
+ case None => None
+ case Some(map) => {
+ if (map contains "title") {
+ val authors: List[String] = mapToAuthorList(map)
+ val title = Scorable.getString(map, "title")
+ ScorableFeatures.create(title=title, authors=authors, sha1=key).toMapFeatures
+ } else {
+ None
+ }
+ }
+ }
+ }
+}
+
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
new file mode 100644
index 0000000..3146a6c
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
@@ -0,0 +1,59 @@
+
+package sandcrawler
+
+import cascading.flow.FlowDef
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Debug/dump job: mirrors GrobidScorable.getFeaturesPipe plus
+// Scorable.getInputPipe, but with stage-by-stage counters, writing
+// (slug, features-json) pairs to TSV.
+class GrobidScorableDumpJob(args: Args) extends JobBase(args) {
+
+ // Counters for each filtering stage, for debugging row fall-off
+ val grobidHbaseRows = Stat("hbase-rows-scanned", "hbase-grobid-dump")
+ val filteredGrobidRows = Stat("grobid-rows-filtered", "hbase-grobid-dump")
+ val parsedGrobidRows = Stat("grobid-rows-parsed", "hbase-grobid-dump")
+ val validGrobidRows = Stat("grobid-rows-valid-slug", "hbase-grobid-dump")
+
+ val pipe = GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+ .read
+ // Can't just "fromBytesWritable" because we have multiple types?
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code"))
+ .filter { case (_, metadata, status_code) =>
+ grobidHbaseRows.inc
+ metadata != null && status_code != null
+ }
+ .map { case (key, metadata, status_code) =>
+ (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes()))
+ }
+ // TODO: Should I combine next two stages for efficiency?
+ // Keep only rows where GROBID succeeded (status 200)
+ .collect { case (key, json, 200) =>
+ filteredGrobidRows.inc
+ (key, json)
+ }
+ .map { entry : (String, String) =>
+ parsedGrobidRows.inc
+ GrobidScorable.jsonToMapFeatures(entry._1, entry._2)
+ }
+ // Drop records that produced no slug (None)
+ .filterNot { entry => entry.isEmpty }
+ .map { entry => {
+ validGrobidRows.inc
+ entry.get
+ }}
+ .groupBy { case MapFeatures(slug, json) => slug }
+ .map { tuple =>
+ val (slug : String, features : MapFeatures) = tuple
+ (slug, ReduceFeatures(features.json))
+ }
+
+ pipe
+ .map { case (slug, features) =>
+ (slug, features.json)
+ }
+ .write(TypedTsv[(String, String)](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala
new file mode 100644
index 0000000..46d2038
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala
@@ -0,0 +1,43 @@
+package sandcrawler
+
+import cascading.pipe.Pipe
+import com.twitter.scalding.Args
+import com.twitter.scalding.Stat
+import com.twitter.scalding.TypedPipe
+import com.twitter.scalding.TypedTsv
+import parallelai.spyglass.base.JobBase
+
+// Self-joins the fatcat release dump on title slug to find distinct
+// releases that look like the same work.
+class GroupFatcatWorksJob(args: Args) extends JobBase(args) {
+
+ val fatcatRowCount = Stat("fatcat-rows-filtered", "sandcrawler")
+ val joinedRowCount = Stat("joined-rows", "sandcrawler")
+
+ val fatcatScorable : Scorable = new FatcatScorable()
+ val fatcatPipe : TypedPipe[(String, ReduceFeatures)] = fatcatScorable
+ .getInputPipe(args)
+ .map { r =>
+ fatcatRowCount.inc
+ r
+ }
+
+ // Self-join on slug; failures are diverted to a ".trapped" sidecar file
+ val joinedPipe = fatcatPipe
+ .addTrap(TypedTsv(args("output") + ".trapped"))
+ .join(fatcatPipe)
+
+ // TypedTsv doesn't work over case classes.
+ joinedPipe
+ // filter out trivial self-matches (releases are identical)
+ .filter { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) =>
+ Scorable.selfMatchable(fatcatFeaturesLeft, fatcatFeaturesRight)
+ }
+ .map { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) =>
+ joinedRowCount.inc
+ new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(fatcatFeaturesLeft, fatcatFeaturesRight),
+ fatcatFeaturesLeft.json,
+ fatcatFeaturesRight.json)
+ }
+ .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
+ .write(TypedTsv[(String, Int, String, String)](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala
new file mode 100644
index 0000000..ea5e26b
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala
@@ -0,0 +1,52 @@
+package sandcrawler
+
+import cascading.pipe.Pipe
+import com.twitter.scalding.Args
+import com.twitter.scalding.Stat
+import com.twitter.scalding.TypedPipe
+import com.twitter.scalding.TypedTsv
+import parallelai.spyglass.base.JobBase
+
+// Like GroupFatcatWorksJob, but joins two different release dumps (left and
+// right inputs) instead of self-joining a single one.
+class GroupFatcatWorksSubsetJob(args: Args) extends JobBase(args) {
+
+ val fatcatLhsRowCount = Stat("fatcat-rows-filtered-left", "sandcrawler")
+ val fatcatRhsRowCount = Stat("fatcat-rows-filtered-right", "sandcrawler")
+ val joinedRowCount = Stat("joined-rows", "sandcrawler")
+
+ // Left side: reads "fatcat-release-input"
+ val fatcatScorableLhs : Scorable = new FatcatScorable()
+ val fatcatPipeLhs : TypedPipe[(String, ReduceFeatures)] = fatcatScorableLhs
+ .getInputPipe(args)
+ .map { r =>
+ fatcatLhsRowCount.inc
+ r
+ }
+
+ // Right side: reads "fatcat-release-input-right"
+ val fatcatScorableRhs : Scorable = new FatcatScorableRight()
+ val fatcatPipeRhs : TypedPipe[(String, ReduceFeatures)] = fatcatScorableRhs
+ .getInputPipe(args)
+ .map { r =>
+ fatcatRhsRowCount.inc
+ r
+ }
+
+ // Join on slug; failures are diverted to a ".trapped" sidecar file
+ val joinedPipe = fatcatPipeLhs
+ .addTrap(TypedTsv(args("output") + ".trapped"))
+ .join(fatcatPipeRhs)
+
+ // TypedTsv doesn't work over case classes.
+ joinedPipe
+ // filter out trivial self-matches (releases are identical)
+ .filter { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) =>
+ Scorable.selfMatchable(fatcatFeaturesLeft, fatcatFeaturesRight)
+ }
+ .map { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) =>
+ joinedRowCount.inc
+ new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(fatcatFeaturesLeft, fatcatFeaturesRight),
+ fatcatFeaturesLeft.json,
+ fatcatFeaturesRight.json)
+ }
+ .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
+ .write(TypedTsv[(String, Int, String, String)](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala
new file mode 100644
index 0000000..20cc7a1
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala
@@ -0,0 +1,37 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Counts how many HBase rows have a value in the given "column" arg and
+// writes the single count to a TSV.
+class HBaseColCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val output = args("output")
+
+ HBaseColCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ args("column"))
+ .read
+ .debug
+ .groupAll { _.size('count) }
+ .write(Tsv(output))
+}
+
+object HBaseColCountJob {
+
+ // eg, "wbgrp-journal-extract-0-qa", "mtrcs-zk1.us.archive.org:2181"
+ // Full-table scan over a single caller-specified column.
+ def getHBaseSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbaseTable,
+ zookeeperHosts,
+ List(col),
+ SourceMode.SCAN_ALL)
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
index 4c3de33..5c7954a 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
@@ -30,7 +30,7 @@ object HBaseRowCountJob {
HBaseBuilder.build(
hbaseTable,
zookeeperHosts,
- List("file:size"),
+ List("f:c"),
SourceMode.SCAN_ALL)
}
}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala
new file mode 100644
index 0000000..4d9880f
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala
@@ -0,0 +1,32 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Histograms grobid0:status_code values (stored as 8-byte longs) across the
+// whole table, writing (status_code, count) pairs.
+class HBaseStatusCodeCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val source = HBaseCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ "grobid0:status_code")
+
+ // Decode each cell to a Long; the row key itself is not needed here
+ val statusPipe : TypedPipe[Long] = source
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)
+ .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) }
+
+ statusPipe.groupBy { identity }
+ .size
+ .debug
+ .write(TypedTsv[(Long,Long)](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index fd0b4e2..f79d672 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -18,15 +18,15 @@ class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConver
val source = HBaseCountJob.getHBaseSource(
args("hbase-table"),
args("zookeeper-hosts"),
- "grobid0:status_code")
+ "grobid0:status")
- val statusPipe : TypedPipe[Long] = source
+ val statusPipe : TypedPipe[String] = source
.read
- .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)
- .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) }
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status)
+ .map { case (key, raw_status) => Bytes.toString(raw_status.copyBytes()) }
statusPipe.groupBy { identity }
.size
.debug
- .write(TypedTsv[(Long,Long)](args("output")))
+ .write(TypedTsv[(String,Long)](args("output")))
}
diff --git a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala
new file mode 100644
index 0000000..292de75
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala
@@ -0,0 +1,30 @@
+package sandcrawler
+
+import cascading.pipe.Pipe
+import com.twitter.scalding.Args
+import com.twitter.scalding.TypedPipe
+import com.twitter.scalding.TypedTsv
+import parallelai.spyglass.base.JobBase
+
+// Benchmark: joins two BibJSON files on title slug and scores each matched
+// pair, for evaluating the matching pipeline offline.
+class MatchBenchmarkJob(args: Args) extends JobBase(args) {
+ // TODO: Instantiate any subclass of Scorable specified in args.
+ val sc1 : Scorable = new BibjsonScorable()
+ val sc2 : Scorable = new BibjsonScorable()
+ // Remap the per-side input args onto the single "bibjson-input" key that
+ // BibjsonScorable reads
+ val leftArgs = args + ("bibjson-input" -> List(args("left-bibjson")))
+ val rightArgs = args + ("bibjson-input" -> List(args("right-bibjson")))
+ val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(leftArgs)
+ val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(rightArgs)
+
+ pipe1.join(pipe2)
+ .map { entry =>
+ val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
+ new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(features1, features2),
+ features1.json,
+ features2.json)
+ }
+ //TypedTsv doesn't work over case classes.
+ .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
+ .write(TypedTsv[(String, Int, String, String)](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala b/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala
new file mode 100644
index 0000000..cc3bf23
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala
@@ -0,0 +1,67 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// This nasty, no-good, horrible Job outputs a list of keys ("sha1:A234...")
+// for which the given "column" does not have a value set.
+// It does this using a self-join because SpyGlass's HBase SCAN support seems
+// to be extremely limited.
+class MissingColumnDumpJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ // Destination TSV path (required command-line arg)
+ val output = args("output")
+
+ // All row keys (anchored on the f:c column being present)
+ val allKeys : TypedPipe[String] = MissingColumnDumpJob.getHBaseKeySource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
+ .read
+ .fromBytesWritable('key)
+ .toTypedPipe[String]('key)
+
+ // Keys that DO have the requested column; boolean is a presence marker
+ val existingKeys : TypedPipe[(String,Boolean)] = MissingColumnDumpJob.getHBaseColSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ args("column"))
+ .read
+ .fromBytesWritable('key)
+ .toTypedPipe[String]('key)
+ .map{ key => (key, true) }
+
+ // Left join: keys whose right side is None are missing the column
+ val missingKeys : TypedPipe[String] = allKeys
+ .groupBy( identity )
+ .leftJoin(existingKeys.groupBy(_._1))
+ .toTypedPipe
+ .collect { case (key, (_, None)) => key }
+
+ missingKeys
+ .write(TypedTsv[String](output))
+
+}
+
+object MissingColumnDumpJob {
+
+ // eg, "wbgrp-journal-extract-0-qa", "mtrcs-zk1.us.archive.org:2181"
+ // Full-table scan over the caller-specified column whose absence we are
+ // looking for.
+ def getHBaseColSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbaseTable,
+ zookeeperHosts,
+ List(col),
+ SourceMode.SCAN_ALL)
+ }
+
+ // Full-table scan over f:c, used as the "all keys" baseline.
+ def getHBaseKeySource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbaseTable,
+ zookeeperHosts,
+ List("f:c"),
+ SourceMode.SCAN_ALL)
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
new file mode 100644
index 0000000..d9c38e8
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -0,0 +1,96 @@
+package sandcrawler
+
+import scala.math
+import scala.util.parsing.json.JSON
+import scala.util.parsing.json.JSONObject
+
+import cascading.flow.FlowDef
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+
+// Map-phase output: the grouping slug plus the record's features as JSON
+case class MapFeatures(slug : String, json : String)
+// Features carried into the join/reduce phase (JSON-encoded)
+case class ReduceFeatures(json : String)
+// One scored candidate pair: slug, similarity score, and both source records
+case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String)
+
+// Base class for scorable inputs (GROBID, Crossref, fatcat, BibJSON):
+// subclasses provide a source and a features pipe; this class turns that
+// into a slug-keyed pipe ready for joining.
+abstract class Scorable {
+ def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = {
+ // Drop records that produced no features (None), then unwrap
+ val validFeatures : TypedPipe[MapFeatures] = getFeaturesPipe(args)
+ .filterNot { entry => entry.isEmpty }
+ .map { entry => entry.get }
+
+ // Key by slug for the downstream join
+ validFeatures
+ .groupBy { case MapFeatures(slug, json) => slug }
+ .map { tuple =>
+ val (slug : String, features : MapFeatures) = tuple
+ (slug, ReduceFeatures(features.json))
+ }
+ }
+
+ // abstract methods
+ def getSource(args : Args) : Source
+ def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]]
+}
+
+object Scorable {
+ // Records with titles longer than this are dropped by keepRecord filters
+ val MaxTitleLength = 1023
+
+ // Parses a JSON object string; None on parse failure.
+ def jsonToMap(json : String) : Option[Map[String, Any]] = {
+ // https://stackoverflow.com/a/32717262/631051
+ val jsonObject = JSON.parseFull(json)
+ if (jsonObject == None) {
+ None
+ } else {
+ Some(jsonObject.get.asInstanceOf[Map[String, Any]])
+ }
+ }
+
+ // Safe string lookup through an optional map; None when the map or the
+ // key is absent.
+ def getStringOption(optionalMap : Option[Map[String, Any]], key : String) : Option[String] = {
+ optionalMap match {
+ case None => None
+ case Some(map) => if (map contains key) Some(map(key).asInstanceOf[String]) else None
+ }
+ }
+
+ // Caller is responsible for ensuring that key is a String in map.
+ // TODO: Add and handle ClassCastException
+ def getString(map : Map[String, Any], key : String) : String = {
+ assert(map contains key)
+ map(key).asInstanceOf[String]
+ }
+
+ // Similarity scores are scaled to 0..MaxScore
+ val MaxScore = 1000
+
+ // True when both records carry fatcat_release idents, the idents differ
+ // (not literally the same release), and the work ids are strictly ordered
+ // so each pair is emitted only once.
+ def selfMatchable(features1 : ReduceFeatures, features2 : ReduceFeatures) : Boolean = {
+ val json1 = jsonToMap(features1.json)
+ val json2 = jsonToMap(features2.json)
+
+ (
+ getStringOption(json1, "fatcat_release") != None &&
+ getStringOption(json2, "fatcat_release") != None &&
+ getStringOption(json1, "fatcat_release") != getStringOption(json2, "fatcat_release") &&
+ (getStringOption(json1, "fatcat_work") match {
+ case None => false
+ case Some(work1) => getStringOption(json2, "fatcat_work") match {
+ case None => false
+ // this last check ensures we don't double-match
+ case Some(work2) => work1 > work2
+ }
+ })
+ )
+ }
+
+ // Scaled (0..MaxScore) string similarity of the two lowercased titles;
+ // 0 when either side has no title.
+ def computeSimilarity(features1 : ReduceFeatures, features2 : ReduceFeatures) : Int = {
+ val json1 = jsonToMap(features1.json)
+ val json2 = jsonToMap(features2.json)
+ getStringOption(json1, "title") match {
+ case None => 0
+ case Some(title1) => {
+ getStringOption(json2, "title") match {
+ case None => 0
+ case Some(title2) =>
+ (StringUtilities.similarity(title1.toLowerCase, title2.toLowerCase) * MaxScore).toInt
+ }
+ }
+ }
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
new file mode 100644
index 0000000..93cd78d
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
@@ -0,0 +1,63 @@
+package sandcrawler
+
+import java.io.InputStream
+
+import scala.io.Source
+import scala.util.parsing.json.JSONArray
+import scala.util.parsing.json.JSONObject
+
+object ScorableFeatures {
+ // TODO: Add exception handling.
+ // Denylist of junk slugs (e.g. boilerplate titles) loaded once from the
+ // bundled resource file.
+ val fileStream : InputStream = getClass.getResourceAsStream("/slug-denylist.txt")
+ val SlugDenylist : Set[String] = Source.fromInputStream(fileStream).getLines.toSet
+ fileStream.close
+ // Slugs shorter than this are considered too ambiguous to match on
+ val MinSlugLength = 8
+
+ // Static factory method
+ // Normalizes every null argument to an empty value so downstream code
+ // never sees null fields.
+ def create(title : String, authors : List[Any] = List(), year : Int = 0, doi : String = "", fatcat_release : String = "", fatcat_work : String = "", sha1 : String = "") : ScorableFeatures = {
+ new ScorableFeatures(
+ title=if (title == null) "" else title,
+ authors=if (authors == null) List() else authors.map(a => if (a == null) "" else a),
+ year=year,
+ doi=if (doi == null) "" else doi,
+ fatcat_release=if (fatcat_release == null) "" else fatcat_release,
+ fatcat_work=if (fatcat_work == null) "" else fatcat_work,
+ sha1=if (sha1 == null) "" else sha1)
+ }
+}
+
+// Contains features needed to make slug and to score (in combination
+// with a second ScorableFeatures). Create with above static factory method.
+class ScorableFeatures private(title : String, authors : List[Any] = List(), year: Int = 0, doi : String = "", fatcat_release : String = "", fatcat_work : String = "", sha1: String = "") {
+
+ def toMap() : Map[String, Any] =
+ Map("title" -> title, "authors" -> JSONArray(authors), "year" -> year, "doi" -> doi, "fatcat_release" -> fatcat_release, "fatcat_work" -> fatcat_work, "sha1" -> sha1)
+
+ override def toString() : String = {
+ JSONObject(toMap).toString
+ }
+
+ def toSlug() : Option[String] = {
+ if (title == null) {
+ None
+ } else {
+ val unaccented = StringUtilities.removeAccents(title)
+ // Remove punctuation
+ val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "")
+ if (slug.isEmpty
+ || slug == null
+ || (ScorableFeatures.SlugDenylist contains slug)
+ || (slug.length < ScorableFeatures.MinSlugLength)) {
+ None
+ } else {
+ Some(slug)
+ }
+ }
+ }
+
+ def toMapFeatures : Option[MapFeatures] =
+ toSlug match {
+ case None => None
+ case Some(slug) => Some(MapFeatures(slug, toString))
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala
new file mode 100644
index 0000000..58007fa
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala
@@ -0,0 +1,86 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.pipe.Pipe
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Like ScoreJob, but additionally joins the scored GROBID/Crossref matches
+// with CDX file metadata (keyed by sha1) so results can be inserted with
+// full file context.
+class ScoreInsertableJob(args: Args) extends JobBase(args) {
+
+ val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler")
+ val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler")
+ val cdxRowCount = Stat("cdx-rows", "sandcrawler")
+ val scoredRowCount = Stat("scored-rows", "sandcrawler")
+ val joinedRowCount = Stat("joined-rows", "sandcrawler")
+
+ val grobidScorable : Scorable = new GrobidScorable()
+ val crossrefScorable : Scorable = new CrossrefScorable()
+
+ val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable
+ .getInputPipe(args)
+ .map { r =>
+ grobidRowCount.inc
+ r
+ }
+ val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable
+ .getInputPipe(args)
+ .map { r =>
+ crossrefRowCount.inc
+ r
+ }
+ // File metadata keyed by sha1: (key, cdx, mime, size)
+ val cdxPipe : TypedPipe[(String, String, String, Long)] = ScoreInsertableJob.getHBaseCdxSource(args("hbase-table"), args("zookeeper-hosts"))
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size"))
+ .filter { case (_, cdx, mime, size) => cdx != null && mime != null && size != null }
+ .map { case (key, cdx, mime, size) =>
+ (Bytes.toString(key.copyBytes()),
+ Bytes.toString(cdx.copyBytes()),
+ Bytes.toString(mime.copyBytes()),
+ Bytes.toLong(size.copyBytes()))
+ }
+ .map { r =>
+ cdxRowCount.inc
+ r
+ }
+
+ // Join GROBID and Crossref on slug, then re-key the scored output by the
+ // GROBID record's sha1 so it can be joined with the CDX pipe
+ val scoredPipe = grobidPipe
+ .addTrap(TypedTsv(args("output") + ".trapped"))
+ .join(crossrefPipe)
+ .map { case (slug, (grobidFeatures, crossrefFeatures)) =>
+ scoredRowCount.inc
+ //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry
+ // Not ever Empty, I promise
+ val key = Scorable.getStringOption(Scorable.jsonToMap(grobidFeatures.json), "sha1").orNull
+ (key, new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(grobidFeatures, crossrefFeatures),
+ grobidFeatures.json,
+ crossrefFeatures.json))
+ }
+ .map { case (key, entry) => (key, entry.slug, entry.score, entry.json1, entry.json2) }
+ .groupBy { case (key, _, _, _, _) => key }
+
+ // TypedTsv doesn't work over case classes.
+ val joinedPipe = scoredPipe
+ .join(cdxPipe.groupBy { case (key, _, _, _) => key })
+ .map { case (key, ((_, slug, score, left, right), (_, cdx, mime, size))) => (key, slug, score, left, right, cdx, mime, size) }
+ .write(TypedTsv[(String, String, Int, String, String, String, String, Long)](args("output")))
+}
+
+object ScoreInsertableJob {
+
+ // eg, "wbgrp-journal-extract-0-qa", "mtrcs-zk1.us.archive.org:2181"
+ // Full-table scan over the CDX/mime/size file-metadata columns.
+ def getHBaseCdxSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbaseTable,
+ zookeeperHosts,
+ List("file:cdx", "file:mime", "file:size"),
+ SourceMode.SCAN_ALL)
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
new file mode 100644
index 0000000..ccb9b76
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -0,0 +1,48 @@
+package sandcrawler
+
+import cascading.pipe.Pipe
+import com.twitter.scalding.Args
+import com.twitter.scalding.Stat
+import com.twitter.scalding.TypedPipe
+import com.twitter.scalding.TypedTsv
+import parallelai.spyglass.base.JobBase
+
+// Joins GROBID-extracted metadata with the Crossref dump on title slug and
+// writes (slug, score, grobid-json, crossref-json) rows.
+class ScoreJob(args: Args) extends JobBase(args) {
+
+ val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler")
+ val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler")
+ val joinedRowCount = Stat("joined-rows", "sandcrawler")
+
+ val grobidScorable : Scorable = new GrobidScorable()
+ val crossrefScorable : Scorable = new CrossrefScorable()
+ val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable
+ .getInputPipe(args)
+ .map { r =>
+ grobidRowCount.inc
+ r
+ }
+ val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable
+ .getInputPipe(args)
+ .map { r =>
+ crossrefRowCount.inc
+ r
+ }
+
+ // Join on slug; failures are diverted to a ".trapped" sidecar file
+ val joinedPipe = grobidPipe
+ .addTrap(TypedTsv(args("output") + ".trapped"))
+ .join(crossrefPipe)
+
+ // TypedTsv doesn't work over case classes.
+ joinedPipe
+ .map { case (slug, (grobidFeatures, crossrefFeatures)) =>
+ joinedRowCount.inc
+ //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry
+ new ReduceOutput(
+ slug,
+ Scorable.computeSimilarity(grobidFeatures, crossrefFeatures),
+ grobidFeatures.json,
+ crossrefFeatures.json)
+ }
+ .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
+ .write(TypedTsv[(String, Int, String, String)](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
new file mode 100644
index 0000000..9150ced
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
@@ -0,0 +1,76 @@
+package sandcrawler
+
+import java.text.Normalizer
+import java.util.regex.Pattern
+
+object StringUtilities {
+ // bnewbold: I propose that we:
+ // 1. keep only \p{Ideographic}, \p{Alphabetic}, and \p{Digit}
+ // 2. strip accents
+ // 3. "lower-case" (unicode-aware)
+ // 4. do any final custom/manual mappings
+ //
+ // We should check (test) that null bytes are handled, in addition to other
+ // more obvious characters
+
+ // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934
+ def removeAccents(s : String) : String = {
+ val replacements = Map(
+ '\u0141' -> 'L',
+ '\u0142' -> 'l', // Letter ell
+ '\u00d8' -> 'O',
+ '\u00f8' -> 'o'
+ )
+ val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD))
+ for (i <- 0 to sb.length - 1) {
+ for (key <- replacements.keys) {
+ if (sb(i) == key) {
+ sb.deleteCharAt(i);
+ sb.insert(i, replacements(key))
+ }
+ }
+ }
+ val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+")
+ pattern.matcher(sb).replaceAll("")
+ }
+
+ // Source: https://stackoverflow.com/a/30076541/631051
+ def removePunctuation(s: String) : String = {
+ s.replaceAll("""[\p{Punct}’·“”‘’“”«»「」¿–±§ʿ]""", "")
+ }
+
+ // Adapted from: https://stackoverflow.com/a/16018452/631051
+ def similarity(s1a : String, s2a : String) : Double = {
+ val (s1, s2) = (removeAccents(removePunctuation(s1a)),
+ removeAccents(removePunctuation(s2a)))
+ val longer : String = if (s1.length > s2.length) s1 else s2
+ val shorter : String = if (s1.length > s2.length) s2 else s1
+ if (longer.length == 0) {
+ // Both strings are empty.
+ 1
+ } else {
+ (longer.length - stringDistance(longer, shorter)) / longer.length.toDouble
+ }
+ }
+
+ // Source: https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/
+ def stringDistance(s1: String, s2: String): Int = {
+ val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]()
+ def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c)
+ def sd(s1: List[Char], s2: List[Char]): Int = {
+ if (!memo.contains((s1, s2))) {
+ memo((s1,s2)) = (s1, s2) match {
+ case (_, Nil) => s1.length
+ case (Nil, _) => s2.length
+ case (c1::t1, c2::t2) =>
+ min( sd(t1,s2) + 1, sd(s1,t2) + 1,
+ sd(t1,t2) + (if (c1==c2) 0 else 1) )
+ }
+ }
+ memo((s1,s2))
+ }
+
+ sd( s1.toList, s2.toList )
+ }
+}
+
diff --git a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala
new file mode 100644
index 0000000..8302b8f
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala
@@ -0,0 +1,172 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.TextLine
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class CrossrefScorableTest extends FlatSpec with Matchers {
+ // scalastyle:off
+ val CrossrefString =
+"""
+{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" },
+ "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ],
+ "date-time" : "2017-10-23T17:19:16Z",
+ "timestamp" : { "$numberLong" : "1508779156477" } },
+ "reference-count" : 0,
+ "publisher" : "Elsevier BV",
+ "issue" : "3",
+ "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/",
+ "start" : { "date-parts" : [ [ 1996, 1, 1 ] ],
+ "date-time" : "1996-01-01T00:00:00Z",
+ "timestamp" : { "$numberLong" : "820454400000" } },
+ "delay-in-days" : 0, "content-version" : "tdm" }],
+ "content-domain" : { "domain" : [], "crossmark-restriction" : false },
+ "published-print" : { "date-parts" : [ [ 1996 ] ] },
+ "DOI" : "<<DOI>>",
+ "type" : "journal-article",
+ "created" : { "date-parts" : [ [ 2002, 7, 25 ] ],
+ "date-time" : "2002-07-25T15:09:41Z",
+ "timestamp" : { "$numberLong" : "1027609781000" } },
+ "page" : "186-187",
+ "source" : "Crossref",
+ "is-referenced-by-count" : 0,
+ "title" : [ "<<TITLE>>" ],
+ "prefix" : "10.1016",
+ "volume" : "9",
+ "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ],
+ "member" : "78",
+ "container-title" : [ "Journal de Pédiatrie et de Puériculture" ],
+ "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml",
+ "content-type" : "text/xml",
+ "content-version" : "vor",
+ "intended-application" : "text-mining" },
+ { "URL" :
+ "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain",
+ "content-type" : "text/plain",
+ "content-version" : "vor",
+ "intended-application" : "text-mining" } ],
+ "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ],
+ "date-time" : "2015-09-03T10:03:43Z",
+ "timestamp" : { "$numberLong" : "1441274623000" } },
+ "score" : 1,
+ "issued" : { "date-parts" : [ [ 1996 ] ] },
+ "references-count" : 0,
+ "alternative-id" : [ "0987-7983(96)87729-2" ],
+ "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2",
+ "ISSN" : [ "0987-7983" ],
+ "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ],
+ "subject" : [ "Pediatrics, Perinatology, and Child Health" ]
+}
+""".replace("<<DOI>>", "10.123/aBc")
+ // scalastyle:on
+ val CrossrefStringWithGoodTitle = CrossrefString.replace("<<TITLE>>", "Some Title")
+ val CrossrefStringWithMaximumTitle = CrossrefString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength)
+ val CrossrefStringWithExcessiveTitle = CrossrefString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength + "0")
+ val CrossrefStringWithNullTitle = CrossrefString.replace("\"<<TITLE>>\"", "null")
+ val CrossrefStringWithEmptyTitle = CrossrefString.replace("<<TITLE>>", "")
+ val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle")
+ val MalformedCrossrefString = CrossrefString.replace("}", "")
+ val CrossrefStringWithNoAuthors = CrossrefString.replace("<<TITLE>>", "Some Valid Title").replace("author", "no-author")
+ val CrossrefStringWrongType = CrossrefString.replace("<<TITLE>>", "Some Valid Title").replace("journal-article", "other")
+ val CrossrefStringNoType = CrossrefString.replace("<<TITLE>>", "Some Valid Title").replace("type", "not-type")
+
+ // Unit tests
+ "CrossrefScorable.jsonToMapFeatures()" should "handle invalid JSON" in {
+ CrossrefScorable.jsonToMapFeatures(MalformedCrossrefString) should be (None)
+ }
+
+ it should "handle missing title" in {
+ CrossrefScorable.jsonToMapFeatures(CrossrefStringWithoutTitle) should be (None)
+ }
+
+ it should "handle null title" in {
+ CrossrefScorable.jsonToMapFeatures(CrossrefStringWithNullTitle) should be (None)
+ }
+
+ it should "handle empty title" in {
+ CrossrefScorable.jsonToMapFeatures(CrossrefStringWithEmptyTitle) should be (None)
+ }
+
+ it should "handle subtitle" in {
+ CrossrefScorable.jsonToMapFeatures(
+ """{"title": ["short but not too short"], "subtitle": ["just right!"], "DOI": "10.123/asdf", "type":"journal-article","author":[{ "given" : "W", "family" : "Gaier"}]}""") match {
+ case None => fail()
+ case Some(result) => result.slug shouldBe "shortbutnottooshortjustright"
+ }
+ }
+
+ it should "handle empty subtitle" in {
+ CrossrefScorable.jsonToMapFeatures(
+ """{"title": ["short but not too short"], "subtitle": [""], "DOI": "10.123/asdf", "type":"journal-article", "author":[{ "given" : "W", "family" : "Gaier"}]}""") match {
+ case None => fail()
+ case Some(result) => result.slug shouldBe "shortbutnottooshort"
+ }
+ }
+
+ it should "handle null subtitle" in {
+ CrossrefScorable.jsonToMapFeatures(
+ """{"title": ["short but not too short"], "subtitle": [null], "DOI": "10.123/asdf", "type":"journal-article", "author":[{ "given" : "W", "family" : "Gaier"}]}""") match {
+ case None => fail()
+ case Some(result) => result.slug shouldBe "shortbutnottooshort"
+ }
+ }
+
+ it should "handle missing authors" in {
+ CrossrefScorable.jsonToMapFeatures(CrossrefStringWithNoAuthors) should be (None)
+ }
+
+ it should "handle valid input" in {
+ CrossrefScorable.jsonToMapFeatures(CrossrefStringWithGoodTitle) match {
+ case None => fail()
+ case Some(result) => {
+ result.slug shouldBe "sometitle"
+ Scorable.jsonToMap(result.json) match {
+ case None => fail()
+ case Some(map) => {
+ map("title").asInstanceOf[String] shouldBe "Some Title"
+ map("doi").asInstanceOf[String] shouldBe "10.123/abc"
+ // TODO: full name? not just a string?
+ map("authors").asInstanceOf[List[String]] shouldBe List("Gaier")
+ map("year").asInstanceOf[Double].toInt shouldBe 2002
+ }
+ }
+ }
+ }
+ }
+
+ "CrossrefScorable.keepRecord()" should "return true for valid JSON with title" in {
+ CrossrefScorable.keepRecord(CrossrefStringWithGoodTitle) shouldBe true
+ }
+
+ it should "return true for valid JSON with a title of maximum permitted length" in {
+ CrossrefScorable.keepRecord(CrossrefStringWithMaximumTitle) shouldBe true
+ }
+
+ it should "return false for valid JSON with excessively long title" in {
+ CrossrefScorable.keepRecord(CrossrefStringWithExcessiveTitle) shouldBe false
+ }
+
+ it should "return false for valid JSON with null title" in {
+ CrossrefScorable.keepRecord(CrossrefStringWithNullTitle) shouldBe false
+ }
+
+ it should "return false for valid JSON with no title" in {
+ CrossrefScorable.keepRecord(CrossrefStringWithoutTitle) shouldBe false
+ }
+
+ it should "return false for invalid JSON" in {
+    CrossrefScorable.keepRecord(MalformedCrossrefString) shouldBe false
+ }
+
+ it should "handle content types" in {
+ CrossrefScorable.jsonToMapFeatures(CrossrefStringWrongType) should be (None)
+ CrossrefScorable.jsonToMapFeatures(CrossrefStringNoType) should be (None)
+ }
+}
diff --git a/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala b/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala
new file mode 100644
index 0000000..8dda5c8
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala
@@ -0,0 +1,72 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.Tsv
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.junit.runner.RunWith
+import org.scalatest.FunSpec
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
+import scala._
+
+@RunWith(classOf[JUnitRunner])
+class DumpUnGrobidedJobTest extends FunSpec with TupleConversions {
+
+ val output = "/tmp/testOutput"
+ val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+ val log = LoggerFactory.getLogger(this.getClass.getName)
+
+ val statusCode: Long = 200
+ val statusBytes = Bytes.toBytes(statusCode)
+
+ val sampleDataGrobid : List[List[Array[Byte]]] = List(
+ ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusBytes),
+ ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusBytes),
+ ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusBytes),
+ ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusBytes),
+ ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusBytes),
+ ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusBytes))
+ .map(pair => List(Bytes.toBytes(pair._1), pair._2))
+
+ val sampleDataFile : List[List[Array[Byte]]] = List(
+ ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""))
+ .map(pair => List(Bytes.toBytes(pair._1),
+ Bytes.toBytes(pair._2),
+ Bytes.toBytes(pair._3),
+ Bytes.toBytes(pair._4)))
+
+ JobTest("sandcrawler.DumpUnGrobidedJob")
+ .arg("test", "")
+ .arg("app.conf.path", "app.conf")
+ .arg("output", output)
+ .arg("hbase-table", testTable)
+ .arg("zookeeper-hosts", testHost)
+ .arg("debug", "true")
+ .source[Tuple](DumpUnGrobidedJob.getHBaseColSource(testTable, testHost),
+ sampleDataGrobid.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+ .source[Tuple](DumpUnGrobidedJob.getHBaseKeySource(testTable, testHost),
+ sampleDataFile.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+ .sink[Tuple](TypedTsv[(String,String,String,String)](output)) {
+ outputBuffer =>
+ it("should return correct-length list.") {
+ assert(outputBuffer.size === 2)
+ }
+ }
+ .run
+ .finish
+}
diff --git a/scalding/src/test/scala/sandcrawler/FatcatScorableTest.scala b/scalding/src/test/scala/sandcrawler/FatcatScorableTest.scala
new file mode 100644
index 0000000..823e14a
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/FatcatScorableTest.scala
@@ -0,0 +1,160 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.TextLine
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class FatcatScorableTest extends FlatSpec with Matchers {
+ // scalastyle:off
+ val FatcatString =
+"""
+{
+ "abstracts": [],
+ "refs": [],
+ "contribs": [
+ {
+ "index": 0,
+ "raw_name": "W Gaier",
+ "surname": "Gaier",
+ "role": "author",
+ "extra": {
+ "seq": "first"
+ }
+ }
+ ],
+ "publisher": "Elsevier BV",
+ "pages": "186-187",
+ "ext_ids": {
+ "doi": "<<DOI>>"
+ },
+ "release_year": 1996,
+ "release_stage": "published",
+ "release_type": "article-journal",
+ "container_id": "3nccslsn5jez3ixrp5skjyjxu4",
+ "title": "<<TITLE>>",
+ "state": "active",
+ "ident": "pnri57u66ffytigdmyybbmouni",
+ "work_id": "tdmqnfzm2nggrhfwzasyegvpyu",
+ "revision": "e50bd04e-d0d4-4ee7-b7a4-6b4f079de154",
+ "extra": {
+ "crossref": {
+ "alternative-id": [
+ "0987-7983(96)87729-2"
+ ],
+ "type": "journal-article"
+ }
+ }
+}
+""".replace("<<DOI>>", "10.123/aBc")
+ // scalastyle:on
+ val FatcatStringWithGoodTitle = FatcatString.replace("<<TITLE>>", "Some Title")
+ val FatcatStringWithMaximumTitle = FatcatString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength)
+ val FatcatStringWithExcessiveTitle = FatcatString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength + "0")
+ val FatcatStringWithNullTitle = FatcatString.replace("\"<<TITLE>>\"", "null")
+ val FatcatStringWithEmptyTitle = FatcatString.replace("<<TITLE>>", "")
+ val FatcatStringWithoutTitle = FatcatString.replace("title", "nottitle")
+ val MalformedFatcatString = FatcatString.replace("}", "")
+ val FatcatStringWithNoAuthors = FatcatString.replace("<<TITLE>>", "Some Valid Title").replace("contribs", "no-contribs")
+ //val FatcatStringWrongType = FatcatString.replace("<<TITLE>>", "Some Valid Title").replace("journal-article", "other")
+ //val FatcatStringNoType = FatcatString.replace("<<TITLE>>", "Some Valid Title").replace("type", "not-type")
+
+ // Unit tests
+ "FatcatScorable.jsonToMapFeatures()" should "handle invalid JSON" in {
+ FatcatScorable.jsonToMapFeatures(MalformedFatcatString) should be (None)
+ }
+
+ it should "handle missing title" in {
+ FatcatScorable.jsonToMapFeatures(FatcatStringWithoutTitle) should be (None)
+ }
+
+ it should "handle null title" in {
+ FatcatScorable.jsonToMapFeatures(FatcatStringWithNullTitle) should be (None)
+ }
+
+ it should "handle empty title" in {
+ FatcatScorable.jsonToMapFeatures(FatcatStringWithEmptyTitle) should be (None)
+ }
+
+ it should "handle subtitle" in {
+ FatcatScorable.jsonToMapFeatures(
+ """{"title": "short but not too short", "subtitle": "just right!", "ident": "pnri57u66ffytigdmyybbmouni", "work_id": "tdmqnfzm2nggrhfwzasyegvpyu", "DOI": "10.123/asdf", "type":"journal-article","contribs":[{ "raw_name" : "W Gaier", "surname" : "Gaier"}]}""") match {
+ case None => fail()
+ case Some(result) => result.slug shouldBe "shortbutnottooshortjustright"
+ }
+ }
+
+ it should "handle empty subtitle" in {
+ FatcatScorable.jsonToMapFeatures(
+ """{"title": "short but not too short", "subtitle": "", "ident": "pnri57u66ffytigdmyybbmouni", "work_id": "tdmqnfzm2nggrhfwzasyegvpyu", "DOI": "10.123/asdf", "type":"journal-article", "contribs":[{ "raw_name" : "W Gaier", "surname" : "Gaier"}]}""") match {
+ case None => fail()
+ case Some(result) => result.slug shouldBe "shortbutnottooshort"
+ }
+ }
+
+ it should "handle null subtitle" in {
+ FatcatScorable.jsonToMapFeatures(
+ """{"title": "short but not too short", "subtitle": null, "ident": "pnri57u66ffytigdmyybbmouni", "work_id": "tdmqnfzm2nggrhfwzasyegvpyu", "DOI": "10.123/asdf", "type":"journal-article", "contribs":[{ "raw_name" : "W Gaier", "surname" : "Gaier"}]}""") match {
+ case None => fail()
+ case Some(result) => result.slug shouldBe "shortbutnottooshort"
+ }
+ }
+
+ it should "handle missing authors" in {
+ // TODO: not actually removing these
+ //FatcatScorable.jsonToMapFeatures(FatcatStringWithNoAuthors) should be (None)
+ FatcatScorable.jsonToMapFeatures(FatcatStringWithNoAuthors)
+ }
+
+ it should "handle valid input" in {
+ FatcatScorable.jsonToMapFeatures(FatcatStringWithGoodTitle) match {
+ case None => fail()
+ case Some(result) => {
+ result.slug shouldBe "sometitle"
+ Scorable.jsonToMap(result.json) match {
+ case None => fail()
+ case Some(map) => {
+ map("title").asInstanceOf[String] shouldBe "Some Title"
+ //map("doi").asInstanceOf[String] shouldBe "10.123/abc"
+ map("fatcat_release").asInstanceOf[String] shouldBe "pnri57u66ffytigdmyybbmouni"
+ map("fatcat_work").asInstanceOf[String] shouldBe "tdmqnfzm2nggrhfwzasyegvpyu"
+ // TODO: full name? not just a string?
+ map("authors").asInstanceOf[List[String]] shouldBe List("W Gaier")
+ map("year").asInstanceOf[Double].toInt shouldBe 1996
+ }
+ }
+ }
+ }
+ }
+
+ "FatcatScorable.keepRecord()" should "return true for valid JSON with title" in {
+ FatcatScorable.keepRecord(FatcatStringWithGoodTitle) shouldBe true
+ }
+
+ it should "return true for valid JSON with a title of maximum permitted length" in {
+ FatcatScorable.keepRecord(FatcatStringWithMaximumTitle) shouldBe true
+ }
+
+ it should "return false for valid JSON with excessively long title" in {
+ FatcatScorable.keepRecord(FatcatStringWithExcessiveTitle) shouldBe false
+ }
+
+ it should "return false for valid JSON with null title" in {
+ FatcatScorable.keepRecord(FatcatStringWithNullTitle) shouldBe false
+ }
+
+ it should "return false for valid JSON with no title" in {
+ FatcatScorable.keepRecord(FatcatStringWithoutTitle) shouldBe false
+ }
+
+ it should "return false for invalid JSON" in {
+    FatcatScorable.keepRecord(MalformedFatcatString) shouldBe false
+ }
+
+}
diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala
new file mode 100644
index 0000000..bf9343b
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala
@@ -0,0 +1,124 @@
+
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.TextLine
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class GrobidScorableDumpJobTest extends FlatSpec with Matchers {
+ //scalastyle:off
+ val JsonString = """
+{
+ "title": "<<TITLE>>",
+ "authors": [
+ {"name": "Brewster Kahle"},
+ {"name": "J Doe"}
+ ],
+ "journal": {
+ "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
+ "eissn": null,
+ "issn": null,
+ "issue": null,
+ "publisher": null,
+ "volume": null
+ },
+ "date": "2000",
+ "doi": null,
+ "citations": [
+ { "authors": [{"name": "A Seaperson"}],
+ "date": "2001",
+ "id": "b0",
+ "index": 0,
+ "issue": null,
+ "journal": "Letters in the Alphabet",
+ "publisher": null,
+ "title": "Everything is Wonderful",
+ "url": null,
+ "volume": "20"},
+ { "authors": [],
+ "date": "2011-03-28",
+ "id": "b1",
+ "index": 1,
+ "issue": null,
+ "journal": "The Dictionary",
+ "publisher": null,
+ "title": "All about Facts",
+ "url": null,
+ "volume": "14"}
+ ],
+ "abstract": "Everything you ever wanted to know about nothing",
+ "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.",
+ "acknowledgement": null,
+ "annex": null
+}
+"""
+ // scalastyle:on
+ val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File")
+ val JsonStringWithoutTitle = JsonString.replace("title", "nottitle")
+ val MalformedJsonString = JsonString.replace("}", "")
+
+ // Pipeline tests
+ val output = "/tmp/testOutput"
+ val input = "/tmp/testInput"
+ val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+ val Sha1Strings : List[String] = List(
+ "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", // good
+ "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", // good
+ "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", // good
+ "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", // bad status
+ "sha1:93187A85273589347598473894839443", // malformed
+ "sha1:024937534094897039547e9824382943") // bad status
+
+ val JsonStrings : List[String] = List(
+ JsonString.replace("<<TITLE>>", "Title 1: The Classic"),
+ JsonString.replace("<<TITLE>>", "Title 2: TNG"),
+ JsonString.replace("<<TITLE>>", "Title 3: The Sequel"),
+ // This will have bad status.
+ JsonString.replace("<<TITLE>>", "Title 1: The Classic"),
+ MalformedJsonString,
+ // This will have bad status.
+ JsonString.replace("<<TITLE>>", "Title 2: Not TNG")
+ )
+
+ // bnewbold: status codes aren't strings, they are uint64
+ val Ok : Long = 200
+ val Bad : Long = 400
+ val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad)
+
+ val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes)
+ .zipped
+ .toList
+ .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) }
+ .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) }
+
+ // scalastyle:off null
+ // Add example of lines without GROBID data
+ val SampleData = SampleDataHead :+ new Tuple(
+ new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null)
+ // scalastyle:on null
+
+ JobTest("sandcrawler.GrobidScorableDumpJob")
+ .arg("test", "")
+ .arg("app.conf.path", "app.conf")
+ .arg("output", output)
+ .arg("hbase-table", testTable)
+ .arg("zookeeper-hosts", testHost)
+ .arg("debug", "true")
+ .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData)
+ .sink[(String, String)](TypedTsv[(String, String)](output)) {
+ outputBuffer =>
+ "The pipeline" should "return correct-length list" in {
+ outputBuffer should have length 3
+ }
+ }
+ .run
+ .finish
+}
diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala
new file mode 100644
index 0000000..b395a64
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala
@@ -0,0 +1,122 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.TextLine
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class GrobidScorableTest extends FlatSpec with Matchers {
+ val GrobidString = """
+{
+ "title": "<<TITLE>>",
+ "authors": [
+ {"name": "Brewster Kahle"},
+ {"name": "J Doe"}
+ ],
+ "journal": {
+ "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
+ "eissn": null,
+ "issn": null,
+ "issue": null,
+ "publisher": null,
+ "volume": null
+ },
+ "date": "2000",
+ "doi": null,
+ "citations": [
+ { "authors": [{"name": "A Seaperson"}],
+ "date": "2001",
+ "id": "b0",
+ "index": 0,
+ "issue": null,
+ "journal": "Letters in the Alphabet",
+ "publisher": null,
+ "title": "Everything is Wonderful",
+ "url": null,
+ "volume": "20"},
+ { "authors": [],
+ "date": "2011-03-28",
+ "id": "b1",
+ "index": 1,
+ "issue": null,
+ "journal": "The Dictionary",
+ "publisher": null,
+ "title": "All about Facts",
+ "url": null,
+ "volume": "14"}
+ ],
+ "abstract": "Everything you ever wanted to know about nothing",
+ "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.",
+ "acknowledgement": null,
+ "annex": null
+}
+"""
+ val GrobidStringWithGoodTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File")
+ val GrobidStringWithMaximumTitle = GrobidString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength)
+ val GrobidStringWithExcessiveTitle = GrobidString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength + "0")
+ val GrobidStringWithNullTitle = GrobidString.replace("\"<<TITLE>>\"", "null")
+ val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle")
+ val MalformedGrobidString = GrobidString.replace("}", "")
+ val Key = "Dummy Key"
+
+ // Unit tests
+
+ "GrobidScorable.jsonToMapFeatures()" should "handle invalid JSON" in {
+ GrobidScorable.jsonToMapFeatures(Key, MalformedGrobidString) should be (None)
+ }
+
+ it should "handle null title" in {
+ GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithNullTitle) should be (None)
+ }
+
+ it should "handle missing title" in {
+ GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithoutTitle) should be (None)
+ }
+
+ it should "handle valid input" in {
+ GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithGoodTitle) match {
+ case None => fail()
+ case Some(result) => {
+ result.slug shouldBe "dummyexamplefile"
+ Scorable.jsonToMap(result.json) match {
+ case None => fail()
+ case Some(map) => {
+ map should contain key "title"
+ map("title").asInstanceOf[String] shouldBe "Dummy Example File"
+ map("authors").asInstanceOf[List[String]] shouldBe List("Brewster Kahle", "J Doe")
+ }
+ }
+ }
+ }
+ }
+
+ "GrobidScorable.keepRecord()" should "return true for valid JSON with title" in {
+ GrobidScorable.keepRecord(GrobidStringWithGoodTitle) shouldBe true
+ }
+
+ it should "return true for valid JSON with a title of maximum permitted length" in {
+ GrobidScorable.keepRecord(GrobidStringWithMaximumTitle) shouldBe true
+ }
+
+ it should "return false for valid JSON with excessively long title" in {
+ GrobidScorable.keepRecord(GrobidStringWithExcessiveTitle) shouldBe false
+ }
+
+ it should "return false for valid JSON with null title" in {
+ GrobidScorable.keepRecord(GrobidStringWithNullTitle) shouldBe false
+ }
+
+ it should "return false for valid JSON with no title" in {
+ GrobidScorable.keepRecord(GrobidStringWithoutTitle) shouldBe false
+ }
+
+ it should "return false for invalid JSON" in {
+ GrobidScorable.keepRecord(GrobidStringWithoutTitle) shouldBe false
+ }
+}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala
index 603a4c7..c61cb22 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala
@@ -22,6 +22,7 @@ class HBaseBuilderTest extends FlatSpec with Matchers {
fields should have length 0
}
+ //scalastyle:off no.whitespace.before.left.bracket
it should "throw IllegalArgumentException on malformed input" in {
a [IllegalArgumentException] should be thrownBy {
HBaseBuilder.parseColSpecs(List("file_size"))
diff --git a/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala
index fde2290..d6d283f 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala
@@ -1,15 +1,18 @@
package sandcrawler
-import cascading.tuple.{Tuple, Fields}
-import com.twitter.scalding.{JobTest, Tsv, TupleConversions}
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.Tsv
+import com.twitter.scalding.TupleConversions
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.junit.runner.RunWith
import org.scalatest.FunSpec
import org.scalatest.junit.JUnitRunner
import org.slf4j.LoggerFactory
-import parallelai.spyglass.hbase.HBaseSource
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
import scala._
@RunWith(classOf[JUnitRunner])
diff --git a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
index 3424a36..c4ca5aa 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala
@@ -1,15 +1,18 @@
package sandcrawler
-import cascading.tuple.{Tuple, Fields}
-import com.twitter.scalding.{JobTest, Tsv, TupleConversions}
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.Tsv
+import com.twitter.scalding.TupleConversions
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.junit.runner.RunWith
import org.scalatest.FunSpec
import org.scalatest.junit.JUnitRunner
import org.slf4j.LoggerFactory
-import parallelai.spyglass.hbase.HBaseSource
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
import scala._
/**
@@ -47,12 +50,10 @@ class HBaseRowCountTest extends FunSpec with TupleConversions {
outputBuffer =>
it("should return the test data provided.") {
- println("outputBuffer.size => " + outputBuffer.size)
assert(outputBuffer.size === 1)
}
it("should return the correct count") {
- println("raw output => " + outputBuffer)
assert(outputBuffer(0).getObject(0) === 8)
}
}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala
new file mode 100644
index 0000000..d2cf9de
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala
@@ -0,0 +1,71 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.Tsv
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.junit.runner.RunWith
+import org.scalatest.FunSpec
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
+import scala._
+
+@RunWith(classOf[JUnitRunner])
+class HBaseStatusCodeCountTest extends FunSpec with TupleConversions {
+
+ val output = "/tmp/testOutput"
+ val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+ val log = LoggerFactory.getLogger(this.getClass.getName)
+
+ val statusType1 : Long = 200
+ val statusType2 : Long = 404
+ val statusType1Bytes = Bytes.toBytes(statusType1)
+ val statusType2Bytes = Bytes.toBytes(statusType2)
+
+  // TODO(bnewbold): how to express a null (empty value) in this list?
+ val sampleData : List[List[Array[Byte]]] = List(
+ ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1Bytes),
+ ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1Bytes),
+ ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2Bytes),
+ ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2Bytes),
+ ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2Bytes),
+ ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2Bytes),
+ ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1Bytes),
+ ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2Bytes))
+ .map(pair => List(Bytes.toBytes(pair._1), pair._2))
+
+ val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes)
+ val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes)
+
+ JobTest("sandcrawler.HBaseStatusCodeCountJob")
+ .arg("test", "")
+ .arg("app.conf.path", "app.conf")
+ .arg("output", output)
+ .arg("hbase-table", testTable)
+ .arg("zookeeper-hosts", testHost)
+ .arg("debug", "true")
+ .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
+ sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+ .sink[Tuple](TypedTsv[(Long, Long)](output)) {
+ outputBuffer =>
+ it("should return a correct number of elements.") {
+ assert(outputBuffer.size === 2)
+ }
+
+ // Convert List[Tuple] to Map[Long, Long].
+ val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
+ it("should have the appropriate number of each status type") {
+ assert(counts(statusType1) == statusType1Count)
+ assert(counts(statusType2) == statusType2Count)
+ }
+ }
+ .run
+ .finish
+}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
index d7689cd..7e91af3 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
@@ -1,15 +1,19 @@
package sandcrawler
-import cascading.tuple.{Tuple, Fields}
-import com.twitter.scalding.{JobTest, Tsv, TypedTsv, TupleConversions}
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.Tsv
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.junit.runner.RunWith
import org.scalatest.FunSpec
import org.scalatest.junit.JUnitRunner
import org.slf4j.LoggerFactory
-import parallelai.spyglass.hbase.HBaseSource
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
import scala._
@RunWith(classOf[JUnitRunner])
@@ -20,21 +24,20 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
val log = LoggerFactory.getLogger(this.getClass.getName)
- val statusType1 : Long = 200
- val statusType2 : Long = 404
- val statusType1Bytes = Bytes.toBytes(statusType1)
- val statusType2Bytes = Bytes.toBytes(statusType2)
-
- val sampleData = List(
- List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), statusType1Bytes),
- List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), statusType1Bytes),
- List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), statusType2Bytes),
- List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), statusType2Bytes),
- List(Bytes.toBytes("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ"), statusType2Bytes),
- List(Bytes.toBytes("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6"), statusType2Bytes),
- List(Bytes.toBytes("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ"), statusType1Bytes),
- List(Bytes.toBytes("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT"), statusType2Bytes)
- )
+ val statusType1Bytes = Bytes.toBytes("""{"status": "success"}""")
+ val statusType2Bytes = Bytes.toBytes("""{"status": "partial"}""")
+
+  // TODO(bnewbold): how to express a null (empty value) in this list?
+ val sampleData : List[List[Array[Byte]]] = List(
+ ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1Bytes),
+ ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1Bytes),
+ ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2Bytes),
+ ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2Bytes),
+ ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2Bytes),
+ ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2Bytes),
+ ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1Bytes),
+ ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2Bytes))
+ .map(pair => List(Bytes.toBytes(pair._1), pair._2))
val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes)
val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes)
@@ -46,20 +49,13 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
.arg("hbase-table", testTable)
.arg("zookeeper-hosts", testHost)
.arg("debug", "true")
- .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
+ .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status"),
sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
- .sink[Tuple](TypedTsv[(Long, Long)](output)) {
+ .sink[Tuple](TypedTsv[(String, Long)](output)) {
outputBuffer =>
it("should return a 2-element list.") {
assert(outputBuffer.size === 2)
}
-
- // Convert List[Tuple] to Map[Long, Long].
- val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
- it("should have the appropriate number of each status type") {
- assert(counts(statusType1) == statusType1Count)
- assert(counts(statusType2) == statusType2Count)
- }
}
.run
.finish
diff --git a/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala
new file mode 100644
index 0000000..c847296
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala
@@ -0,0 +1,64 @@
+package sandcrawler
+
+import java.io.InputStream
+
+import scala.io.Source
+
+import org.scalatest._
+
+// scalastyle:off null
+class ScorableFeaturesTest extends FlatSpec with Matchers {
+ "toMapFeatures()" should "work with gnarly inputs" in {
+ ScorableFeatures.create(title = null).toMapFeatures
+ ScorableFeatures.create(title = "something", doi = null, sha1 = null, year = 123).toMapFeatures
+ }
+
+ private def titleToSlug(s : String) : Option[String] = ScorableFeatures.create(title = s).toSlug
+
+ "mapToSlug()" should "extract the parts of titles before a colon" in {
+ titleToSlug("HELLO:there") shouldBe Some("hellothere")
+ }
+
+ it should "extract an entire colon-less string" in {
+ titleToSlug("hello THERE") shouldBe Some("hellothere")
+ }
+
+ it should "return Scorable.NoSlug if given empty string" in {
+ titleToSlug("") shouldBe (None)
+ }
+
+ it should "return Scorable.NoSlug if given null" in {
+ titleToSlug(null) shouldBe (None)
+ }
+
+ it should "strip punctuation" in {
+ titleToSlug("HELLO!:the:re") shouldBe Some("hellothere")
+ titleToSlug("a:b:cdefgh") shouldBe Some("abcdefgh")
+ titleToSlug(
+ "If you're happy and you know it, clap your hands!") shouldBe Some("ifyourehappyandyouknowitclapyourhands")
+ titleToSlug(":;\"\'") shouldBe (None)
+ }
+
+ it should "filter stub titles" in {
+ titleToSlug("abstract") shouldBe (None)
+ titleToSlug("title!") shouldBe (None)
+ titleToSlug("a real title which is not on denylist") shouldBe Some("arealtitlewhichisnotondenylist")
+ }
+
+ it should "strip special characters" in {
+ titleToSlug(":;!',|\"\'`.#?!-@*/\\=+~%$^{}()[]<>-_’·“”‘’“”«»「」¿–±§ʿ") shouldBe (None)
+ // TODO: titleToSlug("©™₨№…") shouldBe (None)
+ // TODO: titleToSlug("πµΣσ") shouldBe (None)
+ }
+
+ it should "remove whitespace" in {
+ titleToSlug("foo bar : baz ::") shouldBe Some("foobarbaz")
+ titleToSlug("\na\t:b:cdefghi") shouldBe Some("abcdefghi")
+ titleToSlug("\n \t \r ") shouldBe (None)
+ }
+
+ it should "skip very short slugs" in {
+ titleToSlug("short") shouldBe (None)
+ titleToSlug("a longer, more in depth title") shouldBe Some("alongermoreindepthtitle")
+ }
+}
diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala
new file mode 100644
index 0000000..2094543
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala
@@ -0,0 +1,81 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.TextLine
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class ScorableTest extends FlatSpec with Matchers {
+ val JsonString = """
+{
+ "title": "<<TITLE>>",
+ "authors": [
+ {"name": "Brewster Kahle"},
+ {"name": "J Doe"}
+ ],
+ "journal": {
+ "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
+ "eissn": null,
+ "issn": null,
+ "issue": null,
+ "publisher": null,
+ "volume": null
+ },
+ "date": "2000",
+ "doi": null,
+ "citations": [
+ { "authors": [{"name": "A Seaperson"}],
+ "date": "2001",
+ "id": "b0",
+ "index": 0,
+ "issue": null,
+ "journal": "Letters in the Alphabet",
+ "publisher": null,
+ "title": "Everything is Wonderful",
+ "url": null,
+ "volume": "20"},
+ { "authors": [],
+ "date": "2011-03-28",
+ "id": "b1",
+ "index": 1,
+ "issue": null,
+ "journal": "The Dictionary",
+ "publisher": null,
+ "title": "All about Facts",
+ "url": null,
+ "volume": "14"}
+ ],
+ "abstract": "Everything you ever wanted to know about nothing",
+ "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.",
+ "acknowledgement": null,
+ "annex": null
+}
+"""
+ "jsonToMap()" should "return a map, given a legal JSON string" in {
+ Scorable.jsonToMap(JsonString) should not be (None)
+ }
+
+ it should "return None, given illegal JSON" in {
+ Scorable.jsonToMap("illegal{,json{{") should be (None)
+ }
+
+ "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in {
+ val score = Scorable.computeSimilarity(
+ new ReduceFeatures(JsonString), new ReduceFeatures(JsonString))
+ score shouldBe Scorable.MaxScore
+ }
+
+ "computeOutput()" should "be case-insensitive" in {
+ val left = JsonString.replace("<<TITLE>>", "A TITLE UPPER CASE")
+ val right = JsonString.replace("<<TITLE>>", "a title upper case")
+ val score = Scorable.computeSimilarity(
+ new ReduceFeatures(left), new ReduceFeatures(right))
+ score shouldBe Scorable.MaxScore
+ }
+}
diff --git a/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala
new file mode 100644
index 0000000..5393f10
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala
@@ -0,0 +1,262 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.TextLine
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class ScoreInsertableJobTest extends FlatSpec with Matchers {
+ //scalastyle:off
+ val JsonString = """
+{
+ "title": "<<TITLE>>",
+ "authors": [
+ {"name": "Brewster Kahle"},
+ {"name": "J Doe"}
+ ],
+ "journal": {
+ "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
+ "eissn": null,
+ "issn": null,
+ "issue": null,
+ "publisher": null,
+ "volume": null
+ },
+ "date": "2000",
+ "doi": null,
+ "citations": [
+ { "authors": [{"name": "A Seaperson"}],
+ "date": "2001",
+ "id": "b0",
+ "index": 0,
+ "issue": null,
+ "journal": "Letters in the Alphabet",
+ "publisher": null,
+ "title": "Everything is Wonderful",
+ "url": null,
+ "volume": "20"},
+ { "authors": [],
+ "date": "2011-03-28",
+ "id": "b1",
+ "index": 1,
+ "issue": null,
+ "journal": "The Dictionary",
+ "publisher": null,
+ "title": "All about Facts",
+ "url": null,
+ "volume": "14"}
+ ],
+ "abstract": "Everything you ever wanted to know about nothing",
+ "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.",
+ "acknowledgement": null,
+ "annex": null
+}
+"""
+ // scalastyle:on
+ val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File")
+ val JsonStringWithoutTitle = JsonString.replace("title", "nottitle")
+ val MalformedJsonString = JsonString.replace("}", "")
+
+ // scalastyle:off
+ val CrossrefString =
+"""
+{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" },
+ "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ],
+ "date-time" : "2017-10-23T17:19:16Z",
+ "timestamp" : { "$numberLong" : "1508779156477" } },
+ "reference-count" : 0,
+ "publisher" : "Elsevier BV",
+ "issue" : "3",
+ "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/",
+ "start" : { "date-parts" : [ [ 1996, 1, 1 ] ],
+ "date-time" : "1996-01-01T00:00:00Z",
+ "timestamp" : { "$numberLong" : "820454400000" } },
+ "delay-in-days" : 0, "content-version" : "tdm" }],
+ "content-domain" : { "domain" : [], "crossmark-restriction" : false },
+ "published-print" : { "date-parts" : [ [ 1996 ] ] },
+ "DOI" : "<<DOI>>",
+ "type" : "journal-article",
+ "created" : { "date-parts" : [ [ 2002, 7, 25 ] ],
+ "date-time" : "2002-07-25T15:09:41Z",
+ "timestamp" : { "$numberLong" : "1027609781000" } },
+ "page" : "186-187",
+ "source" : "Crossref",
+ "is-referenced-by-count" : 0,
+ "title" : [ "<<TITLE>>" ],
+ "prefix" : "10.1016",
+ "volume" : "9",
+ "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ],
+ "member" : "78",
+ "container-title" : [ "Journal de Pédiatrie et de Puériculture" ],
+ "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml",
+ "content-type" : "text/xml",
+ "content-version" : "vor",
+ "intended-application" : "text-mining" },
+ { "URL" :
+ "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain",
+ "content-type" : "text/plain",
+ "content-version" : "vor",
+ "intended-application" : "text-mining" } ],
+ "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ],
+ "date-time" : "2015-09-03T10:03:43Z",
+ "timestamp" : { "$numberLong" : "1441274623000" } },
+ "score" : 1,
+ "issued" : { "date-parts" : [ [ 1996 ] ] },
+ "references-count" : 0,
+ "alternative-id" : [ "0987-7983(96)87729-2" ],
+ "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2",
+ "ISSN" : [ "0987-7983" ],
+ "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ],
+ "subject" : [ "Pediatrics, Perinatology, and Child Health" ]
+}
+"""
+ // scalastyle:on
+ val TooLongOfTitle = "X" * Scorable.MaxTitleLength + "Y" // arbitrary long string
+ val TooShortOfTitle = "X" * (ScorableFeatures.MinSlugLength - 1)
+ val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle")
+ val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle")
+ val MalformedCrossrefString = CrossrefString.replace("}", "")
+ val CrossrefStrings = List(
+ CrossrefString.replace("<<TITLE>>", "Title 2: TNG").replace("<<DOI>>", "DOI-0"),
+ CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2A").replace("<<DOI>>", "DOI-0.5"),
+ CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"),
+ CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"),
+ CrossrefString.replace("<<TITLE>>", TooLongOfTitle).replace("<<DOI>>", "DOI-1"),
+ CrossrefString.replace("<<TITLE>>", TooShortOfTitle).replace("<<DOI>>", "DOI-1"))
+
+ // Pipeline tests
+ val output = "/tmp/testOutput"
+ val input = "/tmp/testInput"
+ val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+ val Sha1Strings : List[String] = List(
+ "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q",
+ "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU",
+ "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT",
+ "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56",
+ "sha1:93187A85273589347598473894839443",
+ "sha1:024937534094897039547e9824382943",
+ "sha1:93229759932857982837892347893892",
+ "sha1:83229759932857982837892347893892")
+
+ val JsonStrings : List[String] = List(
+ JsonString.replace("<<TITLE>>", "Title 1: The Original"),
+ JsonString.replace("<<TITLE>>", "Title 2: TNG"),
+ JsonString.replace("<<TITLE>>", "Title 3: The Sequel"),
+ // This will have bad status.
+ JsonString.replace("<<TITLE>>", "Title 1: The Original"),
+ MalformedJsonString,
+ // This will have bad status.
+ JsonString.replace("<<TITLE>>", "Title 2: Not TNG"),
+ // These are in both sources but have bad titles
+ JsonString.replace("<<TITLE>>", TooLongOfTitle),
+ JsonString.replace("<<TITLE>>", TooShortOfTitle)
+ )
+
+ // bnewbold: status codes aren't strings, they are uint64
+ val Ok : Long = 200
+ val Bad : Long = 400
+ val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad, Ok, Ok)
+
+ val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes)
+ .zipped
+ .toList
+ .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) }
+ .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) }
+
+ // scalastyle:off null
+ // Add example of lines without GROBID data
+ // scalastyle:off null
+ val SampleData = SampleDataHead :+ new Tuple(
+ new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null)
+ // scalastyle:on null
+
+ val CdxList: List[String] = List("{}", "{}", "{}", "{}", "{}", "{}", "{}", "{}" )
+ val MimeList: List[String] = List("application/pdf", "application/pdf", "application/pdf",
+ "application/pdf", "application/pdf", "application/pdf", "application/pdf",
+ "application/pdf")
+ val SizeList: List[Long] = List(1,2,3,4,5,6,7,8)
+
+ // Can zip 3 lists, but not 4... so we recursively zip
+ val SampleCdxData : List[Tuple] = ((Sha1Strings, CdxList).zipped.toList, (MimeList, SizeList).zipped.toList)
+ .zipped
+ .toList
+ .map { case ((sha: String, cdx: String), (mime: String, size: Long)) => List(Bytes.toBytes(sha), Bytes.toBytes(cdx), Bytes.toBytes(mime), Bytes.toBytes(size)) }
+ .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) }
+
+ JobTest("sandcrawler.ScoreInsertableJob")
+ .arg("test", "")
+ .arg("app.conf.path", "app.conf")
+ .arg("output", output)
+ .arg("hbase-table", testTable)
+ .arg("zookeeper-hosts", testHost)
+ .arg("crossref-input", input)
+ .arg("debug", "true")
+ .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData)
+ .source[Tuple](ScoreInsertableJob.getHBaseCdxSource(testTable, testHost), SampleCdxData)
+ .source(TextLine(input), List(
+ 0 -> CrossrefStrings(0),
+ 1 -> CrossrefStrings(1),
+ 2 -> CrossrefStrings(2),
+ 3 -> CrossrefStrings(3),
+ 4 -> CrossrefStrings(4),
+ 4 -> CrossrefStrings(5)))
+ .sink[(String, ReduceFeatures)](TypedTsv[(String, ReduceFeatures)](output + ".trapped")) { _ => () }
+ .sink[(String, String, Int, String, String, String, String, Long)](TypedTsv[(String, String, Int, String, String, String, String, Long)](output)) {
+ // Grobid titles and slugs (in parentheses):
+ // Title 1 (title1)
+ // Title 2: TNG (title2tng)
+ // Title 3: The Sequel (title3thesequel)
+ // <too long of a title>
+ // <too short of a title>
+ // crossref titles and slugs (in parentheses):
+ // Title 2: TNG (title2tng)
+ // Title 1: TNG 2A (title1tng2a)
+ // Title 1: TNG 3 (title1tng3)
+ // Title 2: Rebooted (title2rebooted)
+ // <too long of a title>
+ // <too short of a title>
+ // XXX: Join should have 3 "title1" slugs and 1 "title2tng" slug
+ outputBuffer =>
+ "The pipeline" should "return a 1-element list" in {
+ outputBuffer should have length 1
+ }
+
+      it should "have right # of entries with each slug" in {
+ val slugs = outputBuffer.map(_._2)
+ val countMap : Map[String, Int] = slugs.groupBy(identity).mapValues(_.size)
+ // XXX: countMap("title1") shouldBe 3
+ countMap("title2tng") shouldBe 1
+ }
+
+ def bundle(slug : String, grobidIndex : Int, crossrefIndex : Int) : (String, Int, String, String) = {
+ val mfg : Option[MapFeatures] = GrobidScorable.jsonToMapFeatures(
+ Sha1Strings(grobidIndex),
+ JsonStrings(grobidIndex))
+ val mfc : Option[MapFeatures] = CrossrefScorable.jsonToMapFeatures(CrossrefStrings(crossrefIndex))
+ if (mfg.isEmpty || mfc.isEmpty) {
+ fail()
+ } else {
+ val score = Scorable.computeSimilarity(
+ ReduceFeatures(mfg.get.json),
+ ReduceFeatures(mfc.get.json))
+ (slug, score, mfg.get.json, mfc.get.json)
+ }
+ }
+
+ it should "have right output values" in {
+ //outputBuffer.exists(_ == bundle("title1", 0, 0))
+ //outputBuffer.exists(_ == bundle("title1", 0, 2))
+ //outputBuffer.exists(_ == bundle("title1", 0, 1))
+ outputBuffer.exists(_ == bundle("title2tng", 1, 3))
+ }
+ }
+ .run
+ .finish
+}
diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
new file mode 100644
index 0000000..fbc0ee5
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
@@ -0,0 +1,248 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.TextLine
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class ScoreJobTest extends FlatSpec with Matchers {
+ //scalastyle:off
+ val JsonString = """
+{
+ "title": "<<TITLE>>",
+ "authors": [
+ {"name": "Brewster Kahle"},
+ {"name": "J Doe"}
+ ],
+ "journal": {
+ "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
+ "eissn": null,
+ "issn": null,
+ "issue": null,
+ "publisher": null,
+ "volume": null
+ },
+ "date": "2000",
+ "doi": null,
+ "citations": [
+ { "authors": [{"name": "A Seaperson"}],
+ "date": "2001",
+ "id": "b0",
+ "index": 0,
+ "issue": null,
+ "journal": "Letters in the Alphabet",
+ "publisher": null,
+ "title": "Everything is Wonderful",
+ "url": null,
+ "volume": "20"},
+ { "authors": [],
+ "date": "2011-03-28",
+ "id": "b1",
+ "index": 1,
+ "issue": null,
+ "journal": "The Dictionary",
+ "publisher": null,
+ "title": "All about Facts",
+ "url": null,
+ "volume": "14"}
+ ],
+ "abstract": "Everything you ever wanted to know about nothing",
+ "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.",
+ "acknowledgement": null,
+ "annex": null
+}
+"""
+ // scalastyle:on
+ val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File")
+ val JsonStringWithoutTitle = JsonString.replace("title", "nottitle")
+ val MalformedJsonString = JsonString.replace("}", "")
+
+ // scalastyle:off
+ val CrossrefString =
+"""
+{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" },
+ "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ],
+ "date-time" : "2017-10-23T17:19:16Z",
+ "timestamp" : { "$numberLong" : "1508779156477" } },
+ "reference-count" : 0,
+ "publisher" : "Elsevier BV",
+ "issue" : "3",
+ "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/",
+ "start" : { "date-parts" : [ [ 1996, 1, 1 ] ],
+ "date-time" : "1996-01-01T00:00:00Z",
+ "timestamp" : { "$numberLong" : "820454400000" } },
+ "delay-in-days" : 0, "content-version" : "tdm" }],
+ "content-domain" : { "domain" : [], "crossmark-restriction" : false },
+ "published-print" : { "date-parts" : [ [ 1996 ] ] },
+ "DOI" : "<<DOI>>",
+ "type" : "journal-article",
+ "created" : { "date-parts" : [ [ 2002, 7, 25 ] ],
+ "date-time" : "2002-07-25T15:09:41Z",
+ "timestamp" : { "$numberLong" : "1027609781000" } },
+ "page" : "186-187",
+ "source" : "Crossref",
+ "is-referenced-by-count" : 0,
+ "title" : [ "<<TITLE>>" ],
+ "prefix" : "10.1016",
+ "volume" : "9",
+ "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ],
+ "member" : "78",
+ "container-title" : [ "Journal de Pédiatrie et de Puériculture" ],
+ "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml",
+ "content-type" : "text/xml",
+ "content-version" : "vor",
+ "intended-application" : "text-mining" },
+ { "URL" :
+ "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain",
+ "content-type" : "text/plain",
+ "content-version" : "vor",
+ "intended-application" : "text-mining" } ],
+ "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ],
+ "date-time" : "2015-09-03T10:03:43Z",
+ "timestamp" : { "$numberLong" : "1441274623000" } },
+ "score" : 1,
+ "issued" : { "date-parts" : [ [ 1996 ] ] },
+ "references-count" : 0,
+ "alternative-id" : [ "0987-7983(96)87729-2" ],
+ "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2",
+ "ISSN" : [ "0987-7983" ],
+ "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ],
+ "subject" : [ "Pediatrics, Perinatology, and Child Health" ]
+}
+"""
+ // scalastyle:on
+ val TooLongOfTitle = "X" * Scorable.MaxTitleLength + "Y" // arbitrary long string
+ val TooShortOfTitle = "X" * (ScorableFeatures.MinSlugLength - 1)
+ val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle")
+ val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle")
+ val MalformedCrossrefString = CrossrefString.replace("}", "")
+ val CrossrefStrings = List(
+ CrossrefString.replace("<<TITLE>>", "Title 2: TNG").replace("<<DOI>>", "DOI-0"),
+ CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2A").replace("<<DOI>>", "DOI-0.5"),
+ CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"),
+ CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"),
+ CrossrefString.replace("<<TITLE>>", TooLongOfTitle).replace("<<DOI>>", "DOI-1"),
+ CrossrefString.replace("<<TITLE>>", TooShortOfTitle).replace("<<DOI>>", "DOI-1"))
+
+ // Pipeline tests
+ val output = "/tmp/testOutput"
+ val input = "/tmp/testInput"
+ val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+ val Sha1Strings : List[String] = List(
+ "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q",
+ "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU",
+ "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT",
+ "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56",
+ "sha1:93187A85273589347598473894839443",
+ "sha1:024937534094897039547e9824382943",
+ "sha1:93229759932857982837892347893892",
+ "sha1:83229759932857982837892347893892")
+
+ val JsonStrings : List[String] = List(
+ JsonString.replace("<<TITLE>>", "Title 1: The Original"),
+ JsonString.replace("<<TITLE>>", "Title 2: TNG"),
+ JsonString.replace("<<TITLE>>", "Title 3: The Sequel"),
+ // This will have bad status.
+ JsonString.replace("<<TITLE>>", "Title 1: The Original"),
+ MalformedJsonString,
+ // This will have bad status.
+ JsonString.replace("<<TITLE>>", "Title 2: Not TNG"),
+ // These are in both sources but have bad titles
+ JsonString.replace("<<TITLE>>", TooLongOfTitle),
+ JsonString.replace("<<TITLE>>", TooShortOfTitle)
+ )
+
+ // bnewbold: status codes aren't strings, they are uint64
+ val Ok : Long = 200
+ val Bad : Long = 400
+ val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad, Ok, Ok)
+
+ val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes)
+ .zipped
+ .toList
+ .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) }
+ .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) }
+
+ // scalastyle:off null
+ // Add example of lines without GROBID data
+ // scalastyle:off null
+ val SampleData = SampleDataHead :+ new Tuple(
+ new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null)
+ // scalastyle:on null
+
+ JobTest("sandcrawler.ScoreJob")
+ .arg("test", "")
+ .arg("app.conf.path", "app.conf")
+ .arg("output", output)
+ .arg("hbase-table", testTable)
+ .arg("zookeeper-hosts", testHost)
+ .arg("crossref-input", input)
+ .arg("debug", "true")
+ .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData)
+ .source(TextLine(input), List(
+ 0 -> CrossrefStrings(0),
+ 1 -> CrossrefStrings(1),
+ 2 -> CrossrefStrings(2),
+ 3 -> CrossrefStrings(3),
+ 4 -> CrossrefStrings(4),
+ 4 -> CrossrefStrings(5)))
+ .sink[(String, ReduceFeatures)](TypedTsv[(String, ReduceFeatures)](output + ".trapped")) { _ => () }
+ .sink[(String, Int, String, String)](TypedTsv[(String, Int, String, String)](output)) {
+ // Grobid titles and slugs (in parentheses):
+ // Title 1 (title1)
+ // Title 2: TNG (title2tng)
+ // Title 3: The Sequel (title3thesequel)
+ // <too long of a title>
+ // <too short of a title>
+ // crossref titles and slugs (in parentheses):
+ // Title 2: TNG (title2tng)
+ // Title 1: TNG 2A (title1tng2a)
+ // Title 1: TNG 3 (title1tng3)
+ // Title 2: Rebooted (title2rebooted)
+ // <too long of a title>
+ // <too short of a title>
+ // XXX: Join should have 3 "title1" slugs and 1 "title2tng" slug
+ outputBuffer =>
+ "The pipeline" should "return a 1-element list" in {
+ outputBuffer should have length 1
+ }
+
+      it should "have right # of entries with each slug" in {
+ val slugs = outputBuffer.map(_._1)
+ val countMap : Map[String, Int] = slugs.groupBy(identity).mapValues(_.size)
+ // XXX: countMap("title1") shouldBe 3
+ countMap("title2tng") shouldBe 1
+ }
+
+ def bundle(slug : String, grobidIndex : Int, crossrefIndex : Int) : (String, Int, String, String) = {
+ val mfg : Option[MapFeatures] = GrobidScorable.jsonToMapFeatures(
+ Sha1Strings(grobidIndex),
+ JsonStrings(grobidIndex))
+ val mfc : Option[MapFeatures] = CrossrefScorable.jsonToMapFeatures(CrossrefStrings(crossrefIndex))
+ if (mfg.isEmpty || mfc.isEmpty) {
+ fail()
+ } else {
+ val score = Scorable.computeSimilarity(
+ ReduceFeatures(mfg.get.json),
+ ReduceFeatures(mfc.get.json))
+ (slug, score, mfg.get.json, mfc.get.json)
+ }
+ }
+
+ it should "have right output values" in {
+ //outputBuffer.exists(_ == bundle("title1", 0, 0))
+ //outputBuffer.exists(_ == bundle("title1", 0, 2))
+ //outputBuffer.exists(_ == bundle("title1", 0, 1))
+ outputBuffer.exists(_ == bundle("title2tng", 1, 3))
+ }
+ }
+ .run
+ .finish
+}
diff --git a/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala b/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala
new file mode 100644
index 0000000..410819b
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala
@@ -0,0 +1,85 @@
+package sandcrawler
+
+import org.scalatest._
+
+class StringUtilitiesTest extends FlatSpec with Matchers {
+ "removeAccents()" should "handle the empty string" in {
+ StringUtilities.removeAccents("") shouldBe ""
+ }
+
+ it should "not change a string with unaccented characters" in {
+ StringUtilities.removeAccents("abc123") shouldBe "abc123"
+ }
+
+ it should "remove accents from Ls" in {
+ StringUtilities.removeAccents("E\u0141\u0142en") shouldBe "ELlen"
+ }
+
+ it should "remove accents from Es without changing case" in {
+ val result = StringUtilities.removeAccents("\u00e9")
+ result should have length 1
+ result shouldBe "e"
+ }
+
+ it should "convert the ø in Soren" in {
+ StringUtilities.removeAccents("Søren") shouldBe "Soren"
+ StringUtilities.removeAccents("SØREN") shouldBe "SOREN"
+ }
+
+ "removePunctuation" should "work on the empty string" in {
+ StringUtilities.removePunctuation("") shouldBe ""
+ }
+
+ it should "work on non-empty text strings" in {
+ StringUtilities.removePunctuation("Hello, world!") shouldBe "Hello world"
+ StringUtilities.removePunctuation(":-)") shouldBe ""
+ StringUtilities.removePunctuation("<<---a---b--->") shouldBe "ab"
+ }
+
+ // Tests adapted from https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/
+ "stringDistance" should "work on empty strings" in {
+ StringUtilities.stringDistance("", "") shouldBe 0
+ StringUtilities.stringDistance("a", "") shouldBe 1
+ StringUtilities.stringDistance("", "a") shouldBe 1
+ StringUtilities.stringDistance("abc", "") shouldBe 3
+ StringUtilities.stringDistance("", "abc") shouldBe 3
+ }
+
+ it should "work on equal strings" in {
+ StringUtilities.stringDistance("", "") shouldBe 0
+ StringUtilities.stringDistance("a", "a") shouldBe 0
+ StringUtilities.stringDistance("abc", "abc") shouldBe 0
+ }
+
+ it should "work where only inserts are needed" in {
+ StringUtilities.stringDistance("", "a") shouldBe 1
+ StringUtilities.stringDistance("a", "ab") shouldBe 1
+ StringUtilities.stringDistance("b", "ab") shouldBe 1
+ StringUtilities.stringDistance("ac", "abc") shouldBe 1
+ StringUtilities.stringDistance("abcdefg", "xabxcdxxefxgx") shouldBe 6
+ }
+
+ it should "work where only deletes are needed" in {
+ StringUtilities.stringDistance( "a", "") shouldBe 1
+ StringUtilities.stringDistance( "ab", "a") shouldBe 1
+ StringUtilities.stringDistance( "ab", "b") shouldBe 1
+ StringUtilities.stringDistance("abc", "ac") shouldBe 1
+ StringUtilities.stringDistance("xabxcdxxefxgx", "abcdefg") shouldBe 6
+ }
+
+ it should "work where only substitutions are needed" in {
+ StringUtilities.stringDistance( "a", "b") shouldBe 1
+ StringUtilities.stringDistance( "ab", "ac") shouldBe 1
+ StringUtilities.stringDistance( "ac", "bc") shouldBe 1
+ StringUtilities.stringDistance("abc", "axc") shouldBe 1
+ StringUtilities.stringDistance("xabxcdxxefxgx", "1ab2cd34ef5g6") shouldBe 6
+ }
+
+ it should "work where many operations are needed" in {
+ StringUtilities.stringDistance("example", "samples") shouldBe 3
+ StringUtilities.stringDistance("sturgeon", "urgently") shouldBe 6
+ StringUtilities.stringDistance("levenshtein", "frankenstein") shouldBe 6
+ StringUtilities.stringDistance("distance", "difference") shouldBe 5
+ StringUtilities.stringDistance("java was neat", "scala is great") shouldBe 7
+ }
+}