diff options
Diffstat (limited to 'scalding')
43 files changed, 3437 insertions, 56 deletions
diff --git a/scalding/README.md b/scalding/README.md index 45b62d0..b09e0e8 100644 --- a/scalding/README.md +++ b/scalding/README.md @@ -1,12 +1,13 @@ This directory contains Hadoop map/reduce jobs written in Scala (compiled to -the JVM) using the Scalding framework. +the JVM) using the Scalding framework. Scalding builds on the Java Cascading +library, which itself builds on the Java Hadoop libraries. See the other markdown files in this directory for more background and tips. ## Dependencies -Locally, you need to have the JVM (eg, OpenJDK 1.8), `sbt` build tool, and -might need (exactly) Scala version 2.11.8. +To develop locally, you need to have the JVM (eg, OpenJDK 1.8), `sbt` build +tool, and might need (exactly) Scala version 2.11.8. On a debian/ubuntu machine: @@ -15,24 +16,32 @@ On a debian/ubuntu machine: sudo apt-get update sudo apt install scala sbt +It's also helpful to have a local copy of the `hadoop` binary for running +benchmarks. The `fetch_hadoop.sh` script in the top level directory will fetch +an appropriate version. + ## Building and Running -Run tests: +You can run `sbt` commands individually: + # run all test sbt test -Build a jar and upload to a cluster machine (from which to run in production): - + # build a jar (also runs tests) sbt assembly - scp target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar devbox: -Run on cluster: +Or you can start a session and run commands within that, which is *much* +faster: + + sbt -mem 2048 + + sbt> test + sbt> assembly + sbt> testOnly sandcrawler.SomeTestClassName - devbox$ touch thing.conf - devbox$ hadoop jar sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ - com.twitter.scalding.Tool sandcrawler.HBaseRowCountJob --hdfs \ - --app.conf.path thing.conf \ - --output hdfs:///user/bnewbold/spyglass_out_test +On the cluster, you usually use the `please` script to kick off jobs. Be sure +to build the jars first, or pass `--rebuild` to do it automatically. You need +`hadoop` on your path for this. ## Troubleshooting @@ -42,3 +51,4 @@ If your `sbt` task fails with this error: try restarting `sbt` with more memory (e.g., `sbt -mem 2048`). +See `scalding-debugging.md` or maybe `../notes/` for more. diff --git a/scalding/build.sbt b/scalding/build.sbt index 980418c..01f55ca 100644 --- a/scalding/build.sbt +++ b/scalding/build.sbt @@ -20,6 +20,13 @@ lazy val root = (project in file(".")). scalaSourceFiles.filterNot(_.getAbsolutePath.contains(dirNameToExclude)) }, + (scalastyleSources in Test) := { + // all .scala files in "src/test/scala" + val scalaSourceFiles = ((scalaSource in Test).value ** "*.scala").get + val dirNameToExclude = "/example/" + scalaSourceFiles.filterNot(_.getAbsolutePath.contains(dirNameToExclude)) + }, + name := "sandcrawler", resolvers += "conjars.org" at "http://conjars.org/repo", @@ -35,7 +42,7 @@ lazy val root = (project in file(".")). libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion, libraryDependencies += "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion classifier "tests", libraryDependencies += "org.apache.hbase" % "hbase-common" % hbaseVersion, - libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.11_0.17.2_cdh5.3.1", + libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.11_0.17.2_cdh5.3.1-p1", // cargo-culted from twitter/scalding's build.sbt // hint via https://stackoverflow.com/questions/23280494/sbt-assembly-error-deduplicate-different-file-contents-found-in-the-following#23280952 @@ -55,4 +62,5 @@ lazy val root = (project in file(".")). case x => (assemblyMergeStrategy in assembly).value(x) }, + testOptions in Test += Tests.Argument("-oF") ) diff --git a/scalding/scalastyle-config.xml b/scalding/scalastyle-config.xml index 86d8fca..47d0feb 100644 --- a/scalding/scalastyle-config.xml +++ b/scalding/scalastyle-config.xml @@ -35,7 +35,7 @@ <check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check> <check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check> <check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check> - <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true"> + <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="false"> <parameters> <parameter name="maxLineLength"><![CDATA[160]]></parameter> <parameter name="tabSize"><![CDATA[4]]></parameter> diff --git a/scalding/scalding-debugging.md b/scalding/scalding-debugging.md index bd9dd36..5a54742 100644 --- a/scalding/scalding-debugging.md +++ b/scalding/scalding-debugging.md @@ -83,3 +83,13 @@ Values of type `List[Fields]` are not printed in the expected way: scala> allFields.length res0: Int = 2 + +## SpyGlass Column Selection + +Two equivalent ways to specify `columns`/`column_families`: + + List("f", "file"), + List(new Fields("c"), new Fields("size", "mimetype")), + + List("f", "file", "file") + List(new Fields("c"), new Fields("size"), new Fields("mimetype")), diff --git a/scalding/src/main/resources/slug-denylist.txt b/scalding/src/main/resources/slug-denylist.txt new file mode 100644 index 0000000..926dbd5 --- /dev/null +++ b/scalding/src/main/resources/slug-denylist.txt @@ -0,0 +1,554 @@ +abbreviations +abbreviationsandacronyms +aboutauthors +abouttheauthor +abouttheauthors +aboutthecover +abouttheeditors +abreviations +abstract +abstractnotsubmittedforonlinepublication +abstracts +abstractsofaapaposterandpodiumpresentations +abstractsofcommunications +abstractsofthesesfromthescandinaviancountries +abstractwithdrawn +acknowledgement +acknowledgements +acknowledgementsvii +acknowledgementtoreferees +acknowledgementtoreviewers +acknowledgment +acknowledgmentofreferees +acknowledgments +addendum +additionalresources +address +advertisersindex +affect +affiliation +afterword +agenda +agradecimentos +agradecimientos +aimsandscope +analysis +annexa +announcement +announcements +annualacknowledgementofmanuscriptreviewers +anotefromtheeditor +appendices +appendix +appendix1 +appendixa +appendixb +appointmentsandstaffchanges +approximation +apresentacao +article +articles +articlesofsignificantinterestselectedfromthisissuebytheeditors +associationnews +ataglance +atribute +attention +authorguidelines +authorindex +authorindexforvolume81 +authorreply +authors +authorsreply +authorsresponse +avantpropos +award +awardsappointmentsannouncements +backcover +background +backmatter +berichtigung +besprechungen +bibliografia +bibliographie +bibliography +bigdata +blankpage +blood +boardoftrustees +booknotes +booknotices +bookofabstracts +bookreview +bookreviews +bookreviewsandnotices +bookreviewssection +booksreceived +buchbesprechung +buchbesprechungen +bulletin +calendar +calendarofevents +calendarofmeetings +callforarticles +callforpapers +casereport +casereports +casestudy +chairmansopeningremarks +changes +chaos +chapter1 +chapter10 +chapter1introduction +chapter2 +chapter7 +chapteri +chapterone +chapteroneintroduction +chaptertwo +chapterx +citation +classes +classified +classifieds +closingremarks +collaborateurs +comment +commentaries +commentary +commentaryon +commenton +comments +commentto +committee +communication +communications +communicationstotheeditor +communiquedepresse +community +components +comptesrendus +computerscience +concludingremarks +conclusion +conclusions +conferencereport +congratulations +congresscalendar +conservation +content +contents +context +continuingeducation +continuingmedicaleducation +contributors +copyright +copyrightform +copyrightnotice +correction +corrections +correspondence +corrigenda +corrigendum +councilminutes +cover +coverimage +currentresearch +curriculumvitae +danksagung +dearreaders +decisionmaking +dedication +dedicatoria +definition +description +discussion +diskussion +distribution +documents +ear +economics +editorial +editorialadvisoryboard +editorialannouncement +editorialboard +editorialcomment +editorialcomments +editorialconsultants +editoriale +editorialeditorial +editorialforeword +editorialinformation +editorialintroduction +editorialintroductions +editorialnote +editorialnotes +editorialpreface +editorials +editorialsoftwaresurveysection +editorialstaff +editorialstatement +editorinchief +editors +editorschoice +editorscomment +editorscomments +editorscorner +editorscorrespondence +editorsforeword +editorsintroduction +editorsletter +editorsnote +editorsnotes +editorspage +editorspicks +editorspreface +education +einfuhrung +einleitung +electrophoresis +employment +endnotes +entrevista +entscheidungsverzeichnis +epilogue +equipment +errata +erratum +essay +essays +executivesummary +exercises +expediente +extendedabstracts +feature +features +fichatecnica +figure3 +finalexam +finalreport +focus +foreward +foreword +forthcomingarticles +forthcomingevents +fortherecord +forum +frequentlyaskedquestions +fromtheeditor +fromtheeditorinchief +fromtheeditors +fromtheeditorsdesk +fromthepresident +frontmatter +furtherreadings +genealogy +generaldiscussion +generalinformation +generalintroduction +germany +gettingstarted +glosario +glossary +glossaryofterms +guesteditorial +guesteditorsforeword +guesteditorsintroduction +guideforauthors +guidelinesforcontributors +health +heartfailure +highlights +highlightsfromthisissue +highlightsofthisissue +history +home +homework +hypothesis +iii +imageofthemonth +importantnotice +impressum +inbrief +index +indexofauthors +indexofauthorsandtitles +indice +indicegeneral +informationforauthors +informationtoauthors +inhalt +inhaltsverzeichnis +inleiding +inmemoriam +inreply +inresponse +insidethisissue +institutenews +instructionsforauthors +instructionstoauthors +interview +inthestudy +inthisissue +introducao +introduccion +introduction +introductionandoverview +introductiongenerale +introductiontotheissue +introductiontothespecialissue +introductorycomments +introductoryremarks +introduzione +inventions +invitedcommentary +issuesandevents +jobdescription +journalclub +journalscan +keywords +kurzkommentiert +languageteaching +lecture +letter +letterfromtheeditor +letterfromtheeditorinchief +letterfromtheeditors +letterfromthepresident +letters +letterstotheeditor +letterstotheeditors +lettertotheeditor +lettertotheeditors +liminaire +linearalgebra +linearregression +links +listedestableaux +listofabbreviations +listofcontributors +listoffigures +listofparticipants +listofpublications +listofreferees +listofreviewers +listoftables +literacy +literatur +literature +literaturecited +literaturereview +literaturrundschau +literaturverzeichnis +litteraturverzeichniss +livresrecus +lucina +lungcancer +magazin +maintenance +materials +materialsafetydatasheet +materialsandmethods +medicinalchemistry +meetingabstracts +meetingreport +meetings +meetingsandconferences +meetingsofinterest +membershipapplication +memoranda +memorandum +messagefromgeneralcochairs +messagefromthechairs +messagefromtheeditor +messagefromtheeditorinchief +messagefromthepresident +messagefromtheprogramchairs +messagefromtheprogramcochairs +metaanalysis +miscellanea +miscellaneous +miscellany +missionstatement +motivation +mrsnews +name +newbooks +newlyelectedmembersofthecollege +newproducts +news +newsandnotes +newsandreviews +newsandviews +newsbriefs +newsinbrief +newsnotes +newsviews +noii +note +notefromtheeditor +notes +notesandcomments +notesandnews +notesdelecture +notesforcontributors +notesoncontributors +notice +noticeboard +notitle +notitleavailable +obituaries +obituary +online +openaccess +openingaddress +openingremarks +oralabstracts +oralpresentations +organizingcommittee +originalarticle +originalarticles +other +outline +overview +panorama +papers +paperstoappearinforthcomingissues +partone +personalandmiscellaneous +perspective +perspectives +philosophy +pictureofthemonth +place +pointofview +positionsavailable +poster +posterpresentations +postscript +preface +prefaceandacknowledgements +prefacetothesecondedition +preliminarymaterial +presentacio +presentacion +presentation +presidentialaddress +presidentsmessage +presidentsreport +pressrelease +print +printing +proceedings +proceedingsofthenationalacademyofsciences +profile +programcommittee +projectmanagement +prologue +publication +publichealth +publishersnote +question +questionsandanswers +radiology +readersforum +recensiones +recensions +recentpublications +redaktorensforord +referate +references +referenciasbibliograficas +regression +rehabilitation +rejoinder +remerciements +reply +replybyauthors +researchresearchers +resenas +resources +response +responsetothelettertotheeditor +results +resume +resumen +resumes +resumo +retraction +review +reviewarticle +revieweracknowledgement +revieweracknowledgement2013 +reviewers +reviewessay +reviews +reviewsanddescriptionsoftablesandbooks +reviewsofbooks +rezension +rezensionen +safety +section +security +selectedbibliography +shortcommunication +shorternotices +shortnotices +socialengineering +sociology +sommaire +sommario +specialreport +specialsection +specifications +spistresci +subjectindex +subscriptions +suggestedreadings +sumario +summaries +summariesofkeyjournalarticles +summary +summaryofproceedings +summer +sun +supplementarymaterial +symposium +symptom +synthese +tabledesmatieres +tableofcontents +tableofcontentsandprologue +technicalreport +theauthors +theauthorsreply +thebasics +theeditorsdesk +thefirstauthorreplies +thelancet +theoreticalbackground +thetimes +theworldbank +theyearinreview +thismonthin +thismonthinthejournal +timemanagement +titeleiinhaltsverzeichnis +title +titlepage +titlepagei +tocorrespondents +totheeditor +unitedkingdom +unitednations +unitedstates +upcomingevents +vorwort +website +welcome +whatshappening +whatsnew +workscited +yourquestionsanswered +zudiesemheft +zusammenfassung diff --git a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala new file mode 100644 index 0000000..abf9220 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala @@ -0,0 +1,50 @@ +package sandcrawler + +import scala.math +import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ + +class BibjsonScorable extends Scorable { + + def getSource(args : Args) : Source = { + TextLine(args("bibjson-input")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { + getSource(args).read + .toTypedPipe[String](new Fields("line")) + .map { BibjsonScorable.bibjsonToMapFeatures(_) } + } +} + +object BibjsonScorable { + def bibjsonToMapFeatures(json : String) : Option[MapFeatures] = { + Scorable.jsonToMap(json) match { + case None => None + case Some(map) => { + if (map contains "title") { + val title = Scorable.getString(map, "title") + val doi = Scorable.getString(map, "doi") + val sha1 = Scorable.getString(map, "sha") + // TODO: year, authors (if available) + if (title == null || title.isEmpty) { + None + } else { + val sf : ScorableFeatures = ScorableFeatures.create(title=title, doi=doi, sha1=sha1) + sf.toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, sf.toString)) + } + } + } else { + None + } + } + } + } +} diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala new file mode 100644 index 0000000..bb6413f --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -0,0 +1,153 @@ +package sandcrawler + +import scala.math +import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONArray +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.hbase.HBasePipeConversions + +class CrossrefScorable extends Scorable with HBasePipeConversions { + // TODO: Generalize args so there can be multiple Crossref pipes in one job. + def getSource(args : Args) : Source = { + TextLine(args("crossref-input")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { + getSource(args).read + .toTypedPipe[String](new Fields("line")) + .filter { CrossrefScorable.keepRecord(_) } + .map { CrossrefScorable.jsonToMapFeatures(_) } + } +} + +object CrossrefScorable { + + val ContentTypeWhitelist: Set[String] = Set( + "book", + "book-chapter", + "dataset", + "dissertation", + "journal-article", + "letter", + "monograph", + "posted-content", + "pre-print", + "proceedings-article", + "report", + "working-paper") + + def keepRecord(json : String) : Boolean = { + Scorable.jsonToMap(json) match { + case None => false + case Some(map) => { + mapToTitle(map) match { + case None => false + case Some(title) => title.length <= Scorable.MaxTitleLength + } + } + } + } + + // Returns None if title is null, empty, or too long. + def mapToTitle(map : Map[String, Any]) : Option[String] = { + def getTitle : Option[String] = { + if (map contains "title") { + val titles = map("title").asInstanceOf[List[String]] + if (titles.isEmpty || titles == null) None else Some(titles(0)) + } else { + None + } + } + + def getSubtitle : Option[String] = { + if (map contains "subtitle") { + val subtitles = map("subtitle").asInstanceOf[List[String]] + if (subtitles.isEmpty || subtitles == null) { + None + } else { + val sub = subtitles(0) + if (sub == null || sub.isEmpty) { + None + } else { + Some(sub) + } + } + } else { + None + } + } + + getTitle match { + case None => None + case Some(baseTitle) => { + if (baseTitle == null) { + None + } else { + getSubtitle match { + case None => Some(baseTitle) + case Some(baseSubtitle) => Some(baseTitle.concat(":".concat(baseSubtitle))) + } + } + } + } + } + + def mapToAuthorList(map : Map[String, Any]) : List[String] = { + if (map contains "author") { + val objArray = map("author").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]]) + // TODO(bnewbold): combine given and family names? + objArray + .filter(e => e contains "family") + .map(e => e.get("family").get.asInstanceOf[String]) + } else { + List() + } + } + + def mapToYear(map : Map[String, Any]) : Option[Int] = { + map.get("created") match { + case None => None + case Some(created) => { + Some(created.asInstanceOf[Map[String,Any]] + .get("date-parts") + .get + .asInstanceOf[List[Any]](0) + .asInstanceOf[List[Any]](0) + .asInstanceOf[Double] + .toInt) + } + } + } + + def jsonToMapFeatures(json : String) : Option[MapFeatures] = { + def makeMapFeatures(title : String, doi : String, authors : List[String], year : Int, contentType : String) : Option[MapFeatures] = { + if (doi.isEmpty || doi == null || authors.length == 0 || !(ContentTypeWhitelist contains contentType)) { + None + } else { + val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi.toLowerCase(), year=year) + sf.toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, sf.toString)) + } + } + } + Scorable.jsonToMap(json) match { + case None => None + case Some(map) => + mapToTitle(map) match { + case None => None + case Some(title) => makeMapFeatures( + title=title, + doi=Scorable.getString(map, "DOI"), + authors=mapToAuthorList(map), + year=mapToYear(map).getOrElse(0), + contentType=map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE")) + } + } + } +} diff --git a/scalding/src/main/scala/sandcrawler/DumpFileMetaJob.scala b/scalding/src/main/scala/sandcrawler/DumpFileMetaJob.scala new file mode 100644 index 0000000..b3734f0 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/DumpFileMetaJob.scala @@ -0,0 +1,36 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +// Dumps all the info needed to insert a file entity in Fatcat. Useful for +// joining. +class DumpFileMetaJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val metaPipe : TypedPipe[(String, String, String, Long)] = HBaseBuilder.build(args("hbase-table"), + args("zookeeper-hosts"), + List("file:cdx", "file:mime", "file:size"), + SourceMode.SCAN_ALL) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size")) + .filter { case (_, cdx, mime, size) => cdx != null && mime != null && size != null } + .map { case (key, cdx, mime, size) => + (Bytes.toString(key.copyBytes()), + Bytes.toString(cdx.copyBytes()), + Bytes.toString(mime.copyBytes()), + Bytes.toLong(size.copyBytes())) + }; + + metaPipe.write(TypedTsv[(String,String,String,Long)](args("output"))) + +} diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidMetaInsertableJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidMetaInsertableJob.scala new file mode 100644 index 0000000..ee2b7c2 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/DumpGrobidMetaInsertableJob.scala @@ -0,0 +1,38 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +// Dumps the SHA1 key and grobid0:metadata columns, plus file metadata needed +// to insert into fatcat. Used, eg, as part of long-tail mellon pipeline. +class DumpGrobidMetaInsertableJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val metaPipe : TypedPipe[(String, String, String, Long, String)] = HBaseBuilder.build(args("hbase-table"), + args("zookeeper-hosts"), + List("file:cdx", "file:mime", "file:size", "grobid0:metadata"), + SourceMode.SCAN_ALL) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size", "metadata")) + .filter { case (_, cdx, mime, size, metadata) => cdx != null && mime != null && size != null && metadata != null } + .map { case (key, cdx, mime, size, metadata) => + (Bytes.toString(key.copyBytes()), + Bytes.toString(cdx.copyBytes()), + Bytes.toString(mime.copyBytes()), + Bytes.toLong(size.copyBytes()), + Bytes.toString(metadata.copyBytes()) + ) + }; + + metaPipe.write(TypedTsv[(String,String,String,Long,String)](args("output"))) + +} diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala new file mode 100644 index 0000000..42b3464 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala @@ -0,0 +1,34 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +// Dumps status code for each GROBID-processed file. Good for crawl/corpus +// analytics, if we consider GROBID status a rough "is this a paper" metric. +class DumpGrobidStatusCodeJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val metaPipe : TypedPipe[(String, Long)] = HBaseBuilder.build(args("hbase-table"), + args("zookeeper-hosts"), + List("grobid0:status_code"), + SourceMode.SCAN_ALL) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "status_code")) + .filter { case (_, status_code) => status_code != null } + .map { case (key, status_code) => + (Bytes.toString(key.copyBytes()), + Bytes.toLong(status_code.copyBytes())) + }; + + metaPipe.write(TypedTsv[(String,Long)](args("output"))) + +} diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidXmlJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidXmlJob.scala new file mode 100644 index 0000000..953610d --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/DumpGrobidXmlJob.scala @@ -0,0 +1,41 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource +import scala.util.parsing.json.JSONObject + +// Dumps the SHA1 key and grobid0:tei_xml columns, as TSV/JSON (two TSV +// columns: one is key, second is JSON). Used for partner delivery/sharing +class DumpGrobidXmlJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val metaPipe : TypedPipe[(String, String)] = HBaseBuilder.build(args("hbase-table"), + args("zookeeper-hosts"), + List("file:cdx", "grobid0:tei_xml"), + SourceMode.SCAN_ALL) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "tei_xml")) + .filter { case (_, cdx, tei_xml) => cdx != null && tei_xml != null } + .map { case (key, cdx, tei_xml) => + (Bytes.toString(key.copyBytes()), + JSONObject( + Map( + "pdf_hash" -> Bytes.toString(key.copyBytes()), + "cdx_metadata" -> Bytes.toString(cdx.copyBytes()), + "tei_xml" -> Bytes.toString(tei_xml.copyBytes()) + )).toString + ) + }; + + metaPipe.write(TypedTsv[(String,String)](args("output"))) + +} diff --git a/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala b/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala new file mode 100644 index 0000000..7fd3ce0 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala @@ -0,0 +1,67 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +// Filters for HBase rows which have not had GROBID run on them, but do have +// full CDX metadata, and dumps to a TSV for later extraction by the +// "extraction-ungrobided" job. +// +// Does the same horrible join thing that DumpUnGrobidedJob does. +class DumpUnGrobidedJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val output = args("output") + + val allKeys : TypedPipe[(String,String,String,String)] = DumpUnGrobidedJob.getHBaseKeySource( + args("hbase-table"), + args("zookeeper-hosts")) + .read + .fromBytesWritable('key, 'c, 'mime, 'cdx) + .toTypedPipe[(String,String,String,String)]('key, 'c, 'mime, 'cdx) + + val existingKeys : TypedPipe[(String,Boolean)] = DumpUnGrobidedJob.getHBaseColSource( + args("hbase-table"), + args("zookeeper-hosts")) + .read + .fromBytesWritable('key) + .toTypedPipe[String]('key) + .map{ key => (key, true) } + + val missingKeys : TypedPipe[(String,String,String,String)] = allKeys + .groupBy(_._1) + .leftJoin(existingKeys.groupBy(_._1)) + .toTypedPipe + .collect { case (key, ((_, c, mime, cdx), None)) => (key, c, mime, cdx) } + + missingKeys + .write(TypedTsv[(String,String,String,String)](output)) + +} + +object DumpUnGrobidedJob { + + // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" + def getHBaseColSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("grobid0:status_code"), + SourceMode.SCAN_ALL) + } + + def getHBaseKeySource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("f:c", "file:mime", "file:cdx"), + SourceMode.SCAN_ALL) + } +} diff --git a/scalding/src/main/scala/sandcrawler/FatcatScorable.scala b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala new file mode 100644 index 0000000..2090e84 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala @@ -0,0 +1,146 @@ +package sandcrawler + +import scala.math +import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONArray +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.hbase.HBasePipeConversions + + +class FatcatScorableRight extends Scorable { + + def getSource(args : Args) : Source = { + TextLine(args("fatcat-release-input-right")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { + getSource(args).read + .toTypedPipe[String](new Fields("line")) + .filter { FatcatScorable.keepRecord(_) } + .map { FatcatScorable.jsonToMapFeatures(_) } + } +} + +class FatcatScorable extends Scorable with HBasePipeConversions { + + def getSource(args : Args) : Source = { + TextLine(args("fatcat-release-input")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { + getSource(args).read + .toTypedPipe[String](new Fields("line")) + .filter { FatcatScorable.keepRecord(_) } + .map { FatcatScorable.jsonToMapFeatures(_) } + } +} + +object FatcatScorable { + + // Note; removed ReleaseType filtering + + def keepRecord(json : String) : Boolean = { + Scorable.jsonToMap(json) match { + case None => false + case Some(map) => { + mapToTitle(map) match { + case None => false + case Some(title) => title.length <= Scorable.MaxTitleLength + } + } + } + } + + // Returns None if title is null, empty, or too long. + def mapToTitle(map : Map[String, Any]) : Option[String] = { + def getTitle : Option[String] = { + if (map contains "title") { + val title = map("title").asInstanceOf[String] + if (title == null || title.isEmpty) None else Some(title) + } else { + None + } + } + + def getSubtitle : Option[String] = { + if (map contains "subtitle") { + val subtitle = map("subtitle").asInstanceOf[String] + if (subtitle == null || subtitle.isEmpty) { + None + } else { + Some(subtitle) + } + } else { + None + } + } + + getTitle match { + case None => None + case Some(baseTitle) => { + if (baseTitle == null) { + None + } else { + getSubtitle match { + case None => Some(baseTitle) + case Some(baseSubtitle) => Some(baseTitle.concat(":".concat(baseSubtitle))) + } + } + } + } + } + + def mapToAuthorList(map : Map[String, Any]) : List[String] = { + if (map contains "contribs") { + val objArray = map("contribs").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]]) + // TODO(bnewbold): better name stuff... contrib.surname, creator.surname, + // or raw_name split to last + objArray + .filter(e => e contains "raw_name") + .map(e => e.get("raw_name").get.asInstanceOf[String]) + } else { + List() + } + } + + def mapToYear(map : Map[String, Any]) : Option[Int] = { + map.get("release_year") match { + case None => None + case Some(year) => { + Some(year.asInstanceOf[Double].toInt) + } + } + } + + def jsonToMapFeatures(json : String) : Option[MapFeatures] = { + def makeMapFeatures(title : String, doi : String, fatcat_release: String, fatcat_work : String, authors : List[String], year : Int, contentType : String) : Option[MapFeatures] = { + // NOTE: not doing any filtering here! + val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi, fatcat_release=fatcat_release, fatcat_work=fatcat_work, year=year) + sf.toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, sf.toString)) + } + } + Scorable.jsonToMap(json) match { + case None => None + case Some(map) => + mapToTitle(map) match { + case None => None + case Some(title) => makeMapFeatures( + title=title, + // TODO: doi=Scorable.getString(map, "doi"), + doi=null, + fatcat_release=Scorable.getString(map, "ident"), + fatcat_work=Scorable.getString(map, "work_id"), + authors=mapToAuthorList(map), + year=mapToYear(map).getOrElse(0), + contentType=map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE")) + } + } + } +} diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala new file mode 100644 index 0000000..f4ed129 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -0,0 +1,83 @@ +package sandcrawler + +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class GrobidScorable extends Scorable with HBasePipeConversions { + val StatusOK = 200 + + def getSource(args : Args) : Source = { + // TODO: Generalize args so there can be multiple grobid pipes in one job + GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { + getSource(args) + .read + // Can't just "fromBytesWritable" because we have multiple types + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code")) + .filter { case (_, metadata, status_code) => metadata != null && status_code != null } + .map { case (key, metadata, status_code) => + (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes())) + } + .collect { case (key, json, StatusOK) => (key, json) } + .filter { case (key, json) => GrobidScorable.keepRecord(json) } + .map { entry : (String, String) => GrobidScorable.jsonToMapFeatures(entry._1, entry._2) } + } +} + +object GrobidScorable { + def keepRecord(json : String) : Boolean = { + Scorable.jsonToMap(json) match { + case None => false + case Some(map) => { + if (map contains "title") { + val title = Scorable.getString(map, "title") + title != null && title.length <= Scorable.MaxTitleLength + } else { + false + } + } + } + } + + def mapToAuthorList(map : Map[String, Any]) : List[String] = { + if (map contains "authors") { + val objArray = map("authors").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]]) + objArray + .filter(e => e contains "name") + .map(e => e.get("name").get.asInstanceOf[String]) + } else { + List() + } + } + + def getHBaseSource(table : String, host : String) : HBaseSource = { + HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL) + } + + def jsonToMapFeatures(key : String, json : String) : Option[MapFeatures] = { + Scorable.jsonToMap(json) match { + case None => None + case Some(map) => { + if (map contains "title") { + val authors: List[String] = mapToAuthorList(map) + val title = Scorable.getString(map, "title") + ScorableFeatures.create(title=title, authors=authors, sha1=key).toMapFeatures + } else { + None + } + } + } + } +} + diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala new file mode 100644 index 0000000..3146a6c --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala @@ -0,0 +1,59 @@ + +package sandcrawler + +import cascading.flow.FlowDef +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class GrobidScorableDumpJob(args: Args) extends JobBase(args) { + + val grobidHbaseRows = Stat("hbase-rows-scanned", "hbase-grobid-dump") + val filteredGrobidRows = Stat("grobid-rows-filtered", "hbase-grobid-dump") + val parsedGrobidRows = Stat("grobid-rows-parsed", "hbase-grobid-dump") + val validGrobidRows = Stat("grobid-rows-valid-slug", "hbase-grobid-dump") + + val pipe = GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) + .read + // Can't just "fromBytesWritable" because we have multiple types? + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code")) + .filter { case (_, metadata, status_code) => + grobidHbaseRows.inc + metadata != null && status_code != null + } + .map { case (key, metadata, status_code) => + (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes())) + } + // TODO: Should I combine next two stages for efficiency? + .collect { case (key, json, 200) => + filteredGrobidRows.inc + (key, json) + } + .map { entry : (String, String) => + parsedGrobidRows.inc + GrobidScorable.jsonToMapFeatures(entry._1, entry._2) + } + .filterNot { entry => entry.isEmpty } + .map { entry => { + validGrobidRows.inc + entry.get + }} + .groupBy { case MapFeatures(slug, json) => slug } + .map { tuple => + val (slug : String, features : MapFeatures) = tuple + (slug, ReduceFeatures(features.json)) + } + + pipe + .map { case (slug, features) => + (slug, features.json) + } + .write(TypedTsv[(String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala new file mode 100644 index 0000000..46d2038 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala @@ -0,0 +1,43 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.Stat +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class GroupFatcatWorksJob(args: Args) extends JobBase(args) { + + val fatcatRowCount = Stat("fatcat-rows-filtered", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val fatcatScorable : Scorable = new FatcatScorable() + val fatcatPipe : TypedPipe[(String, ReduceFeatures)] = fatcatScorable + .getInputPipe(args) + .map { r => + fatcatRowCount.inc + r + } + + val joinedPipe = fatcatPipe + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(fatcatPipe) + + // TypedTsv doesn't work over case classes. + joinedPipe + // filter out trivial self-matches (releases are identical) + .filter { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + Scorable.selfMatchable(fatcatFeaturesLeft, fatcatFeaturesRight) + } + .map { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + joinedRowCount.inc + new ReduceOutput( + slug, + Scorable.computeSimilarity(fatcatFeaturesLeft, fatcatFeaturesRight), + fatcatFeaturesLeft.json, + fatcatFeaturesRight.json) + } + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala new file mode 100644 index 0000000..ea5e26b --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala @@ -0,0 +1,52 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.Stat +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class GroupFatcatWorksSubsetJob(args: Args) extends JobBase(args) { + + val fatcatLhsRowCount = Stat("fatcat-rows-filtered-left", "sandcrawler") + val fatcatRhsRowCount = Stat("fatcat-rows-filtered-right", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val fatcatScorableLhs : Scorable = new FatcatScorable() + val fatcatPipeLhs : TypedPipe[(String, ReduceFeatures)] = fatcatScorableLhs + .getInputPipe(args) + .map { r => + fatcatLhsRowCount.inc + r + } + + val fatcatScorableRhs : Scorable = new FatcatScorableRight() + val fatcatPipeRhs : TypedPipe[(String, ReduceFeatures)] = fatcatScorableRhs + .getInputPipe(args) + .map { r => + fatcatRhsRowCount.inc + r + } + + val joinedPipe = fatcatPipeLhs + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(fatcatPipeRhs) + + // TypedTsv doesn't work over case classes. + joinedPipe + // filter out trivial self-matches (releases are identical) + .filter { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + Scorable.selfMatchable(fatcatFeaturesLeft, fatcatFeaturesRight) + } + .map { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + joinedRowCount.inc + new ReduceOutput( + slug, + Scorable.computeSimilarity(fatcatFeaturesLeft, fatcatFeaturesRight), + fatcatFeaturesLeft.json, + fatcatFeaturesRight.json) + } + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala new file mode 100644 index 0000000..20cc7a1 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala @@ -0,0 +1,37 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class HBaseColCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val output = args("output") + + HBaseColCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts"), + args("column")) + .read + .debug + .groupAll { _.size('count) } + .write(Tsv(output)) +} + +object HBaseColCountJob { + + // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" + def getHBaseSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List(col), + SourceMode.SCAN_ALL) + } +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala index 4c3de33..5c7954a 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -30,7 +30,7 @@ object HBaseRowCountJob { HBaseBuilder.build( hbaseTable, zookeeperHosts, - List("file:size"), + List("f:c"), SourceMode.SCAN_ALL) } } diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala new file mode 100644 index 0000000..4d9880f --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala @@ -0,0 +1,32 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class HBaseStatusCodeCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val source = HBaseCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts"), + "grobid0:status_code") + + val statusPipe : TypedPipe[Long] = source + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code) + .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } + + statusPipe.groupBy { identity } + .size + .debug + .write(TypedTsv[(Long,Long)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index fd0b4e2..f79d672 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -18,15 +18,15 @@ class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConver val source = HBaseCountJob.getHBaseSource( args("hbase-table"), args("zookeeper-hosts"), - "grobid0:status_code") + "grobid0:status") - val statusPipe : TypedPipe[Long] = source + val statusPipe : TypedPipe[String] = source .read - .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code) - .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status) + .map { case (key, raw_status) => Bytes.toString(raw_status.copyBytes()) } statusPipe.groupBy { identity } .size .debug - .write(TypedTsv[(Long,Long)](args("output"))) + .write(TypedTsv[(String,Long)](args("output"))) } diff --git a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala new file mode 100644 index 0000000..292de75 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala @@ -0,0 +1,30 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class MatchBenchmarkJob(args: Args) extends JobBase(args) { + // TODO: Instantiate any subclass of Scorable specified in args. + val sc1 : Scorable = new BibjsonScorable() + val sc2 : Scorable = new BibjsonScorable() + val leftArgs = args + ("bibjson-input" -> List(args("left-bibjson"))) + val rightArgs = args + ("bibjson-input" -> List(args("right-bibjson"))) + val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(leftArgs) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(rightArgs) + + pipe1.join(pipe2) + .map { entry => + val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry + new ReduceOutput( + slug, + Scorable.computeSimilarity(features1, features2), + features1.json, + features2.json) + } + //TypedTsv doesn't work over case classes. + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala b/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala new file mode 100644 index 0000000..cc3bf23 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala @@ -0,0 +1,67 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +// This nasty, no-good, horrible Job outputs a list of keys ("sha1:A234...") +// for which the given "column" does not have a value set. +// It does this using a self-join because SpyGlass's HBase SCAN support seems +// to be extremely limited. +class MissingColumnDumpJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val output = args("output") + + val allKeys : TypedPipe[String] = MissingColumnDumpJob.getHBaseKeySource( + args("hbase-table"), + args("zookeeper-hosts")) + .read + .fromBytesWritable('key) + .toTypedPipe[String]('key) + + val existingKeys : TypedPipe[(String,Boolean)] = MissingColumnDumpJob.getHBaseColSource( + args("hbase-table"), + args("zookeeper-hosts"), + args("column")) + .read + .fromBytesWritable('key) + .toTypedPipe[String]('key) + .map{ key => (key, true) } + + val missingKeys : TypedPipe[String] = allKeys + .groupBy( identity ) + .leftJoin(existingKeys.groupBy(_._1)) + .toTypedPipe + .collect { case (key, (_, None)) => key } + + missingKeys + .write(TypedTsv[String](output)) + +} + +object MissingColumnDumpJob { + + // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" + def getHBaseColSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List(col), + SourceMode.SCAN_ALL) + } + + def getHBaseKeySource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("f:c"), + SourceMode.SCAN_ALL) + } +} diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala new file mode 100644 index 0000000..d9c38e8 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -0,0 +1,96 @@ +package sandcrawler + +import scala.math +import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ + +case class MapFeatures(slug : String, json : String) +case class ReduceFeatures(json : String) +case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) + +abstract class Scorable { + def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = { + val validFeatures : TypedPipe[MapFeatures] = getFeaturesPipe(args) + .filterNot { entry => entry.isEmpty } + .map { entry => entry.get } + + validFeatures + .groupBy { case MapFeatures(slug, json) => slug } + .map { tuple => + val (slug : String, features : MapFeatures) = tuple + (slug, ReduceFeatures(features.json)) + } + } + + // abstract methods + def getSource(args : Args) : Source + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] +} + +object Scorable { + val MaxTitleLength = 1023 + + def jsonToMap(json : String) : Option[Map[String, Any]] = { + // https://stackoverflow.com/a/32717262/631051 + val jsonObject = JSON.parseFull(json) + if (jsonObject == None) { + None + } else { + Some(jsonObject.get.asInstanceOf[Map[String, Any]]) + } + } + + def getStringOption(optionalMap : Option[Map[String, Any]], key : String) : Option[String] = { + optionalMap match { + case None => None + case Some(map) => if (map contains key) Some(map(key).asInstanceOf[String]) else None + } + } + + // Caller is responsible for ensuring that key is a String in map. + // TODO: Add and handle ClassCastException + def getString(map : Map[String, Any], key : String) : String = { + assert(map contains key) + map(key).asInstanceOf[String] + } + + val MaxScore = 1000 + + def selfMatchable(features1 : ReduceFeatures, features2 : ReduceFeatures) : Boolean = { + val json1 = jsonToMap(features1.json) + val json2 = jsonToMap(features2.json) + + ( + getStringOption(json1, "fatcat_release") != None && + getStringOption(json2, "fatcat_release") != None && + getStringOption(json1, "fatcat_release") != getStringOption(json2, "fatcat_release") && + (getStringOption(json1, "fatcat_work") match { + case None => false + case Some(work1) => getStringOption(json2, "fatcat_work") match { + case None => false + // this last check ensures we don't double-match + case Some(work2) => work1 > work2 + } + }) + ) + } + + def computeSimilarity(features1 : ReduceFeatures, features2 : ReduceFeatures) : Int = { + val json1 = jsonToMap(features1.json) + val json2 = jsonToMap(features2.json) + getStringOption(json1, "title") match { + case None => 0 + case Some(title1) => { + getStringOption(json2, "title") match { + case None => 0 + case Some(title2) => + (StringUtilities.similarity(title1.toLowerCase, title2.toLowerCase) * MaxScore).toInt + } + } + } + } +} diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala new file mode 100644 index 0000000..93cd78d --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala @@ -0,0 +1,63 @@ +package sandcrawler + +import java.io.InputStream + +import scala.io.Source +import scala.util.parsing.json.JSONArray +import scala.util.parsing.json.JSONObject + +object ScorableFeatures { + // TODO: Add exception handling. + val fileStream : InputStream = getClass.getResourceAsStream("/slug-denylist.txt") + val SlugDenylist : Set[String] = Source.fromInputStream(fileStream).getLines.toSet + fileStream.close + val MinSlugLength = 8 + + // Static factory method + def create(title : String, authors : List[Any] = List(), year : Int = 0, doi : String = "", fatcat_release : String = "", fatcat_work : String = "", sha1 : String = "") : ScorableFeatures = { + new ScorableFeatures( + title=if (title == null) "" else title, + authors=if (authors == null) List() else authors.map(a => if (a == null) "" else a), + year=year, + doi=if (doi == null) "" else doi, + fatcat_release=if (fatcat_release == null) "" else fatcat_release, + fatcat_work=if (fatcat_work == null) "" else fatcat_work, + sha1=if (sha1 == null) "" else sha1) + } +} + +// Contains features needed to make slug and to score (in combination +// with a second ScorableFeatures). Create with above static factory method. +class ScorableFeatures private(title : String, authors : List[Any] = List(), year: Int = 0, doi : String = "", fatcat_release : String = "", fatcat_work : String = "", sha1: String = "") { + + def toMap() : Map[String, Any] = + Map("title" -> title, "authors" -> JSONArray(authors), "year" -> year, "doi" -> doi, "fatcat_release" -> fatcat_release, "fatcat_work" -> fatcat_work, "sha1" -> sha1) + + override def toString() : String = { + JSONObject(toMap).toString + } + + def toSlug() : Option[String] = { + if (title == null) { + None + } else { + val unaccented = StringUtilities.removeAccents(title) + // Remove punctuation + val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "") + if (slug.isEmpty + || slug == null + || (ScorableFeatures.SlugDenylist contains slug) + || (slug.length < ScorableFeatures.MinSlugLength)) { + None + } else { + Some(slug) + } + } + } + + def toMapFeatures : Option[MapFeatures] = + toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, toString)) + } +} diff --git a/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala new file mode 100644 index 0000000..58007fa --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala @@ -0,0 +1,86 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.pipe.Pipe +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class ScoreInsertableJob(args: Args) extends JobBase(args) { + + val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler") + val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler") + val cdxRowCount = Stat("cdx-rows", "sandcrawler") + val scoredRowCount = Stat("scored-rows", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val grobidScorable : Scorable = new GrobidScorable() + val crossrefScorable : Scorable = new CrossrefScorable() + + val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable + .getInputPipe(args) + .map { r => + grobidRowCount.inc + r + } + val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable + .getInputPipe(args) + .map { r => + crossrefRowCount.inc + r + } + val cdxPipe : TypedPipe[(String, String, String, Long)] = ScoreInsertableJob.getHBaseCdxSource(args("hbase-table"), args("zookeeper-hosts")) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size")) + .filter { case (_, cdx, mime, size) => cdx != null && mime != null && size != null } + .map { case (key, cdx, mime, size) => + (Bytes.toString(key.copyBytes()), + Bytes.toString(cdx.copyBytes()), + Bytes.toString(mime.copyBytes()), + Bytes.toLong(size.copyBytes())) + } + .map { r => + cdxRowCount.inc + r + } + + val scoredPipe = grobidPipe + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(crossrefPipe) + .map { case (slug, (grobidFeatures, crossrefFeatures)) => + scoredRowCount.inc + //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry + // Not ever Empty, I promise + val key = Scorable.getStringOption(Scorable.jsonToMap(grobidFeatures.json), "sha1").orNull + (key, new ReduceOutput( + slug, + Scorable.computeSimilarity(grobidFeatures, crossrefFeatures), + grobidFeatures.json, + crossrefFeatures.json)) + } + .map { case (key, entry) => (key, entry.slug, entry.score, entry.json1, entry.json2) } + .groupBy { case (key, _, _, _, _) => key } + + // TypedTsv doesn't work over case classes. + val joinedPipe = scoredPipe + .join(cdxPipe.groupBy { case (key, _, _, _) => key }) + .map { case (key, ((_, slug, score, left, right), (_, cdx, mime, size))) => (key, slug, score, left, right, cdx, mime, size) } + .write(TypedTsv[(String, String, Int, String, String, String, String, Long)](args("output"))) +} + +object ScoreInsertableJob { + + // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" + def getHBaseCdxSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("file:cdx", "file:mime", "file:size"), + SourceMode.SCAN_ALL) + } +} diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala new file mode 100644 index 0000000..ccb9b76 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -0,0 +1,48 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.Stat +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class ScoreJob(args: Args) extends JobBase(args) { + + val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler") + val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val grobidScorable : Scorable = new GrobidScorable() + val crossrefScorable : Scorable = new CrossrefScorable() + val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable + .getInputPipe(args) + .map { r => + grobidRowCount.inc + r + } + val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable + .getInputPipe(args) + .map { r => + crossrefRowCount.inc + r + } + + val joinedPipe = grobidPipe + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(crossrefPipe) + + // TypedTsv doesn't work over case classes. + joinedPipe + .map { case (slug, (grobidFeatures, crossrefFeatures)) => + joinedRowCount.inc + //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry + new ReduceOutput( + slug, + Scorable.computeSimilarity(grobidFeatures, crossrefFeatures), + grobidFeatures.json, + crossrefFeatures.json) + } + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala new file mode 100644 index 0000000..9150ced --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala @@ -0,0 +1,76 @@ +package sandcrawler + +import java.text.Normalizer +import java.util.regex.Pattern + +object StringUtilities { + // bnewbold: I propose that we: + // 1. keep only \p{Ideographic}, \p{Alphabetic}, and \p{Digit} + // 2. strip accents + // 3. "lower-case" (unicode-aware) + // 4. do any final custom/manual mappings + // + // We should check (test) that null bytes are handled, in addition to other + // more obvious characters + + // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934 + def removeAccents(s : String) : String = { + val replacements = Map( + '\u0141' -> 'L', + '\u0142' -> 'l', // Letter ell + '\u00d8' -> 'O', + '\u00f8' -> 'o' + ) + val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD)) + for (i <- 0 to sb.length - 1) { + for (key <- replacements.keys) { + if (sb(i) == key) { + sb.deleteCharAt(i); + sb.insert(i, replacements(key)) + } + } + } + val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+") + pattern.matcher(sb).replaceAll("") + } + + // Source: https://stackoverflow.com/a/30076541/631051 + def removePunctuation(s: String) : String = { + s.replaceAll("""[\p{Punct}’·“”‘’“”«»「」¿–±§ʿ]""", "") + } + + // Adapted from: https://stackoverflow.com/a/16018452/631051 + def similarity(s1a : String, s2a : String) : Double = { + val (s1, s2) = (removeAccents(removePunctuation(s1a)), + removeAccents(removePunctuation(s2a))) + val longer : String = if (s1.length > s2.length) s1 else s2 + val shorter : String = if (s1.length > s2.length) s2 else s1 + if (longer.length == 0) { + // Both strings are empty. + 1 + } else { + (longer.length - stringDistance(longer, shorter)) / longer.length.toDouble + } + } + + // Source: https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ + def stringDistance(s1: String, s2: String): Int = { + val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]() + def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c) + def sd(s1: List[Char], s2: List[Char]): Int = { + if (!memo.contains((s1, s2))) { + memo((s1,s2)) = (s1, s2) match { + case (_, Nil) => s1.length + case (Nil, _) => s2.length + case (c1::t1, c2::t2) => + min( sd(t1,s2) + 1, sd(s1,t2) + 1, + sd(t1,t2) + (if (c1==c2) 0 else 1) ) + } + } + memo((s1,s2)) + } + + sd( s1.toList, s2.toList ) + } +} + diff --git a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala new file mode 100644 index 0000000..8302b8f --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala @@ -0,0 +1,172 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class CrossrefScorableTest extends FlatSpec with Matchers { + // scalastyle:off + val CrossrefString = +""" +{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" }, + "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ], + "date-time" : "2017-10-23T17:19:16Z", + "timestamp" : { "$numberLong" : "1508779156477" } }, + "reference-count" : 0, + "publisher" : "Elsevier BV", + "issue" : "3", + "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/", + "start" : { "date-parts" : [ [ 1996, 1, 1 ] ], + "date-time" : "1996-01-01T00:00:00Z", + "timestamp" : { "$numberLong" : "820454400000" } }, + "delay-in-days" : 0, "content-version" : "tdm" }], + "content-domain" : { "domain" : [], "crossmark-restriction" : false }, + "published-print" : { "date-parts" : [ [ 1996 ] ] }, + "DOI" : "<<DOI>>", + "type" : "journal-article", + "created" : { "date-parts" : [ [ 2002, 7, 25 ] ], + "date-time" : "2002-07-25T15:09:41Z", + "timestamp" : { "$numberLong" : "1027609781000" } }, + "page" : "186-187", + "source" : "Crossref", + "is-referenced-by-count" : 0, + "title" : [ "<<TITLE>>" ], + "prefix" : "10.1016", + "volume" : "9", + "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ], + "member" : "78", + "container-title" : [ "Journal de Pédiatrie et de Puériculture" ], + "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml", + "content-type" : "text/xml", + "content-version" : "vor", + "intended-application" : "text-mining" }, + { "URL" : + "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain", + "content-type" : "text/plain", + "content-version" : "vor", + "intended-application" : "text-mining" } ], + "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ], + "date-time" : "2015-09-03T10:03:43Z", + "timestamp" : { "$numberLong" : "1441274623000" } }, + "score" : 1, + "issued" : { "date-parts" : [ [ 1996 ] ] }, + "references-count" : 0, + "alternative-id" : [ "0987-7983(96)87729-2" ], + "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2", + "ISSN" : [ "0987-7983" ], + "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ], + "subject" : [ "Pediatrics, Perinatology, and Child Health" ] +} +""".replace("<<DOI>>", "10.123/aBc") + // scalastyle:on + val CrossrefStringWithGoodTitle = CrossrefString.replace("<<TITLE>>", "Some Title") + val CrossrefStringWithMaximumTitle = CrossrefString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength) + val CrossrefStringWithExcessiveTitle = CrossrefString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength + "0") + val CrossrefStringWithNullTitle = CrossrefString.replace("\"<<TITLE>>\"", "null") + val CrossrefStringWithEmptyTitle = CrossrefString.replace("<<TITLE>>", "") + val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") + val MalformedCrossrefString = CrossrefString.replace("}", "") + val CrossrefStringWithNoAuthors = CrossrefString.replace("<<TITLE>>", "Some Valid Title").replace("author", "no-author") + val CrossrefStringWrongType = CrossrefString.replace("<<TITLE>>", "Some Valid Title").replace("journal-article", "other") + val CrossrefStringNoType = CrossrefString.replace("<<TITLE>>", "Some Valid Title").replace("type", "not-type") + + // Unit tests + "CrossrefScorable.jsonToMapFeatures()" should "handle invalid JSON" in { + CrossrefScorable.jsonToMapFeatures(MalformedCrossrefString) should be (None) + } + + it should "handle missing title" in { + CrossrefScorable.jsonToMapFeatures(CrossrefStringWithoutTitle) should be (None) + } + + it should "handle null title" in { + CrossrefScorable.jsonToMapFeatures(CrossrefStringWithNullTitle) should be (None) + } + + it should "handle empty title" in { + CrossrefScorable.jsonToMapFeatures(CrossrefStringWithEmptyTitle) should be (None) + } + + it should "handle subtitle" in { + CrossrefScorable.jsonToMapFeatures( + """{"title": ["short but not too short"], "subtitle": ["just right!"], "DOI": "10.123/asdf", "type":"journal-article","author":[{ "given" : "W", "family" : "Gaier"}]}""") match { + case None => fail() + case Some(result) => result.slug shouldBe "shortbutnottooshortjustright" + } + } + + it should "handle empty subtitle" in { + CrossrefScorable.jsonToMapFeatures( + """{"title": ["short but not too short"], "subtitle": [""], "DOI": "10.123/asdf", "type":"journal-article", "author":[{ "given" : "W", "family" : "Gaier"}]}""") match { + case None => fail() + case Some(result) => result.slug shouldBe "shortbutnottooshort" + } + } + + it should "handle null subtitle" in { + CrossrefScorable.jsonToMapFeatures( + """{"title": ["short but not too short"], "subtitle": [null], "DOI": "10.123/asdf", "type":"journal-article", "author":[{ "given" : "W", "family" : "Gaier"}]}""") match { + case None => fail() + case Some(result) => result.slug shouldBe "shortbutnottooshort" + } + } + + it should "handle missing authors" in { + CrossrefScorable.jsonToMapFeatures(CrossrefStringWithNoAuthors) should be (None) + } + + it should "handle valid input" in { + CrossrefScorable.jsonToMapFeatures(CrossrefStringWithGoodTitle) match { + case None => fail() + case Some(result) => { + result.slug shouldBe "sometitle" + Scorable.jsonToMap(result.json) match { + case None => fail() + case Some(map) => { + map("title").asInstanceOf[String] shouldBe "Some Title" + map("doi").asInstanceOf[String] shouldBe "10.123/abc" + // TODO: full name? not just a string? + map("authors").asInstanceOf[List[String]] shouldBe List("Gaier") + map("year").asInstanceOf[Double].toInt shouldBe 2002 + } + } + } + } + } + + "CrossrefScorable.keepRecord()" should "return true for valid JSON with title" in { + CrossrefScorable.keepRecord(CrossrefStringWithGoodTitle) shouldBe true + } + + it should "return true for valid JSON with a title of maximum permitted length" in { + CrossrefScorable.keepRecord(CrossrefStringWithMaximumTitle) shouldBe true + } + + it should "return false for valid JSON with excessively long title" in { + CrossrefScorable.keepRecord(CrossrefStringWithExcessiveTitle) shouldBe false + } + + it should "return false for valid JSON with null title" in { + CrossrefScorable.keepRecord(CrossrefStringWithNullTitle) shouldBe false + } + + it should "return false for valid JSON with no title" in { + CrossrefScorable.keepRecord(CrossrefStringWithoutTitle) shouldBe false + } + + it should "return false for invalid JSON" in { + CrossrefScorable.keepRecord(CrossrefStringWithoutTitle) shouldBe false + } + + it should "handle content types" in { + CrossrefScorable.jsonToMapFeatures(CrossrefStringWrongType) should be (None) + CrossrefScorable.jsonToMapFeatures(CrossrefStringNoType) should be (None) + } +} diff --git a/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala b/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala new file mode 100644 index 0000000..8dda5c8 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala @@ -0,0 +1,72 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.junit.runner.RunWith +import org.scalatest.FunSpec +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource +import scala._ + +@RunWith(classOf[JUnitRunner]) +class DumpUnGrobidedJobTest extends FunSpec with TupleConversions { + + val output = "/tmp/testOutput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val log = LoggerFactory.getLogger(this.getClass.getName) + + val statusCode: Long = 200 + val statusBytes = Bytes.toBytes(statusCode) + + val sampleDataGrobid : List[List[Array[Byte]]] = List( + ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusBytes), + ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusBytes), + ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusBytes), + ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusBytes), + ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusBytes), + ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusBytes)) + .map(pair => List(Bytes.toBytes(pair._1), pair._2)) + + val sampleDataFile : List[List[Array[Byte]]] = List( + ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", """{c-json-data}""", "application/pdf", """{cdx-json-data}""")) + .map(pair => List(Bytes.toBytes(pair._1), + Bytes.toBytes(pair._2), + Bytes.toBytes(pair._3), + Bytes.toBytes(pair._4))) + + JobTest("sandcrawler.DumpUnGrobidedJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("debug", "true") + .source[Tuple](DumpUnGrobidedJob.getHBaseColSource(testTable, testHost), + sampleDataGrobid.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + .source[Tuple](DumpUnGrobidedJob.getHBaseKeySource(testTable, testHost), + sampleDataFile.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + .sink[Tuple](TypedTsv[(String,String,String,String)](output)) { + outputBuffer => + it("should return correct-length list.") { + assert(outputBuffer.size === 2) + } + } + .run + .finish +} diff --git a/scalding/src/test/scala/sandcrawler/FatcatScorableTest.scala b/scalding/src/test/scala/sandcrawler/FatcatScorableTest.scala new file mode 100644 index 0000000..823e14a --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/FatcatScorableTest.scala @@ -0,0 +1,160 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class FatcatScorableTest extends FlatSpec with Matchers { + // scalastyle:off + val FatcatString = +""" +{ + "abstracts": [], + "refs": [], + "contribs": [ + { + "index": 0, + "raw_name": "W Gaier", + "surname": "Gaier", + "role": "author", + "extra": { + "seq": "first" + } + } + ], + "publisher": "Elsevier BV", + "pages": "186-187", + "ext_ids": { + "doi": "<<DOI>>" + }, + "release_year": 1996, + "release_stage": "published", + "release_type": "article-journal", + "container_id": "3nccslsn5jez3ixrp5skjyjxu4", + "title": "<<TITLE>>", + "state": "active", + "ident": "pnri57u66ffytigdmyybbmouni", + "work_id": "tdmqnfzm2nggrhfwzasyegvpyu", + "revision": "e50bd04e-d0d4-4ee7-b7a4-6b4f079de154", + "extra": { + "crossref": { + "alternative-id": [ + "0987-7983(96)87729-2" + ], + "type": "journal-article" + } + } +} +""".replace("<<DOI>>", "10.123/aBc") + // scalastyle:on + val FatcatStringWithGoodTitle = FatcatString.replace("<<TITLE>>", "Some Title") + val FatcatStringWithMaximumTitle = FatcatString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength) + val FatcatStringWithExcessiveTitle = FatcatString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength + "0") + val FatcatStringWithNullTitle = FatcatString.replace("\"<<TITLE>>\"", "null") + val FatcatStringWithEmptyTitle = FatcatString.replace("<<TITLE>>", "") + val FatcatStringWithoutTitle = FatcatString.replace("title", "nottitle") + val MalformedFatcatString = FatcatString.replace("}", "") + val FatcatStringWithNoAuthors = FatcatString.replace("<<TITLE>>", "Some Valid Title").replace("contribs", "no-contribs") + //val FatcatStringWrongType = FatcatString.replace("<<TITLE>>", "Some Valid Title").replace("journal-article", "other") + //val FatcatStringNoType = FatcatString.replace("<<TITLE>>", "Some Valid Title").replace("type", "not-type") + + // Unit tests + "FatcatScorable.jsonToMapFeatures()" should "handle invalid JSON" in { + FatcatScorable.jsonToMapFeatures(MalformedFatcatString) should be (None) + } + + it should "handle missing title" in { + FatcatScorable.jsonToMapFeatures(FatcatStringWithoutTitle) should be (None) + } + + it should "handle null title" in { + FatcatScorable.jsonToMapFeatures(FatcatStringWithNullTitle) should be (None) + } + + it should "handle empty title" in { + FatcatScorable.jsonToMapFeatures(FatcatStringWithEmptyTitle) should be (None) + } + + it should "handle subtitle" in { + FatcatScorable.jsonToMapFeatures( + """{"title": "short but not too short", "subtitle": "just right!", "ident": "pnri57u66ffytigdmyybbmouni", "work_id": "tdmqnfzm2nggrhfwzasyegvpyu", "DOI": "10.123/asdf", "type":"journal-article","contribs":[{ "raw_name" : "W Gaier", "surname" : "Gaier"}]}""") match { + case None => fail() + case Some(result) => result.slug shouldBe "shortbutnottooshortjustright" + } + } + + it should "handle empty subtitle" in { + FatcatScorable.jsonToMapFeatures( + """{"title": "short but not too short", "subtitle": "", "ident": "pnri57u66ffytigdmyybbmouni", "work_id": "tdmqnfzm2nggrhfwzasyegvpyu", "DOI": "10.123/asdf", "type":"journal-article", "contribs":[{ "raw_name" : "W Gaier", "surname" : "Gaier"}]}""") match { + case None => fail() + case Some(result) => result.slug shouldBe "shortbutnottooshort" + } + } + + it should "handle null subtitle" in { + FatcatScorable.jsonToMapFeatures( + """{"title": "short but not too short", "subtitle": null, "ident": "pnri57u66ffytigdmyybbmouni", "work_id": "tdmqnfzm2nggrhfwzasyegvpyu", "DOI": "10.123/asdf", "type":"journal-article", "contribs":[{ "raw_name" : "W Gaier", "surname" : "Gaier"}]}""") match { + case None => fail() + case Some(result) => result.slug shouldBe "shortbutnottooshort" + } + } + + it should "handle missing authors" in { + // TODO: not actually removing these + //FatcatScorable.jsonToMapFeatures(FatcatStringWithNoAuthors) should be (None) + FatcatScorable.jsonToMapFeatures(FatcatStringWithNoAuthors) + } + + it should "handle valid input" in { + FatcatScorable.jsonToMapFeatures(FatcatStringWithGoodTitle) match { + case None => fail() + case Some(result) => { + result.slug shouldBe "sometitle" + Scorable.jsonToMap(result.json) match { + case None => fail() + case Some(map) => { + map("title").asInstanceOf[String] shouldBe "Some Title" + //map("doi").asInstanceOf[String] shouldBe "10.123/abc" + map("fatcat_release").asInstanceOf[String] shouldBe "pnri57u66ffytigdmyybbmouni" + map("fatcat_work").asInstanceOf[String] shouldBe "tdmqnfzm2nggrhfwzasyegvpyu" + // TODO: full name? not just a string? + map("authors").asInstanceOf[List[String]] shouldBe List("W Gaier") + map("year").asInstanceOf[Double].toInt shouldBe 1996 + } + } + } + } + } + + "FatcatScorable.keepRecord()" should "return true for valid JSON with title" in { + FatcatScorable.keepRecord(FatcatStringWithGoodTitle) shouldBe true + } + + it should "return true for valid JSON with a title of maximum permitted length" in { + FatcatScorable.keepRecord(FatcatStringWithMaximumTitle) shouldBe true + } + + it should "return false for valid JSON with excessively long title" in { + FatcatScorable.keepRecord(FatcatStringWithExcessiveTitle) shouldBe false + } + + it should "return false for valid JSON with null title" in { + FatcatScorable.keepRecord(FatcatStringWithNullTitle) shouldBe false + } + + it should "return false for valid JSON with no title" in { + FatcatScorable.keepRecord(FatcatStringWithoutTitle) shouldBe false + } + + it should "return false for invalid JSON" in { + FatcatScorable.keepRecord(FatcatStringWithoutTitle) shouldBe false + } + +} diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala new file mode 100644 index 0000000..bf9343b --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala @@ -0,0 +1,124 @@ + +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class GrobidScorableDumpJobTest extends FlatSpec with Matchers { + //scalastyle:off + val JsonString = """ +{ + "title": "<<TITLE>>", + "authors": [ + {"name": "Brewster Kahle"}, + {"name": "J Doe"} + ], + "journal": { + "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", + "eissn": null, + "issn": null, + "issue": null, + "publisher": null, + "volume": null + }, + "date": "2000", + "doi": null, + "citations": [ + { "authors": [{"name": "A Seaperson"}], + "date": "2001", + "id": "b0", + "index": 0, + "issue": null, + "journal": "Letters in the Alphabet", + "publisher": null, + "title": "Everything is Wonderful", + "url": null, + "volume": "20"}, + { "authors": [], + "date": "2011-03-28", + "id": "b1", + "index": 1, + "issue": null, + "journal": "The Dictionary", + "publisher": null, + "title": "All about Facts", + "url": null, + "volume": "14"} + ], + "abstract": "Everything you ever wanted to know about nothing", + "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", + "acknowledgement": null, + "annex": null +} +""" + // scalastyle:on + val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File") + val JsonStringWithoutTitle = JsonString.replace("title", "nottitle") + val MalformedJsonString = JsonString.replace("}", "") + + // Pipeline tests + val output = "/tmp/testOutput" + val input = "/tmp/testInput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val Sha1Strings : List[String] = List( + "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", // good + "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", // good + "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", // good + "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", // bad status + "sha1:93187A85273589347598473894839443", // malformed + "sha1:024937534094897039547e9824382943") // bad status + + val JsonStrings : List[String] = List( + JsonString.replace("<<TITLE>>", "Title 1: The Classic"), + JsonString.replace("<<TITLE>>", "Title 2: TNG"), + JsonString.replace("<<TITLE>>", "Title 3: The Sequel"), + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 1: The Classic"), + MalformedJsonString, + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 2: Not TNG") + ) + + // bnewbold: status codes aren't strings, they are uint64 + val Ok : Long = 200 + val Bad : Long = 400 + val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad) + + val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes) + .zipped + .toList + .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) } + .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + + // scalastyle:off null + // Add example of lines without GROBID data + val SampleData = SampleDataHead :+ new Tuple( + new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null) + // scalastyle:on null + + JobTest("sandcrawler.GrobidScorableDumpJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("debug", "true") + .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData) + .sink[(String, String)](TypedTsv[(String, String)](output)) { + outputBuffer => + "The pipeline" should "return correct-length list" in { + outputBuffer should have length 3 + } + } + .run + .finish +} diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala new file mode 100644 index 0000000..b395a64 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala @@ -0,0 +1,122 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class GrobidScorableTest extends FlatSpec with Matchers { + val GrobidString = """ +{ + "title": "<<TITLE>>", + "authors": [ + {"name": "Brewster Kahle"}, + {"name": "J Doe"} + ], + "journal": { + "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", + "eissn": null, + "issn": null, + "issue": null, + "publisher": null, + "volume": null + }, + "date": "2000", + "doi": null, + "citations": [ + { "authors": [{"name": "A Seaperson"}], + "date": "2001", + "id": "b0", + "index": 0, + "issue": null, + "journal": "Letters in the Alphabet", + "publisher": null, + "title": "Everything is Wonderful", + "url": null, + "volume": "20"}, + { "authors": [], + "date": "2011-03-28", + "id": "b1", + "index": 1, + "issue": null, + "journal": "The Dictionary", + "publisher": null, + "title": "All about Facts", + "url": null, + "volume": "14"} + ], + "abstract": "Everything you ever wanted to know about nothing", + "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", + "acknowledgement": null, + "annex": null +} +""" + val GrobidStringWithGoodTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File") + val GrobidStringWithMaximumTitle = GrobidString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength) + val GrobidStringWithExcessiveTitle = GrobidString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength + "0") + val GrobidStringWithNullTitle = GrobidString.replace("\"<<TITLE>>\"", "null") + val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle") + val MalformedGrobidString = GrobidString.replace("}", "") + val Key = "Dummy Key" + + // Unit tests + + "GrobidScorable.jsonToMapFeatures()" should "handle invalid JSON" in { + GrobidScorable.jsonToMapFeatures(Key, MalformedGrobidString) should be (None) + } + + it should "handle null title" in { + GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithNullTitle) should be (None) + } + + it should "handle missing title" in { + GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithoutTitle) should be (None) + } + + it should "handle valid input" in { + GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithGoodTitle) match { + case None => fail() + case Some(result) => { + result.slug shouldBe "dummyexamplefile" + Scorable.jsonToMap(result.json) match { + case None => fail() + case Some(map) => { + map should contain key "title" + map("title").asInstanceOf[String] shouldBe "Dummy Example File" + map("authors").asInstanceOf[List[String]] shouldBe List("Brewster Kahle", "J Doe") + } + } + } + } + } + + "GrobidScorable.keepRecord()" should "return true for valid JSON with title" in { + GrobidScorable.keepRecord(GrobidStringWithGoodTitle) shouldBe true + } + + it should "return true for valid JSON with a title of maximum permitted length" in { + GrobidScorable.keepRecord(GrobidStringWithMaximumTitle) shouldBe true + } + + it should "return false for valid JSON with excessively long title" in { + GrobidScorable.keepRecord(GrobidStringWithExcessiveTitle) shouldBe false + } + + it should "return false for valid JSON with null title" in { + GrobidScorable.keepRecord(GrobidStringWithNullTitle) shouldBe false + } + + it should "return false for valid JSON with no title" in { + GrobidScorable.keepRecord(GrobidStringWithoutTitle) shouldBe false + } + + it should "return false for invalid JSON" in { + GrobidScorable.keepRecord(GrobidStringWithoutTitle) shouldBe false + } +} diff --git a/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala index 603a4c7..c61cb22 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseBuilderTest.scala @@ -22,6 +22,7 @@ class HBaseBuilderTest extends FlatSpec with Matchers { fields should have length 0 } + //scalastyle:off no.whitespace.before.left.bracket it should "throw IllegalArgumentException on malformed input" in { a [IllegalArgumentException] should be thrownBy { HBaseBuilder.parseColSpecs(List("file_size")) diff --git a/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala index fde2290..d6d283f 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseMimeCountTest.scala @@ -1,15 +1,18 @@ package sandcrawler -import cascading.tuple.{Tuple, Fields} -import com.twitter.scalding.{JobTest, Tsv, TupleConversions} +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.junit.runner.RunWith import org.scalatest.FunSpec import org.scalatest.junit.JUnitRunner import org.slf4j.LoggerFactory -import parallelai.spyglass.hbase.HBaseSource import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource import scala._ @RunWith(classOf[JUnitRunner]) diff --git a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala index 3424a36..c4ca5aa 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseRowCountTest.scala @@ -1,15 +1,18 @@ package sandcrawler -import cascading.tuple.{Tuple, Fields} -import com.twitter.scalding.{JobTest, Tsv, TupleConversions} +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.junit.runner.RunWith import org.scalatest.FunSpec import org.scalatest.junit.JUnitRunner import org.slf4j.LoggerFactory -import parallelai.spyglass.hbase.HBaseSource import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource import scala._ /** @@ -47,12 +50,10 @@ class HBaseRowCountTest extends FunSpec with TupleConversions { outputBuffer => it("should return the test data provided.") { - println("outputBuffer.size => " + outputBuffer.size) assert(outputBuffer.size === 1) } it("should return the correct count") { - println("raw output => " + outputBuffer) assert(outputBuffer(0).getObject(0) === 8) } } diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala new file mode 100644 index 0000000..d2cf9de --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala @@ -0,0 +1,71 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.junit.runner.RunWith +import org.scalatest.FunSpec +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource +import scala._ + +@RunWith(classOf[JUnitRunner]) +class HBaseStatusCodeCountTest extends FunSpec with TupleConversions { + + val output = "/tmp/testOutput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val log = LoggerFactory.getLogger(this.getClass.getName) + + val statusType1 : Long = 200 + val statusType2 : Long = 404 + val statusType1Bytes = Bytes.toBytes(statusType1) + val statusType2Bytes = Bytes.toBytes(statusType2) + + // TODO(bnewbold): now to express a null (empty value) in this list? + val sampleData : List[List[Array[Byte]]] = List( + ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1Bytes), + ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1Bytes), + ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2Bytes), + ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2Bytes), + ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2Bytes), + ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2Bytes), + ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1Bytes), + ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2Bytes)) + .map(pair => List(Bytes.toBytes(pair._1), pair._2)) + + val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes) + val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes) + + JobTest("sandcrawler.HBaseStatusCodeCountJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("debug", "true") + .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"), + sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + .sink[Tuple](TypedTsv[(Long, Long)](output)) { + outputBuffer => + it("should return a correct number of elements.") { + assert(outputBuffer.size === 2) + } + + // Convert List[Tuple] to Map[Long, Long]. + val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap + it("should have the appropriate number of each status type") { + assert(counts(statusType1) == statusType1Count) + assert(counts(statusType2) == statusType2Count) + } + } + .run + .finish +} diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala index d7689cd..7e91af3 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala @@ -1,15 +1,19 @@ package sandcrawler -import cascading.tuple.{Tuple, Fields} -import com.twitter.scalding.{JobTest, Tsv, TypedTsv, TupleConversions} +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.junit.runner.RunWith import org.scalatest.FunSpec import org.scalatest.junit.JUnitRunner import org.slf4j.LoggerFactory -import parallelai.spyglass.hbase.HBaseSource import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource import scala._ @RunWith(classOf[JUnitRunner]) @@ -20,21 +24,20 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions { val log = LoggerFactory.getLogger(this.getClass.getName) - val statusType1 : Long = 200 - val statusType2 : Long = 404 - val statusType1Bytes = Bytes.toBytes(statusType1) - val statusType2Bytes = Bytes.toBytes(statusType2) - - val sampleData = List( - List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), statusType1Bytes), - List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), statusType1Bytes), - List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), statusType2Bytes), - List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), statusType2Bytes), - List(Bytes.toBytes("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ"), statusType2Bytes), - List(Bytes.toBytes("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6"), statusType2Bytes), - List(Bytes.toBytes("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ"), statusType1Bytes), - List(Bytes.toBytes("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT"), statusType2Bytes) - ) + val statusType1Bytes = Bytes.toBytes("""{"status": "success"}""") + val statusType2Bytes = Bytes.toBytes("""{"status": "partial"}""") + + // TODO(bnewbold): now to express a null (empty value) in this list? + val sampleData : List[List[Array[Byte]]] = List( + ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1Bytes), + ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1Bytes), + ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2Bytes), + ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2Bytes), + ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2Bytes), + ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2Bytes), + ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1Bytes), + ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2Bytes)) + .map(pair => List(Bytes.toBytes(pair._1), pair._2)) val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes) val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes) @@ -46,20 +49,13 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions { .arg("hbase-table", testTable) .arg("zookeeper-hosts", testHost) .arg("debug", "true") - .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"), + .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status"), sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) - .sink[Tuple](TypedTsv[(Long, Long)](output)) { + .sink[Tuple](TypedTsv[(String, Long)](output)) { outputBuffer => it("should return a 2-element list.") { assert(outputBuffer.size === 2) } - - // Convert List[Tuple] to Map[Long, Long]. - val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap - it("should have the appropriate number of each status type") { - assert(counts(statusType1) == statusType1Count) - assert(counts(statusType2) == statusType2Count) - } } .run .finish diff --git a/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala new file mode 100644 index 0000000..c847296 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala @@ -0,0 +1,64 @@ +package sandcrawler + +import java.io.InputStream + +import scala.io.Source + +import org.scalatest._ + +// scalastyle:off null +class ScorableFeaturesTest extends FlatSpec with Matchers { + "toMapFeatures()" should "work with gnarly inputs" in { + ScorableFeatures.create(title = null).toMapFeatures + ScorableFeatures.create(title = "something", doi = null, sha1 = null, year = 123).toMapFeatures + } + + private def titleToSlug(s : String) : Option[String] = ScorableFeatures.create(title = s).toSlug + + "mapToSlug()" should "extract the parts of titles before a colon" in { + titleToSlug("HELLO:there") shouldBe Some("hellothere") + } + + it should "extract an entire colon-less string" in { + titleToSlug("hello THERE") shouldBe Some("hellothere") + } + + it should "return Scorable.NoSlug if given empty string" in { + titleToSlug("") shouldBe (None) + } + + it should "return Scorable.NoSlug if given null" in { + titleToSlug(null) shouldBe (None) + } + + it should "strip punctuation" in { + titleToSlug("HELLO!:the:re") shouldBe Some("hellothere") + titleToSlug("a:b:cdefgh") shouldBe Some("abcdefgh") + titleToSlug( + "If you're happy and you know it, clap your hands!") shouldBe Some("ifyourehappyandyouknowitclapyourhands") + titleToSlug(":;\"\'") shouldBe (None) + } + + it should "filter stub titles" in { + titleToSlug("abstract") shouldBe (None) + titleToSlug("title!") shouldBe (None) + titleToSlug("a real title which is not on denylist") shouldBe Some("arealtitlewhichisnotondenylist") + } + + it should "strip special characters" in { + titleToSlug(":;!',|\"\'`.#?!-@*/\\=+~%$^{}()[]<>-_’·“”‘’“”«»「」¿–±§ʿ") shouldBe (None) + // TODO: titleToSlug("©™₨№…") shouldBe (None) + // TODO: titleToSlug("πµΣσ") shouldBe (None) + } + + it should "remove whitespace" in { + titleToSlug("foo bar : baz ::") shouldBe Some("foobarbaz") + titleToSlug("\na\t:b:cdefghi") shouldBe Some("abcdefghi") + titleToSlug("\n \t \r ") shouldBe (None) + } + + it should "skip very short slugs" in { + titleToSlug("short") shouldBe (None) + titleToSlug("a longer, more in depth title") shouldBe Some("alongermoreindepthtitle") + } +} diff --git a/scalding/src/test/scala/sandcrawler/ScorableTest.scala b/scalding/src/test/scala/sandcrawler/ScorableTest.scala new file mode 100644 index 0000000..2094543 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScorableTest.scala @@ -0,0 +1,81 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class ScorableTest extends FlatSpec with Matchers { + val JsonString = """ +{ + "title": "<<TITLE>>", + "authors": [ + {"name": "Brewster Kahle"}, + {"name": "J Doe"} + ], + "journal": { + "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", + "eissn": null, + "issn": null, + "issue": null, + "publisher": null, + "volume": null + }, + "date": "2000", + "doi": null, + "citations": [ + { "authors": [{"name": "A Seaperson"}], + "date": "2001", + "id": "b0", + "index": 0, + "issue": null, + "journal": "Letters in the Alphabet", + "publisher": null, + "title": "Everything is Wonderful", + "url": null, + "volume": "20"}, + { "authors": [], + "date": "2011-03-28", + "id": "b1", + "index": 1, + "issue": null, + "journal": "The Dictionary", + "publisher": null, + "title": "All about Facts", + "url": null, + "volume": "14"} + ], + "abstract": "Everything you ever wanted to know about nothing", + "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", + "acknowledgement": null, + "annex": null +} +""" + "jsonToMap()" should "return a map, given a legal JSON string" in { + Scorable.jsonToMap(JsonString) should not be (None) + } + + it should "return None, given illegal JSON" in { + Scorable.jsonToMap("illegal{,json{{") should be (None) + } + + "computeOutput()" should "return Scorable.MaxScore if given identical ReduceFeatures" in { + val score = Scorable.computeSimilarity( + new ReduceFeatures(JsonString), new ReduceFeatures(JsonString)) + score shouldBe Scorable.MaxScore + } + + "computeOutput()" should "be case-insensitive" in { + val left = JsonString.replace("<<TITLE>>", "A TITLE UPPER CASE") + val right = JsonString.replace("<<TITLE>>", "a title upper case") + val score = Scorable.computeSimilarity( + new ReduceFeatures(left), new ReduceFeatures(right)) + score shouldBe Scorable.MaxScore + } +} diff --git a/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala new file mode 100644 index 0000000..5393f10 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScoreInsertableJobTest.scala @@ -0,0 +1,262 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class ScoreInsertableJobTest extends FlatSpec with Matchers { + //scalastyle:off + val JsonString = """ +{ + "title": "<<TITLE>>", + "authors": [ + {"name": "Brewster Kahle"}, + {"name": "J Doe"} + ], + "journal": { + "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", + "eissn": null, + "issn": null, + "issue": null, + "publisher": null, + "volume": null + }, + "date": "2000", + "doi": null, + "citations": [ + { "authors": [{"name": "A Seaperson"}], + "date": "2001", + "id": "b0", + "index": 0, + "issue": null, + "journal": "Letters in the Alphabet", + "publisher": null, + "title": "Everything is Wonderful", + "url": null, + "volume": "20"}, + { "authors": [], + "date": "2011-03-28", + "id": "b1", + "index": 1, + "issue": null, + "journal": "The Dictionary", + "publisher": null, + "title": "All about Facts", + "url": null, + "volume": "14"} + ], + "abstract": "Everything you ever wanted to know about nothing", + "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", + "acknowledgement": null, + "annex": null +} +""" + // scalastyle:on + val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File") + val JsonStringWithoutTitle = JsonString.replace("title", "nottitle") + val MalformedJsonString = JsonString.replace("}", "") + + // scalastyle:off + val CrossrefString = +""" +{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" }, + "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ], + "date-time" : "2017-10-23T17:19:16Z", + "timestamp" : { "$numberLong" : "1508779156477" } }, + "reference-count" : 0, + "publisher" : "Elsevier BV", + "issue" : "3", + "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/", + "start" : { "date-parts" : [ [ 1996, 1, 1 ] ], + "date-time" : "1996-01-01T00:00:00Z", + "timestamp" : { "$numberLong" : "820454400000" } }, + "delay-in-days" : 0, "content-version" : "tdm" }], + "content-domain" : { "domain" : [], "crossmark-restriction" : false }, + "published-print" : { "date-parts" : [ [ 1996 ] ] }, + "DOI" : "<<DOI>>", + "type" : "journal-article", + "created" : { "date-parts" : [ [ 2002, 7, 25 ] ], + "date-time" : "2002-07-25T15:09:41Z", + "timestamp" : { "$numberLong" : "1027609781000" } }, + "page" : "186-187", + "source" : "Crossref", + "is-referenced-by-count" : 0, + "title" : [ "<<TITLE>>" ], + "prefix" : "10.1016", + "volume" : "9", + "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ], + "member" : "78", + "container-title" : [ "Journal de Pédiatrie et de Puériculture" ], + "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml", + "content-type" : "text/xml", + "content-version" : "vor", + "intended-application" : "text-mining" }, + { "URL" : + "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain", + "content-type" : "text/plain", + "content-version" : "vor", + "intended-application" : "text-mining" } ], + "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ], + "date-time" : "2015-09-03T10:03:43Z", + "timestamp" : { "$numberLong" : "1441274623000" } }, + "score" : 1, + "issued" : { "date-parts" : [ [ 1996 ] ] }, + "references-count" : 0, + "alternative-id" : [ "0987-7983(96)87729-2" ], + "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2", + "ISSN" : [ "0987-7983" ], + "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ], + "subject" : [ "Pediatrics, Perinatology, and Child Health" ] +} +""" + // scalastyle:on + val TooLongOfTitle = "X" * Scorable.MaxTitleLength + "Y" // arbitrary long string + val TooShortOfTitle = "X" * (ScorableFeatures.MinSlugLength - 1) + val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") + val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") + val MalformedCrossrefString = CrossrefString.replace("}", "") + val CrossrefStrings = List( + CrossrefString.replace("<<TITLE>>", "Title 2: TNG").replace("<<DOI>>", "DOI-0"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2A").replace("<<DOI>>", "DOI-0.5"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), + CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"), + CrossrefString.replace("<<TITLE>>", TooLongOfTitle).replace("<<DOI>>", "DOI-1"), + CrossrefString.replace("<<TITLE>>", TooShortOfTitle).replace("<<DOI>>", "DOI-1")) + + // Pipeline tests + val output = "/tmp/testOutput" + val input = "/tmp/testInput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val Sha1Strings : List[String] = List( + "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", + "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", + "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", + "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", + "sha1:93187A85273589347598473894839443", + "sha1:024937534094897039547e9824382943", + "sha1:93229759932857982837892347893892", + "sha1:83229759932857982837892347893892") + + val JsonStrings : List[String] = List( + JsonString.replace("<<TITLE>>", "Title 1: The Original"), + JsonString.replace("<<TITLE>>", "Title 2: TNG"), + JsonString.replace("<<TITLE>>", "Title 3: The Sequel"), + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 1: The Original"), + MalformedJsonString, + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 2: Not TNG"), + // These are in both sources but have bad titles + JsonString.replace("<<TITLE>>", TooLongOfTitle), + JsonString.replace("<<TITLE>>", TooShortOfTitle) + ) + + // bnewbold: status codes aren't strings, they are uint64 + val Ok : Long = 200 + val Bad : Long = 400 + val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad, Ok, Ok) + + val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes) + .zipped + .toList + .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) } + .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + + // scalastyle:off null + // Add example of lines without GROBID data + // scalastyle:off null + val SampleData = SampleDataHead :+ new Tuple( + new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null) + // scalastyle:on null + + val CdxList: List[String] = List("{}", "{}", "{}", "{}", "{}", "{}", "{}", "{}" ) + val MimeList: List[String] = List("application/pdf", "application/pdf", "application/pdf", + "application/pdf", "application/pdf", "application/pdf", "application/pdf", + "application/pdf") + val SizeList: List[Long] = List(1,2,3,4,5,6,7,8) + + // Can zip 3 lists, but not 4... so we recursively zip + val SampleCdxData : List[Tuple] = ((Sha1Strings, CdxList).zipped.toList, (MimeList, SizeList).zipped.toList) + .zipped + .toList + .map { case ((sha: String, cdx: String), (mime: String, size: Long)) => List(Bytes.toBytes(sha), Bytes.toBytes(cdx), Bytes.toBytes(mime), Bytes.toBytes(size)) } + .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + + JobTest("sandcrawler.ScoreInsertableJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("crossref-input", input) + .arg("debug", "true") + .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData) + .source[Tuple](ScoreInsertableJob.getHBaseCdxSource(testTable, testHost), SampleCdxData) + .source(TextLine(input), List( + 0 -> CrossrefStrings(0), + 1 -> CrossrefStrings(1), + 2 -> CrossrefStrings(2), + 3 -> CrossrefStrings(3), + 4 -> CrossrefStrings(4), + 4 -> CrossrefStrings(5))) + .sink[(String, ReduceFeatures)](TypedTsv[(String, ReduceFeatures)](output + ".trapped")) { _ => () } + .sink[(String, String, Int, String, String, String, String, Long)](TypedTsv[(String, String, Int, String, String, String, String, Long)](output)) { + // Grobid titles and slugs (in parentheses): + // Title 1 (title1) + // Title 2: TNG (title2tng) + // Title 3: The Sequel (title3thesequel) + // <too long of a title> + // <too short of a title> + // crossref titles and slugs (in parentheses): + // Title 2: TNG (title2tng) + // Title 1: TNG 2A (title1tng2a) + // Title 1: TNG 3 (title1tng3) + // Title 2: Rebooted (title2rebooted) + // <too long of a title> + // <too short of a title> + // XXX: Join should have 3 "title1" slugs and 1 "title2tng" slug + outputBuffer => + "The pipeline" should "return a 1-element list" in { + outputBuffer should have length 1 + } + + it should "has right # of entries with each slug" in { + val slugs = outputBuffer.map(_._2) + val countMap : Map[String, Int] = slugs.groupBy(identity).mapValues(_.size) + // XXX: countMap("title1") shouldBe 3 + countMap("title2tng") shouldBe 1 + } + + def bundle(slug : String, grobidIndex : Int, crossrefIndex : Int) : (String, Int, String, String) = { + val mfg : Option[MapFeatures] = GrobidScorable.jsonToMapFeatures( + Sha1Strings(grobidIndex), + JsonStrings(grobidIndex)) + val mfc : Option[MapFeatures] = CrossrefScorable.jsonToMapFeatures(CrossrefStrings(crossrefIndex)) + if (mfg.isEmpty || mfc.isEmpty) { + fail() + } else { + val score = Scorable.computeSimilarity( + ReduceFeatures(mfg.get.json), + ReduceFeatures(mfc.get.json)) + (slug, score, mfg.get.json, mfc.get.json) + } + } + + it should "have right output values" in { + //outputBuffer.exists(_ == bundle("title1", 0, 0)) + //outputBuffer.exists(_ == bundle("title1", 0, 2)) + //outputBuffer.exists(_ == bundle("title1", 0, 1)) + outputBuffer.exists(_ == bundle("title2tng", 1, 3)) + } + } + .run + .finish +} diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala new file mode 100644 index 0000000..fbc0ee5 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala @@ -0,0 +1,248 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class ScoreJobTest extends FlatSpec with Matchers { + //scalastyle:off + val JsonString = """ +{ + "title": "<<TITLE>>", + "authors": [ + {"name": "Brewster Kahle"}, + {"name": "J Doe"} + ], + "journal": { + "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", + "eissn": null, + "issn": null, + "issue": null, + "publisher": null, + "volume": null + }, + "date": "2000", + "doi": null, + "citations": [ + { "authors": [{"name": "A Seaperson"}], + "date": "2001", + "id": "b0", + "index": 0, + "issue": null, + "journal": "Letters in the Alphabet", + "publisher": null, + "title": "Everything is Wonderful", + "url": null, + "volume": "20"}, + { "authors": [], + "date": "2011-03-28", + "id": "b1", + "index": 1, + "issue": null, + "journal": "The Dictionary", + "publisher": null, + "title": "All about Facts", + "url": null, + "volume": "14"} + ], + "abstract": "Everything you ever wanted to know about nothing", + "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", + "acknowledgement": null, + "annex": null +} +""" + // scalastyle:on + val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File") + val JsonStringWithoutTitle = JsonString.replace("title", "nottitle") + val MalformedJsonString = JsonString.replace("}", "") + + // scalastyle:off + val CrossrefString = +""" +{ "_id" : { "$oid" : "5a553d5988a035a45bf50ed3" }, + "indexed" : { "date-parts" : [ [ 2017, 10, 23 ] ], + "date-time" : "2017-10-23T17:19:16Z", + "timestamp" : { "$numberLong" : "1508779156477" } }, + "reference-count" : 0, + "publisher" : "Elsevier BV", + "issue" : "3", + "license" : [ { "URL" : "http://www.elsevier.com/tdm/userlicense/1.0/", + "start" : { "date-parts" : [ [ 1996, 1, 1 ] ], + "date-time" : "1996-01-01T00:00:00Z", + "timestamp" : { "$numberLong" : "820454400000" } }, + "delay-in-days" : 0, "content-version" : "tdm" }], + "content-domain" : { "domain" : [], "crossmark-restriction" : false }, + "published-print" : { "date-parts" : [ [ 1996 ] ] }, + "DOI" : "<<DOI>>", + "type" : "journal-article", + "created" : { "date-parts" : [ [ 2002, 7, 25 ] ], + "date-time" : "2002-07-25T15:09:41Z", + "timestamp" : { "$numberLong" : "1027609781000" } }, + "page" : "186-187", + "source" : "Crossref", + "is-referenced-by-count" : 0, + "title" : [ "<<TITLE>>" ], + "prefix" : "10.1016", + "volume" : "9", + "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ], + "member" : "78", + "container-title" : [ "Journal de Pédiatrie et de Puériculture" ], + "link" : [ { "URL" : "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/xml", + "content-type" : "text/xml", + "content-version" : "vor", + "intended-application" : "text-mining" }, + { "URL" : + "http://api.elsevier.com/content/article/PII:0987-7983(96)87729-2?httpAccept=text/plain", + "content-type" : "text/plain", + "content-version" : "vor", + "intended-application" : "text-mining" } ], + "deposited" : { "date-parts" : [ [ 2015, 9, 3 ] ], + "date-time" : "2015-09-03T10:03:43Z", + "timestamp" : { "$numberLong" : "1441274623000" } }, + "score" : 1, + "issued" : { "date-parts" : [ [ 1996 ] ] }, + "references-count" : 0, + "alternative-id" : [ "0987-7983(96)87729-2" ], + "URL" : "http://dx.doi.org/10.1016/0987-7983(96)87729-2", + "ISSN" : [ "0987-7983" ], + "issn-type" : [ { "value" : "0987-7983", "type" : "print" } ], + "subject" : [ "Pediatrics, Perinatology, and Child Health" ] +} +""" + // scalastyle:on + val TooLongOfTitle = "X" * Scorable.MaxTitleLength + "Y" // arbitrary long string + val TooShortOfTitle = "X" * (ScorableFeatures.MinSlugLength - 1) + val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle") + val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle") + val MalformedCrossrefString = CrossrefString.replace("}", "") + val CrossrefStrings = List( + CrossrefString.replace("<<TITLE>>", "Title 2: TNG").replace("<<DOI>>", "DOI-0"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2A").replace("<<DOI>>", "DOI-0.5"), + CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"), + CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"), + CrossrefString.replace("<<TITLE>>", TooLongOfTitle).replace("<<DOI>>", "DOI-1"), + CrossrefString.replace("<<TITLE>>", TooShortOfTitle).replace("<<DOI>>", "DOI-1")) + + // Pipeline tests + val output = "/tmp/testOutput" + val input = "/tmp/testInput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val Sha1Strings : List[String] = List( + "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", + "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", + "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", + "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", + "sha1:93187A85273589347598473894839443", + "sha1:024937534094897039547e9824382943", + "sha1:93229759932857982837892347893892", + "sha1:83229759932857982837892347893892") + + val JsonStrings : List[String] = List( + JsonString.replace("<<TITLE>>", "Title 1: The Original"), + JsonString.replace("<<TITLE>>", "Title 2: TNG"), + JsonString.replace("<<TITLE>>", "Title 3: The Sequel"), + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 1: The Original"), + MalformedJsonString, + // This will have bad status. + JsonString.replace("<<TITLE>>", "Title 2: Not TNG"), + // These are in both sources but have bad titles + JsonString.replace("<<TITLE>>", TooLongOfTitle), + JsonString.replace("<<TITLE>>", TooShortOfTitle) + ) + + // bnewbold: status codes aren't strings, they are uint64 + val Ok : Long = 200 + val Bad : Long = 400 + val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad, Ok, Ok) + + val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes) + .zipped + .toList + .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) } + .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + + // scalastyle:off null + // Add example of lines without GROBID data + // scalastyle:off null + val SampleData = SampleDataHead :+ new Tuple( + new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null) + // scalastyle:on null + + JobTest("sandcrawler.ScoreJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("crossref-input", input) + .arg("debug", "true") + .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData) + .source(TextLine(input), List( + 0 -> CrossrefStrings(0), + 1 -> CrossrefStrings(1), + 2 -> CrossrefStrings(2), + 3 -> CrossrefStrings(3), + 4 -> CrossrefStrings(4), + 4 -> CrossrefStrings(5))) + .sink[(String, ReduceFeatures)](TypedTsv[(String, ReduceFeatures)](output + ".trapped")) { _ => () } + .sink[(String, Int, String, String)](TypedTsv[(String, Int, String, String)](output)) { + // Grobid titles and slugs (in parentheses): + // Title 1 (title1) + // Title 2: TNG (title2tng) + // Title 3: The Sequel (title3thesequel) + // <too long of a title> + // <too short of a title> + // crossref titles and slugs (in parentheses): + // Title 2: TNG (title2tng) + // Title 1: TNG 2A (title1tng2a) + // Title 1: TNG 3 (title1tng3) + // Title 2: Rebooted (title2rebooted) + // <too long of a title> + // <too short of a title> + // XXX: Join should have 3 "title1" slugs and 1 "title2tng" slug + outputBuffer => + "The pipeline" should "return a 1-element list" in { + outputBuffer should have length 1 + } + + it should "has right # of entries with each slug" in { + val slugs = outputBuffer.map(_._1) + val countMap : Map[String, Int] = slugs.groupBy(identity).mapValues(_.size) + // XXX: countMap("title1") shouldBe 3 + countMap("title2tng") shouldBe 1 + } + + def bundle(slug : String, grobidIndex : Int, crossrefIndex : Int) : (String, Int, String, String) = { + val mfg : Option[MapFeatures] = GrobidScorable.jsonToMapFeatures( + Sha1Strings(grobidIndex), + JsonStrings(grobidIndex)) + val mfc : Option[MapFeatures] = CrossrefScorable.jsonToMapFeatures(CrossrefStrings(crossrefIndex)) + if (mfg.isEmpty || mfc.isEmpty) { + fail() + } else { + val score = Scorable.computeSimilarity( + ReduceFeatures(mfg.get.json), + ReduceFeatures(mfc.get.json)) + (slug, score, mfg.get.json, mfc.get.json) + } + } + + it should "have right output values" in { + //outputBuffer.exists(_ == bundle("title1", 0, 0)) + //outputBuffer.exists(_ == bundle("title1", 0, 2)) + //outputBuffer.exists(_ == bundle("title1", 0, 1)) + outputBuffer.exists(_ == bundle("title2tng", 1, 3)) + } + } + .run + .finish +} diff --git a/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala b/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala new file mode 100644 index 0000000..410819b --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/StringUtilitiesTest.scala @@ -0,0 +1,85 @@ +package sandcrawler + +import org.scalatest._ + +class StringUtilitiesTest extends FlatSpec with Matchers { + "removeAccents()" should "handle the empty string" in { + StringUtilities.removeAccents("") shouldBe "" + } + + it should "not change a string with unaccented characters" in { + StringUtilities.removeAccents("abc123") shouldBe "abc123" + } + + it should "remove accents from Ls" in { + StringUtilities.removeAccents("E\u0141\u0142en") shouldBe "ELlen" + } + + it should "remove accents from Es without changing case" in { + val result = StringUtilities.removeAccents("\u00e9") + result should have length 1 + result shouldBe "e" + } + + it should "convert the ø in Soren" in { + StringUtilities.removeAccents("Søren") shouldBe "Soren" + StringUtilities.removeAccents("SØREN") shouldBe "SOREN" + } + + "removePunctuation" should "work on the empty string" in { + StringUtilities.removePunctuation("") shouldBe "" + } + + it should "work on non-empty text strings" in { + StringUtilities.removePunctuation("Hello, world!") shouldBe "Hello world" + StringUtilities.removePunctuation(":-)") shouldBe "" + StringUtilities.removePunctuation("<<---a---b--->") shouldBe "ab" + } + + // Tests adapted from https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ + "stringDistance" should "work on empty strings" in { + StringUtilities.stringDistance("", "") shouldBe 0 + StringUtilities.stringDistance("a", "") shouldBe 1 + StringUtilities.stringDistance("", "a") shouldBe 1 + StringUtilities.stringDistance("abc", "") shouldBe 3 + StringUtilities.stringDistance("", "abc") shouldBe 3 + } + + it should "work on equal strings" in { + StringUtilities.stringDistance("", "") shouldBe 0 + StringUtilities.stringDistance("a", "a") shouldBe 0 + StringUtilities.stringDistance("abc", "abc") shouldBe 0 + } + + it should "work where only inserts are needed" in { + StringUtilities.stringDistance("", "a") shouldBe 1 + StringUtilities.stringDistance("a", "ab") shouldBe 1 + StringUtilities.stringDistance("b", "ab") shouldBe 1 + StringUtilities.stringDistance("ac", "abc") shouldBe 1 + StringUtilities.stringDistance("abcdefg", "xabxcdxxefxgx") shouldBe 6 + } + + it should "work where only deletes are needed" in { + StringUtilities.stringDistance( "a", "") shouldBe 1 + StringUtilities.stringDistance( "ab", "a") shouldBe 1 + StringUtilities.stringDistance( "ab", "b") shouldBe 1 + StringUtilities.stringDistance("abc", "ac") shouldBe 1 + StringUtilities.stringDistance("xabxcdxxefxgx", "abcdefg") shouldBe 6 + } + + it should "work where only substitutions are needed" in { + StringUtilities.stringDistance( "a", "b") shouldBe 1 + StringUtilities.stringDistance( "ab", "ac") shouldBe 1 + StringUtilities.stringDistance( "ac", "bc") shouldBe 1 + StringUtilities.stringDistance("abc", "axc") shouldBe 1 + StringUtilities.stringDistance("xabxcdxxefxgx", "1ab2cd34ef5g6") shouldBe 6 + } + + it should "work where many operations are needed" in { + StringUtilities.stringDistance("example", "samples") shouldBe 3 + StringUtilities.stringDistance("sturgeon", "urgently") shouldBe 6 + StringUtilities.stringDistance("levenshtein", "frankenstein") shouldBe 6 + StringUtilities.stringDistance("distance", "difference") shouldBe 5 + StringUtilities.stringDistance("java was neat", "scala is great") shouldBe 7 + } +} |