diff options
Diffstat (limited to 'scalding/src/main')
24 files changed, 1897 insertions, 6 deletions
diff --git a/scalding/src/main/resources/slug-denylist.txt b/scalding/src/main/resources/slug-denylist.txt new file mode 100644 index 0000000..926dbd5 --- /dev/null +++ b/scalding/src/main/resources/slug-denylist.txt @@ -0,0 +1,554 @@ +abbreviations +abbreviationsandacronyms +aboutauthors +abouttheauthor +abouttheauthors +aboutthecover +abouttheeditors +abreviations +abstract +abstractnotsubmittedforonlinepublication +abstracts +abstractsofaapaposterandpodiumpresentations +abstractsofcommunications +abstractsofthesesfromthescandinaviancountries +abstractwithdrawn +acknowledgement +acknowledgements +acknowledgementsvii +acknowledgementtoreferees +acknowledgementtoreviewers +acknowledgment +acknowledgmentofreferees +acknowledgments +addendum +additionalresources +address +advertisersindex +affect +affiliation +afterword +agenda +agradecimentos +agradecimientos +aimsandscope +analysis +annexa +announcement +announcements +annualacknowledgementofmanuscriptreviewers +anotefromtheeditor +appendices +appendix +appendix1 +appendixa +appendixb +appointmentsandstaffchanges +approximation +apresentacao +article +articles +articlesofsignificantinterestselectedfromthisissuebytheeditors +associationnews +ataglance +atribute +attention +authorguidelines +authorindex +authorindexforvolume81 +authorreply +authors +authorsreply +authorsresponse +avantpropos +award +awardsappointmentsannouncements +backcover +background +backmatter +berichtigung +besprechungen +bibliografia +bibliographie +bibliography +bigdata +blankpage +blood +boardoftrustees +booknotes +booknotices +bookofabstracts +bookreview +bookreviews +bookreviewsandnotices +bookreviewssection +booksreceived +buchbesprechung +buchbesprechungen +bulletin +calendar +calendarofevents +calendarofmeetings +callforarticles +callforpapers +casereport +casereports +casestudy +chairmansopeningremarks +changes +chaos +chapter1 +chapter10 +chapter1introduction +chapter2 +chapter7 +chapteri +chapterone +chapteroneintroduction +chaptertwo +chapterx +citation +classes +classified +classifieds +closingremarks +collaborateurs +comment +commentaries +commentary +commentaryon +commenton +comments +commentto +committee +communication +communications +communicationstotheeditor +communiquedepresse +community +components +comptesrendus +computerscience +concludingremarks +conclusion +conclusions +conferencereport +congratulations +congresscalendar +conservation +content +contents +context +continuingeducation +continuingmedicaleducation +contributors +copyright +copyrightform +copyrightnotice +correction +corrections +correspondence +corrigenda +corrigendum +councilminutes +cover +coverimage +currentresearch +curriculumvitae +danksagung +dearreaders +decisionmaking +dedication +dedicatoria +definition +description +discussion +diskussion +distribution +documents +ear +economics +editorial +editorialadvisoryboard +editorialannouncement +editorialboard +editorialcomment +editorialcomments +editorialconsultants +editoriale +editorialeditorial +editorialforeword +editorialinformation +editorialintroduction +editorialintroductions +editorialnote +editorialnotes +editorialpreface +editorials +editorialsoftwaresurveysection +editorialstaff +editorialstatement +editorinchief +editors +editorschoice +editorscomment +editorscomments +editorscorner +editorscorrespondence +editorsforeword +editorsintroduction +editorsletter +editorsnote +editorsnotes +editorspage +editorspicks +editorspreface +education +einfuhrung +einleitung +electrophoresis +employment +endnotes +entrevista +entscheidungsverzeichnis +epilogue +equipment +errata +erratum +essay +essays +executivesummary +exercises +expediente +extendedabstracts +feature +features +fichatecnica +figure3 +finalexam +finalreport +focus +foreward +foreword +forthcomingarticles +forthcomingevents +fortherecord +forum +frequentlyaskedquestions +fromtheeditor +fromtheeditorinchief +fromtheeditors +fromtheeditorsdesk +fromthepresident +frontmatter +furtherreadings +genealogy +generaldiscussion +generalinformation +generalintroduction +germany +gettingstarted +glosario +glossary +glossaryofterms +guesteditorial +guesteditorsforeword +guesteditorsintroduction +guideforauthors +guidelinesforcontributors +health +heartfailure +highlights +highlightsfromthisissue +highlightsofthisissue +history +home +homework +hypothesis +iii +imageofthemonth +importantnotice +impressum +inbrief +index +indexofauthors +indexofauthorsandtitles +indice +indicegeneral +informationforauthors +informationtoauthors +inhalt +inhaltsverzeichnis +inleiding +inmemoriam +inreply +inresponse +insidethisissue +institutenews +instructionsforauthors +instructionstoauthors +interview +inthestudy +inthisissue +introducao +introduccion +introduction +introductionandoverview +introductiongenerale +introductiontotheissue +introductiontothespecialissue +introductorycomments +introductoryremarks +introduzione +inventions +invitedcommentary +issuesandevents +jobdescription +journalclub +journalscan +keywords +kurzkommentiert +languageteaching +lecture +letter +letterfromtheeditor +letterfromtheeditorinchief +letterfromtheeditors +letterfromthepresident +letters +letterstotheeditor +letterstotheeditors +lettertotheeditor +lettertotheeditors +liminaire +linearalgebra +linearregression +links +listedestableaux +listofabbreviations +listofcontributors +listoffigures +listofparticipants +listofpublications +listofreferees +listofreviewers +listoftables +literacy +literatur +literature +literaturecited +literaturereview +literaturrundschau +literaturverzeichnis +litteraturverzeichniss +livresrecus +lucina +lungcancer +magazin +maintenance +materials +materialsafetydatasheet +materialsandmethods +medicinalchemistry +meetingabstracts +meetingreport +meetings +meetingsandconferences +meetingsofinterest +membershipapplication +memoranda +memorandum +messagefromgeneralcochairs +messagefromthechairs +messagefromtheeditor +messagefromtheeditorinchief +messagefromthepresident +messagefromtheprogramchairs +messagefromtheprogramcochairs +metaanalysis +miscellanea +miscellaneous +miscellany +missionstatement +motivation +mrsnews +name +newbooks +newlyelectedmembersofthecollege +newproducts +news +newsandnotes +newsandreviews +newsandviews +newsbriefs +newsinbrief +newsnotes +newsviews +noii +note +notefromtheeditor +notes +notesandcomments +notesandnews +notesdelecture +notesforcontributors +notesoncontributors +notice +noticeboard +notitle +notitleavailable +obituaries +obituary +online +openaccess +openingaddress +openingremarks +oralabstracts +oralpresentations +organizingcommittee +originalarticle +originalarticles +other +outline +overview +panorama +papers +paperstoappearinforthcomingissues +partone +personalandmiscellaneous +perspective +perspectives +philosophy +pictureofthemonth +place +pointofview +positionsavailable +poster +posterpresentations +postscript +preface +prefaceandacknowledgements +prefacetothesecondedition +preliminarymaterial +presentacio +presentacion +presentation +presidentialaddress +presidentsmessage +presidentsreport +pressrelease +print +printing +proceedings +proceedingsofthenationalacademyofsciences +profile +programcommittee +projectmanagement +prologue +publication +publichealth +publishersnote +question +questionsandanswers +radiology +readersforum +recensiones +recensions +recentpublications +redaktorensforord +referate +references +referenciasbibliograficas +regression +rehabilitation +rejoinder +remerciements +reply +replybyauthors +researchresearchers +resenas +resources +response +responsetothelettertotheeditor +results +resume +resumen +resumes +resumo +retraction +review +reviewarticle +revieweracknowledgement +revieweracknowledgement2013 +reviewers +reviewessay +reviews +reviewsanddescriptionsoftablesandbooks +reviewsofbooks +rezension +rezensionen +safety +section +security +selectedbibliography +shortcommunication +shorternotices +shortnotices +socialengineering +sociology +sommaire +sommario +specialreport +specialsection +specifications +spistresci +subjectindex +subscriptions +suggestedreadings +sumario +summaries +summariesofkeyjournalarticles +summary +summaryofproceedings +summer +sun +supplementarymaterial +symposium +symptom +synthese +tabledesmatieres +tableofcontents +tableofcontentsandprologue +technicalreport +theauthors +theauthorsreply +thebasics +theeditorsdesk +thefirstauthorreplies +thelancet +theoreticalbackground +thetimes +theworldbank +theyearinreview +thismonthin +thismonthinthejournal +timemanagement +titeleiinhaltsverzeichnis +title +titlepage +titlepagei +tocorrespondents +totheeditor +unitedkingdom +unitednations +unitedstates +upcomingevents +vorwort +website +welcome +whatshappening +whatsnew +workscited +yourquestionsanswered +zudiesemheft +zusammenfassung diff --git a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala new file mode 100644 index 0000000..abf9220 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala @@ -0,0 +1,50 @@ +package sandcrawler + +import scala.math +import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ + +class BibjsonScorable extends Scorable { + + def getSource(args : Args) : Source = { + TextLine(args("bibjson-input")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { + getSource(args).read + .toTypedPipe[String](new Fields("line")) + .map { BibjsonScorable.bibjsonToMapFeatures(_) } + } +} + +object BibjsonScorable { + def bibjsonToMapFeatures(json : String) : Option[MapFeatures] = { + Scorable.jsonToMap(json) match { + case None => None + case Some(map) => { + if (map contains "title") { + val title = Scorable.getString(map, "title") + val doi = Scorable.getString(map, "doi") + val sha1 = Scorable.getString(map, "sha") + // TODO: year, authors (if available) + if (title == null || title.isEmpty) { + None + } else { + val sf : ScorableFeatures = ScorableFeatures.create(title=title, doi=doi, sha1=sha1) + sf.toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, sf.toString)) + } + } + } else { + None + } + } + } + } +} diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala new file mode 100644 index 0000000..bb6413f --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -0,0 +1,153 @@ +package sandcrawler + +import scala.math +import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONArray +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.hbase.HBasePipeConversions + +class CrossrefScorable extends Scorable with HBasePipeConversions { + // TODO: Generalize args so there can be multiple Crossref pipes in one job. + def getSource(args : Args) : Source = { + TextLine(args("crossref-input")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { + getSource(args).read + .toTypedPipe[String](new Fields("line")) + .filter { CrossrefScorable.keepRecord(_) } + .map { CrossrefScorable.jsonToMapFeatures(_) } + } +} + +object CrossrefScorable { + + val ContentTypeWhitelist: Set[String] = Set( + "book", + "book-chapter", + "dataset", + "dissertation", + "journal-article", + "letter", + "monograph", + "posted-content", + "pre-print", + "proceedings-article", + "report", + "working-paper") + + def keepRecord(json : String) : Boolean = { + Scorable.jsonToMap(json) match { + case None => false + case Some(map) => { + mapToTitle(map) match { + case None => false + case Some(title) => title.length <= Scorable.MaxTitleLength + } + } + } + } + + // Returns None if title is null, empty, or too long. + def mapToTitle(map : Map[String, Any]) : Option[String] = { + def getTitle : Option[String] = { + if (map contains "title") { + val titles = map("title").asInstanceOf[List[String]] + if (titles.isEmpty || titles == null) None else Some(titles(0)) + } else { + None + } + } + + def getSubtitle : Option[String] = { + if (map contains "subtitle") { + val subtitles = map("subtitle").asInstanceOf[List[String]] + if (subtitles.isEmpty || subtitles == null) { + None + } else { + val sub = subtitles(0) + if (sub == null || sub.isEmpty) { + None + } else { + Some(sub) + } + } + } else { + None + } + } + + getTitle match { + case None => None + case Some(baseTitle) => { + if (baseTitle == null) { + None + } else { + getSubtitle match { + case None => Some(baseTitle) + case Some(baseSubtitle) => Some(baseTitle.concat(":".concat(baseSubtitle))) + } + } + } + } + } + + def mapToAuthorList(map : Map[String, Any]) : List[String] = { + if (map contains "author") { + val objArray = map("author").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]]) + // TODO(bnewbold): combine given and family names? + objArray + .filter(e => e contains "family") + .map(e => e.get("family").get.asInstanceOf[String]) + } else { + List() + } + } + + def mapToYear(map : Map[String, Any]) : Option[Int] = { + map.get("created") match { + case None => None + case Some(created) => { + Some(created.asInstanceOf[Map[String,Any]] + .get("date-parts") + .get + .asInstanceOf[List[Any]](0) + .asInstanceOf[List[Any]](0) + .asInstanceOf[Double] + .toInt) + } + } + } + + def jsonToMapFeatures(json : String) : Option[MapFeatures] = { + def makeMapFeatures(title : String, doi : String, authors : List[String], year : Int, contentType : String) : Option[MapFeatures] = { + if (doi.isEmpty || doi == null || authors.length == 0 || !(ContentTypeWhitelist contains contentType)) { + None + } else { + val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi.toLowerCase(), year=year) + sf.toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, sf.toString)) + } + } + } + Scorable.jsonToMap(json) match { + case None => None + case Some(map) => + mapToTitle(map) match { + case None => None + case Some(title) => makeMapFeatures( + title=title, + doi=Scorable.getString(map, "DOI"), + authors=mapToAuthorList(map), + year=mapToYear(map).getOrElse(0), + contentType=map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE")) + } + } + } +} diff --git a/scalding/src/main/scala/sandcrawler/DumpFileMetaJob.scala b/scalding/src/main/scala/sandcrawler/DumpFileMetaJob.scala new file mode 100644 index 0000000..b3734f0 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/DumpFileMetaJob.scala @@ -0,0 +1,36 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +// Dumps all the info needed to insert a file entity in Fatcat. Useful for +// joining. +class DumpFileMetaJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val metaPipe : TypedPipe[(String, String, String, Long)] = HBaseBuilder.build(args("hbase-table"), + args("zookeeper-hosts"), + List("file:cdx", "file:mime", "file:size"), + SourceMode.SCAN_ALL) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size")) + .filter { case (_, cdx, mime, size) => cdx != null && mime != null && size != null } + .map { case (key, cdx, mime, size) => + (Bytes.toString(key.copyBytes()), + Bytes.toString(cdx.copyBytes()), + Bytes.toString(mime.copyBytes()), + Bytes.toLong(size.copyBytes())) + }; + + metaPipe.write(TypedTsv[(String,String,String,Long)](args("output"))) + +} diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidMetaInsertableJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidMetaInsertableJob.scala new file mode 100644 index 0000000..ee2b7c2 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/DumpGrobidMetaInsertableJob.scala @@ -0,0 +1,38 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +// Dumps the SHA1 key and grobid0:metadata columns, plus file metadata needed +// to insert into fatcat. Used, eg, as part of long-tail mellon pipeline. +class DumpGrobidMetaInsertableJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val metaPipe : TypedPipe[(String, String, String, Long, String)] = HBaseBuilder.build(args("hbase-table"), + args("zookeeper-hosts"), + List("file:cdx", "file:mime", "file:size", "grobid0:metadata"), + SourceMode.SCAN_ALL) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size", "metadata")) + .filter { case (_, cdx, mime, size, metadata) => cdx != null && mime != null && size != null && metadata != null } + .map { case (key, cdx, mime, size, metadata) => + (Bytes.toString(key.copyBytes()), + Bytes.toString(cdx.copyBytes()), + Bytes.toString(mime.copyBytes()), + Bytes.toLong(size.copyBytes()), + Bytes.toString(metadata.copyBytes()) + ) + }; + + metaPipe.write(TypedTsv[(String,String,String,Long,String)](args("output"))) + +} diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala new file mode 100644 index 0000000..42b3464 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala @@ -0,0 +1,34 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +// Dumps status code for each GROBID-processed file. Good for crawl/corpus +// analytics, if we consider GROBID status a rough "is this a paper" metric. +class DumpGrobidStatusCodeJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val metaPipe : TypedPipe[(String, Long)] = HBaseBuilder.build(args("hbase-table"), + args("zookeeper-hosts"), + List("grobid0:status_code"), + SourceMode.SCAN_ALL) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "status_code")) + .filter { case (_, status_code) => status_code != null } + .map { case (key, status_code) => + (Bytes.toString(key.copyBytes()), + Bytes.toLong(status_code.copyBytes())) + }; + + metaPipe.write(TypedTsv[(String,Long)](args("output"))) + +} diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidXmlJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidXmlJob.scala new file mode 100644 index 0000000..953610d --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/DumpGrobidXmlJob.scala @@ -0,0 +1,41 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource +import scala.util.parsing.json.JSONObject + +// Dumps the SHA1 key and grobid0:tei_xml columns, as TSV/JSON (two TSV +// columns: one is key, second is JSON). Used for partner delivery/sharing +class DumpGrobidXmlJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val metaPipe : TypedPipe[(String, String)] = HBaseBuilder.build(args("hbase-table"), + args("zookeeper-hosts"), + List("file:cdx", "grobid0:tei_xml"), + SourceMode.SCAN_ALL) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "tei_xml")) + .filter { case (_, cdx, tei_xml) => cdx != null && tei_xml != null } + .map { case (key, cdx, tei_xml) => + (Bytes.toString(key.copyBytes()), + JSONObject( + Map( + "pdf_hash" -> Bytes.toString(key.copyBytes()), + "cdx_metadata" -> Bytes.toString(cdx.copyBytes()), + "tei_xml" -> Bytes.toString(tei_xml.copyBytes()) + )).toString + ) + }; + + metaPipe.write(TypedTsv[(String,String)](args("output"))) + +} diff --git a/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala b/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala new file mode 100644 index 0000000..7fd3ce0 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala @@ -0,0 +1,67 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +// Filters for HBase rows which have not had GROBID run on them, but do have +// full CDX metadata, and dumps to a TSV for later extraction by the +// "extraction-ungrobided" job. +// +// Does the same horrible join thing that DumpUnGrobidedJob does. +class DumpUnGrobidedJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val output = args("output") + + val allKeys : TypedPipe[(String,String,String,String)] = DumpUnGrobidedJob.getHBaseKeySource( + args("hbase-table"), + args("zookeeper-hosts")) + .read + .fromBytesWritable('key, 'c, 'mime, 'cdx) + .toTypedPipe[(String,String,String,String)]('key, 'c, 'mime, 'cdx) + + val existingKeys : TypedPipe[(String,Boolean)] = DumpUnGrobidedJob.getHBaseColSource( + args("hbase-table"), + args("zookeeper-hosts")) + .read + .fromBytesWritable('key) + .toTypedPipe[String]('key) + .map{ key => (key, true) } + + val missingKeys : TypedPipe[(String,String,String,String)] = allKeys + .groupBy(_._1) + .leftJoin(existingKeys.groupBy(_._1)) + .toTypedPipe + .collect { case (key, ((_, c, mime, cdx), None)) => (key, c, mime, cdx) } + + missingKeys + .write(TypedTsv[(String,String,String,String)](output)) + +} + +object DumpUnGrobidedJob { + + // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" + def getHBaseColSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("grobid0:status_code"), + SourceMode.SCAN_ALL) + } + + def getHBaseKeySource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("f:c", "file:mime", "file:cdx"), + SourceMode.SCAN_ALL) + } +} diff --git a/scalding/src/main/scala/sandcrawler/FatcatScorable.scala b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala new file mode 100644 index 0000000..2090e84 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala @@ -0,0 +1,146 @@ +package sandcrawler + +import scala.math +import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONArray +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.hbase.HBasePipeConversions + + +class FatcatScorableRight extends Scorable { + + def getSource(args : Args) : Source = { + TextLine(args("fatcat-release-input-right")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { + getSource(args).read + .toTypedPipe[String](new Fields("line")) + .filter { FatcatScorable.keepRecord(_) } + .map { FatcatScorable.jsonToMapFeatures(_) } + } +} + +class FatcatScorable extends Scorable with HBasePipeConversions { + + def getSource(args : Args) : Source = { + TextLine(args("fatcat-release-input")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { + getSource(args).read + .toTypedPipe[String](new Fields("line")) + .filter { FatcatScorable.keepRecord(_) } + .map { FatcatScorable.jsonToMapFeatures(_) } + } +} + +object FatcatScorable { + + // Note; removed ReleaseType filtering + + def keepRecord(json : String) : Boolean = { + Scorable.jsonToMap(json) match { + case None => false + case Some(map) => { + mapToTitle(map) match { + case None => false + case Some(title) => title.length <= Scorable.MaxTitleLength + } + } + } + } + + // Returns None if title is null, empty, or too long. + def mapToTitle(map : Map[String, Any]) : Option[String] = { + def getTitle : Option[String] = { + if (map contains "title") { + val title = map("title").asInstanceOf[String] + if (title == null || title.isEmpty) None else Some(title) + } else { + None + } + } + + def getSubtitle : Option[String] = { + if (map contains "subtitle") { + val subtitle = map("subtitle").asInstanceOf[String] + if (subtitle == null || subtitle.isEmpty) { + None + } else { + Some(subtitle) + } + } else { + None + } + } + + getTitle match { + case None => None + case Some(baseTitle) => { + if (baseTitle == null) { + None + } else { + getSubtitle match { + case None => Some(baseTitle) + case Some(baseSubtitle) => Some(baseTitle.concat(":".concat(baseSubtitle))) + } + } + } + } + } + + def mapToAuthorList(map : Map[String, Any]) : List[String] = { + if (map contains "contribs") { + val objArray = map("contribs").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]]) + // TODO(bnewbold): better name stuff... contrib.surname, creator.surname, + // or raw_name split to last + objArray + .filter(e => e contains "raw_name") + .map(e => e.get("raw_name").get.asInstanceOf[String]) + } else { + List() + } + } + + def mapToYear(map : Map[String, Any]) : Option[Int] = { + map.get("release_year") match { + case None => None + case Some(year) => { + Some(year.asInstanceOf[Double].toInt) + } + } + } + + def jsonToMapFeatures(json : String) : Option[MapFeatures] = { + def makeMapFeatures(title : String, doi : String, fatcat_release: String, fatcat_work : String, authors : List[String], year : Int, contentType : String) : Option[MapFeatures] = { + // NOTE: not doing any filtering here! + val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi, fatcat_release=fatcat_release, fatcat_work=fatcat_work, year=year) + sf.toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, sf.toString)) + } + } + Scorable.jsonToMap(json) match { + case None => None + case Some(map) => + mapToTitle(map) match { + case None => None + case Some(title) => makeMapFeatures( + title=title, + // TODO: doi=Scorable.getString(map, "doi"), + doi=null, + fatcat_release=Scorable.getString(map, "ident"), + fatcat_work=Scorable.getString(map, "work_id"), + authors=mapToAuthorList(map), + year=mapToYear(map).getOrElse(0), + contentType=map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE")) + } + } + } +} diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala new file mode 100644 index 0000000..f4ed129 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -0,0 +1,83 @@ +package sandcrawler + +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class GrobidScorable extends Scorable with HBasePipeConversions { + val StatusOK = 200 + + def getSource(args : Args) : Source = { + // TODO: Generalize args so there can be multiple grobid pipes in one job + GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) + } + + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { + getSource(args) + .read + // Can't just "fromBytesWritable" because we have multiple types + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code")) + .filter { case (_, metadata, status_code) => metadata != null && status_code != null } + .map { case (key, metadata, status_code) => + (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes())) + } + .collect { case (key, json, StatusOK) => (key, json) } + .filter { case (key, json) => GrobidScorable.keepRecord(json) } + .map { entry : (String, String) => GrobidScorable.jsonToMapFeatures(entry._1, entry._2) } + } +} + +object GrobidScorable { + def keepRecord(json : String) : Boolean = { + Scorable.jsonToMap(json) match { + case None => false + case Some(map) => { + if (map contains "title") { + val title = Scorable.getString(map, "title") + title != null && title.length <= Scorable.MaxTitleLength + } else { + false + } + } + } + } + + def mapToAuthorList(map : Map[String, Any]) : List[String] = { + if (map contains "authors") { + val objArray = map("authors").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]]) + objArray + .filter(e => e contains "name") + .map(e => e.get("name").get.asInstanceOf[String]) + } else { + List() + } + } + + def getHBaseSource(table : String, host : String) : HBaseSource = { + HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL) + } + + def jsonToMapFeatures(key : String, json : String) : Option[MapFeatures] = { + Scorable.jsonToMap(json) match { + case None => None + case Some(map) => { + if (map contains "title") { + val authors: List[String] = mapToAuthorList(map) + val title = Scorable.getString(map, "title") + ScorableFeatures.create(title=title, authors=authors, sha1=key).toMapFeatures + } else { + None + } + } + } + } +} + diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala new file mode 100644 index 0000000..3146a6c --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala @@ -0,0 +1,59 @@ + +package sandcrawler + +import cascading.flow.FlowDef +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class GrobidScorableDumpJob(args: Args) extends JobBase(args) { + + val grobidHbaseRows = Stat("hbase-rows-scanned", "hbase-grobid-dump") + val filteredGrobidRows = Stat("grobid-rows-filtered", "hbase-grobid-dump") + val parsedGrobidRows = Stat("grobid-rows-parsed", "hbase-grobid-dump") + val validGrobidRows = Stat("grobid-rows-valid-slug", "hbase-grobid-dump") + + val pipe = GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) + .read + // Can't just "fromBytesWritable" because we have multiple types? + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code")) + .filter { case (_, metadata, status_code) => + grobidHbaseRows.inc + metadata != null && status_code != null + } + .map { case (key, metadata, status_code) => + (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes())) + } + // TODO: Should I combine next two stages for efficiency? + .collect { case (key, json, 200) => + filteredGrobidRows.inc + (key, json) + } + .map { entry : (String, String) => + parsedGrobidRows.inc + GrobidScorable.jsonToMapFeatures(entry._1, entry._2) + } + .filterNot { entry => entry.isEmpty } + .map { entry => { + validGrobidRows.inc + entry.get + }} + .groupBy { case MapFeatures(slug, json) => slug } + .map { tuple => + val (slug : String, features : MapFeatures) = tuple + (slug, ReduceFeatures(features.json)) + } + + pipe + .map { case (slug, features) => + (slug, features.json) + } + .write(TypedTsv[(String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala new file mode 100644 index 0000000..46d2038 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksJob.scala @@ -0,0 +1,43 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.Stat +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class GroupFatcatWorksJob(args: Args) extends JobBase(args) { + + val fatcatRowCount = Stat("fatcat-rows-filtered", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val fatcatScorable : Scorable = new FatcatScorable() + val fatcatPipe : TypedPipe[(String, ReduceFeatures)] = fatcatScorable + .getInputPipe(args) + .map { r => + fatcatRowCount.inc + r + } + + val joinedPipe = fatcatPipe + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(fatcatPipe) + + // TypedTsv doesn't work over case classes. + joinedPipe + // filter out trivial self-matches (releases are identical) + .filter { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + Scorable.selfMatchable(fatcatFeaturesLeft, fatcatFeaturesRight) + } + .map { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + joinedRowCount.inc + new ReduceOutput( + slug, + Scorable.computeSimilarity(fatcatFeaturesLeft, fatcatFeaturesRight), + fatcatFeaturesLeft.json, + fatcatFeaturesRight.json) + } + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala new file mode 100644 index 0000000..ea5e26b --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GroupFatcatWorksSubsetJob.scala @@ -0,0 +1,52 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.Stat +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class GroupFatcatWorksSubsetJob(args: Args) extends JobBase(args) { + + val fatcatLhsRowCount = Stat("fatcat-rows-filtered-left", "sandcrawler") + val fatcatRhsRowCount = Stat("fatcat-rows-filtered-right", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val fatcatScorableLhs : Scorable = new FatcatScorable() + val fatcatPipeLhs : TypedPipe[(String, ReduceFeatures)] = fatcatScorableLhs + .getInputPipe(args) + .map { r => + fatcatLhsRowCount.inc + r + } + + val fatcatScorableRhs : Scorable = new FatcatScorableRight() + val fatcatPipeRhs : TypedPipe[(String, ReduceFeatures)] = fatcatScorableRhs + .getInputPipe(args) + .map { r => + fatcatRhsRowCount.inc + r + } + + val joinedPipe = fatcatPipeLhs + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(fatcatPipeRhs) + + // TypedTsv doesn't work over case classes. + joinedPipe + // filter out trivial self-matches (releases are identical) + .filter { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + Scorable.selfMatchable(fatcatFeaturesLeft, fatcatFeaturesRight) + } + .map { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => + joinedRowCount.inc + new ReduceOutput( + slug, + Scorable.computeSimilarity(fatcatFeaturesLeft, fatcatFeaturesRight), + fatcatFeaturesLeft.json, + fatcatFeaturesRight.json) + } + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala new file mode 100644 index 0000000..20cc7a1 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala @@ -0,0 +1,37 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class HBaseColCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val output = args("output") + + HBaseColCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts"), + args("column")) + .read + .debug + .groupAll { _.size('count) } + .write(Tsv(output)) +} + +object HBaseColCountJob { + + // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" + def getHBaseSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List(col), + SourceMode.SCAN_ALL) + } +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala index 4c3de33..5c7954a 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -30,7 +30,7 @@ object HBaseRowCountJob { HBaseBuilder.build( hbaseTable, zookeeperHosts, - List("file:size"), + List("f:c"), SourceMode.SCAN_ALL) } } diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala new file mode 100644 index 0000000..4d9880f --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala @@ -0,0 +1,32 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class HBaseStatusCodeCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val source = HBaseCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts"), + "grobid0:status_code") + + val statusPipe : TypedPipe[Long] = source + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code) + .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } + + statusPipe.groupBy { identity } + .size + .debug + .write(TypedTsv[(Long,Long)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index fd0b4e2..f79d672 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -18,15 +18,15 @@ class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConver val source = HBaseCountJob.getHBaseSource( args("hbase-table"), args("zookeeper-hosts"), - "grobid0:status_code") + "grobid0:status") - val statusPipe : TypedPipe[Long] = source + val statusPipe : TypedPipe[String] = source .read - .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code) - .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status) + .map { case (key, raw_status) => Bytes.toString(raw_status.copyBytes()) } statusPipe.groupBy { identity } .size .debug - .write(TypedTsv[(Long,Long)](args("output"))) + .write(TypedTsv[(String,Long)](args("output"))) } diff --git a/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala new file mode 100644 index 0000000..292de75 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/MatchBenchmarkJob.scala @@ -0,0 +1,30 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class MatchBenchmarkJob(args: Args) extends JobBase(args) { + // TODO: Instantiate any subclass of Scorable specified in args. + val sc1 : Scorable = new BibjsonScorable() + val sc2 : Scorable = new BibjsonScorable() + val leftArgs = args + ("bibjson-input" -> List(args("left-bibjson"))) + val rightArgs = args + ("bibjson-input" -> List(args("right-bibjson"))) + val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(leftArgs) + val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(rightArgs) + + pipe1.join(pipe2) + .map { entry => + val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry + new ReduceOutput( + slug, + Scorable.computeSimilarity(features1, features2), + features1.json, + features2.json) + } + //TypedTsv doesn't work over case classes. + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala b/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala new file mode 100644 index 0000000..cc3bf23 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/MissingColumnDumpJob.scala @@ -0,0 +1,67 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +// This nasty, no-good, horrible Job outputs a list of keys ("sha1:A234...") +// for which the given "column" does not have a value set. +// It does this using a self-join because SpyGlass's HBase SCAN support seems +// to be extremely limited. +class MissingColumnDumpJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val output = args("output") + + val allKeys : TypedPipe[String] = MissingColumnDumpJob.getHBaseKeySource( + args("hbase-table"), + args("zookeeper-hosts")) + .read + .fromBytesWritable('key) + .toTypedPipe[String]('key) + + val existingKeys : TypedPipe[(String,Boolean)] = MissingColumnDumpJob.getHBaseColSource( + args("hbase-table"), + args("zookeeper-hosts"), + args("column")) + .read + .fromBytesWritable('key) + .toTypedPipe[String]('key) + .map{ key => (key, true) } + + val missingKeys : TypedPipe[String] = allKeys + .groupBy( identity ) + .leftJoin(existingKeys.groupBy(_._1)) + .toTypedPipe + .collect { case (key, (_, None)) => key } + + missingKeys + .write(TypedTsv[String](output)) + +} + +object MissingColumnDumpJob { + + // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" + def getHBaseColSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List(col), + SourceMode.SCAN_ALL) + } + + def getHBaseKeySource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("f:c"), + SourceMode.SCAN_ALL) + } +} diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala new file mode 100644 index 0000000..d9c38e8 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/Scorable.scala @@ -0,0 +1,96 @@ +package sandcrawler + +import scala.math +import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ + +case class MapFeatures(slug : String, json : String) +case class ReduceFeatures(json : String) +case class ReduceOutput(val slug : String, score : Int, json1 : String, json2 : String) + +abstract class Scorable { + def getInputPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[(String, ReduceFeatures)] = { + val validFeatures : TypedPipe[MapFeatures] = getFeaturesPipe(args) + .filterNot { entry => entry.isEmpty } + .map { entry => entry.get } + + validFeatures + .groupBy { case MapFeatures(slug, json) => slug } + .map { tuple => + val (slug : String, features : MapFeatures) = tuple + (slug, ReduceFeatures(features.json)) + } + } + + // abstract methods + def getSource(args : Args) : Source + def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] +} + +object Scorable { + val MaxTitleLength = 1023 + + def jsonToMap(json : String) : Option[Map[String, Any]] = { + // https://stackoverflow.com/a/32717262/631051 + val jsonObject = JSON.parseFull(json) + if (jsonObject == None) { + None + } else { + Some(jsonObject.get.asInstanceOf[Map[String, Any]]) + } + } + + def getStringOption(optionalMap : Option[Map[String, Any]], key : String) : Option[String] = { + optionalMap match { + case None => None + case Some(map) => if (map contains key) Some(map(key).asInstanceOf[String]) else None + } + } + + // Caller is responsible for ensuring that key is a String in map. + // TODO: Add and handle ClassCastException + def getString(map : Map[String, Any], key : String) : String = { + assert(map contains key) + map(key).asInstanceOf[String] + } + + val MaxScore = 1000 + + def selfMatchable(features1 : ReduceFeatures, features2 : ReduceFeatures) : Boolean = { + val json1 = jsonToMap(features1.json) + val json2 = jsonToMap(features2.json) + + ( + getStringOption(json1, "fatcat_release") != None && + getStringOption(json2, "fatcat_release") != None && + getStringOption(json1, "fatcat_release") != getStringOption(json2, "fatcat_release") && + (getStringOption(json1, "fatcat_work") match { + case None => false + case Some(work1) => getStringOption(json2, "fatcat_work") match { + case None => false + // this last check ensures we don't double-match + case Some(work2) => work1 > work2 + } + }) + ) + } + + def computeSimilarity(features1 : ReduceFeatures, features2 : ReduceFeatures) : Int = { + val json1 = jsonToMap(features1.json) + val json2 = jsonToMap(features2.json) + getStringOption(json1, "title") match { + case None => 0 + case Some(title1) => { + getStringOption(json2, "title") match { + case None => 0 + case Some(title2) => + (StringUtilities.similarity(title1.toLowerCase, title2.toLowerCase) * MaxScore).toInt + } + } + } + } +} diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala new file mode 100644 index 0000000..93cd78d --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala @@ -0,0 +1,63 @@ +package sandcrawler + +import java.io.InputStream + +import scala.io.Source +import scala.util.parsing.json.JSONArray +import scala.util.parsing.json.JSONObject + +object ScorableFeatures { + // TODO: Add exception handling. + val fileStream : InputStream = getClass.getResourceAsStream("/slug-denylist.txt") + val SlugDenylist : Set[String] = Source.fromInputStream(fileStream).getLines.toSet + fileStream.close + val MinSlugLength = 8 + + // Static factory method + def create(title : String, authors : List[Any] = List(), year : Int = 0, doi : String = "", fatcat_release : String = "", fatcat_work : String = "", sha1 : String = "") : ScorableFeatures = { + new ScorableFeatures( + title=if (title == null) "" else title, + authors=if (authors == null) List() else authors.map(a => if (a == null) "" else a), + year=year, + doi=if (doi == null) "" else doi, + fatcat_release=if (fatcat_release == null) "" else fatcat_release, + fatcat_work=if (fatcat_work == null) "" else fatcat_work, + sha1=if (sha1 == null) "" else sha1) + } +} + +// Contains features needed to make slug and to score (in combination +// with a second ScorableFeatures). Create with above static factory method. +class ScorableFeatures private(title : String, authors : List[Any] = List(), year: Int = 0, doi : String = "", fatcat_release : String = "", fatcat_work : String = "", sha1: String = "") { + + def toMap() : Map[String, Any] = + Map("title" -> title, "authors" -> JSONArray(authors), "year" -> year, "doi" -> doi, "fatcat_release" -> fatcat_release, "fatcat_work" -> fatcat_work, "sha1" -> sha1) + + override def toString() : String = { + JSONObject(toMap).toString + } + + def toSlug() : Option[String] = { + if (title == null) { + None + } else { + val unaccented = StringUtilities.removeAccents(title) + // Remove punctuation + val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "") + if (slug.isEmpty + || slug == null + || (ScorableFeatures.SlugDenylist contains slug) + || (slug.length < ScorableFeatures.MinSlugLength)) { + None + } else { + Some(slug) + } + } + } + + def toMapFeatures : Option[MapFeatures] = + toSlug match { + case None => None + case Some(slug) => Some(MapFeatures(slug, toString)) + } +} diff --git a/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala new file mode 100644 index 0000000..58007fa --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreInsertable.scala @@ -0,0 +1,86 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.pipe.Pipe +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class ScoreInsertableJob(args: Args) extends JobBase(args) { + + val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler") + val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler") + val cdxRowCount = Stat("cdx-rows", "sandcrawler") + val scoredRowCount = Stat("scored-rows", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val grobidScorable : Scorable = new GrobidScorable() + val crossrefScorable : Scorable = new CrossrefScorable() + + val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable + .getInputPipe(args) + .map { r => + grobidRowCount.inc + r + } + val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable + .getInputPipe(args) + .map { r => + crossrefRowCount.inc + r + } + val cdxPipe : TypedPipe[(String, String, String, Long)] = ScoreInsertableJob.getHBaseCdxSource(args("hbase-table"), args("zookeeper-hosts")) + .read + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "cdx", "mime", "size")) + .filter { case (_, cdx, mime, size) => cdx != null && mime != null && size != null } + .map { case (key, cdx, mime, size) => + (Bytes.toString(key.copyBytes()), + Bytes.toString(cdx.copyBytes()), + Bytes.toString(mime.copyBytes()), + Bytes.toLong(size.copyBytes())) + } + .map { r => + cdxRowCount.inc + r + } + + val scoredPipe = grobidPipe + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(crossrefPipe) + .map { case (slug, (grobidFeatures, crossrefFeatures)) => + scoredRowCount.inc + //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry + // Not ever Empty, I promise + val key = Scorable.getStringOption(Scorable.jsonToMap(grobidFeatures.json), "sha1").orNull + (key, new ReduceOutput( + slug, + Scorable.computeSimilarity(grobidFeatures, crossrefFeatures), + grobidFeatures.json, + crossrefFeatures.json)) + } + .map { case (key, entry) => (key, entry.slug, entry.score, entry.json1, entry.json2) } + .groupBy { case (key, _, _, _, _) => key } + + // TypedTsv doesn't work over case classes. + val joinedPipe = scoredPipe + .join(cdxPipe.groupBy { case (key, _, _, _) => key }) + .map { case (key, ((_, slug, score, left, right), (_, cdx, mime, size))) => (key, slug, score, left, right, cdx, mime, size) } + .write(TypedTsv[(String, String, Int, String, String, String, String, Long)](args("output"))) +} + +object ScoreInsertableJob { + + // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" + def getHBaseCdxSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("file:cdx", "file:mime", "file:size"), + SourceMode.SCAN_ALL) + } +} diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala new file mode 100644 index 0000000..ccb9b76 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -0,0 +1,48 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.Stat +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class ScoreJob(args: Args) extends JobBase(args) { + + val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler") + val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler") + val joinedRowCount = Stat("joined-rows", "sandcrawler") + + val grobidScorable : Scorable = new GrobidScorable() + val crossrefScorable : Scorable = new CrossrefScorable() + val grobidPipe : TypedPipe[(String, ReduceFeatures)] = grobidScorable + .getInputPipe(args) + .map { r => + grobidRowCount.inc + r + } + val crossrefPipe : TypedPipe[(String, ReduceFeatures)] = crossrefScorable + .getInputPipe(args) + .map { r => + crossrefRowCount.inc + r + } + + val joinedPipe = grobidPipe + .addTrap(TypedTsv(args("output") + ".trapped")) + .join(crossrefPipe) + + // TypedTsv doesn't work over case classes. + joinedPipe + .map { case (slug, (grobidFeatures, crossrefFeatures)) => + joinedRowCount.inc + //val (slug : String, (grobidFeatures: ReduceFeatures, crossrefFeatures: ReduceFeatures)) = entry + new ReduceOutput( + slug, + Scorable.computeSimilarity(grobidFeatures, crossrefFeatures), + grobidFeatures.json, + crossrefFeatures.json) + } + .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } + .write(TypedTsv[(String, Int, String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala new file mode 100644 index 0000000..9150ced --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala @@ -0,0 +1,76 @@ +package sandcrawler + +import java.text.Normalizer +import java.util.regex.Pattern + +object StringUtilities { + // bnewbold: I propose that we: + // 1. keep only \p{Ideographic}, \p{Alphabetic}, and \p{Digit} + // 2. strip accents + // 3. "lower-case" (unicode-aware) + // 4. do any final custom/manual mappings + // + // We should check (test) that null bytes are handled, in addition to other + // more obvious characters + + // Adapted from https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=blob;f=src/main/java/org/apache/commons/lang3/StringUtils.java;h=1d7b9b99335865a88c509339f700ce71ce2c71f2;hb=HEAD#l934 + def removeAccents(s : String) : String = { + val replacements = Map( + '\u0141' -> 'L', + '\u0142' -> 'l', // Letter ell + '\u00d8' -> 'O', + '\u00f8' -> 'o' + ) + val sb = new StringBuilder(Normalizer.normalize(s, Normalizer.Form.NFD)) + for (i <- 0 to sb.length - 1) { + for (key <- replacements.keys) { + if (sb(i) == key) { + sb.deleteCharAt(i); + sb.insert(i, replacements(key)) + } + } + } + val pattern = Pattern.compile("\\p{InCombiningDiacriticalMarks}+") + pattern.matcher(sb).replaceAll("") + } + + // Source: https://stackoverflow.com/a/30076541/631051 + def removePunctuation(s: String) : String = { + s.replaceAll("""[\p{Punct}’·“”‘’“”«»「」¿–±§ʿ]""", "") + } + + // Adapted from: https://stackoverflow.com/a/16018452/631051 + def similarity(s1a : String, s2a : String) : Double = { + val (s1, s2) = (removeAccents(removePunctuation(s1a)), + removeAccents(removePunctuation(s2a))) + val longer : String = if (s1.length > s2.length) s1 else s2 + val shorter : String = if (s1.length > s2.length) s2 else s1 + if (longer.length == 0) { + // Both strings are empty. + 1 + } else { + (longer.length - stringDistance(longer, shorter)) / longer.length.toDouble + } + } + + // Source: https://oldfashionedsoftware.com/2009/11/19/string-distance-and-refactoring-in-scala/ + def stringDistance(s1: String, s2: String): Int = { + val memo = scala.collection.mutable.Map[(List[Char],List[Char]),Int]() + def min(a:Int, b:Int, c:Int) = Math.min( Math.min( a, b ), c) + def sd(s1: List[Char], s2: List[Char]): Int = { + if (!memo.contains((s1, s2))) { + memo((s1,s2)) = (s1, s2) match { + case (_, Nil) => s1.length + case (Nil, _) => s2.length + case (c1::t1, c2::t2) => + min( sd(t1,s2) + 1, sd(s1,t2) + 1, + sd(t1,t2) + (if (c1==c2) 0 else 1) ) + } + } + memo((s1,s2)) + } + + sd( s1.toList, s2.toList ) + } +} + |