diff options
Diffstat (limited to 'scalding/src/main')
10 files changed, 606 insertions, 42 deletions
| diff --git a/scalding/src/main/resources/slug-blacklist.txt b/scalding/src/main/resources/slug-blacklist.txt new file mode 100644 index 0000000..6bc947b --- /dev/null +++ b/scalding/src/main/resources/slug-blacklist.txt @@ -0,0 +1,427 @@ +abbreviations +abstract +acknowledgements +article +authorreply +authorsreply +bookreview +bookreviews +casereport +commentary +commentaryon +commenton +commentto +contents +correspondence +dedication +editorialadvisoryboard +focus +hypothesis +inbrief +introduction +introductiontotheissue +lettertotheeditor +listofabbreviations +note +overview +preface +references +results +review +reviewarticle +summary +title +name +website +technicalreport +abstractsofaapaposterandpodiumpresentations +affiliation +authorindexforvolume81 +bookreviewssection +links +ll +description +indicegeneral +genealogy +summariesofkeyjournalarticles +summer +0 +jobdescription +socialengineering +approximation +thetimes +atribute +components +entscheidungsverzeichnis +indexofauthorsandtitles +inventions +newlyelectedmembersofthecollege +summaryofproceedings +appointmentsandstaffchanges +classes +finalexam +partone +referenciasbibliograficas +abstractsofthesesfromthescandinaviancountries +acknowledgementsvii +messagefromgeneralcochairs +mm +redaktorensforord +109 +place +computerscience +medicinalchemistry +homework +sun +dedicatoria +outline +affect +section +informationtoauthors +theeditorsdesk +entrevista +thefirstauthorreplies +councilminutes +no +furtherreadings +printing +glosario +lucina +mrsnews +85 +bookofabstracts +literaturrundschau +thismonthin +listedestableaux +sommario +specialsection +revieweracknowledgement2013 +abstractnotsubmittedforonlinepublication +essays +tableofcontentsandprologue +76 +callforarticles +ataglance +fichatecnica +literacy +theyearinreview +print +symptom +66 +additionalresources +unitednations +meetingabstracts +annualacknowledgementofmanuscriptreviewers +specifications +boardoftrustees +timemanagement +papers +importantnotice +annexa +linearregression +chaptertwo +finalreport +suggestedreadings +essay +copyrightform +resumo +bigdata +chapter10 +publication +changes +presentacio +ai +materialsafetydatasheet +home +dearreaders +acknowledgmentofreferees +maintenance +question +listofpublications +responsetothelettertotheeditor +communiquedepresse +58 +noii +litteraturverzeichniss +whatshappening +chairmansopeningremarks +editorscorrespondence +award +thebasics +56 +conservation +blood +chapter7 +chaos +citation +memoranda +figure3 +distribution +regression +synthese +context +editorialconsultants +theoreticalbackground +lecture +electrophoresis +editorspicks +introductorycomments +collaborateurs +recensions +personalandmiscellaneous +issuesandevents +theworldbank +51 +reviewsanddescriptionsoftablesandbooks +congratulations +definition +yourquestionsanswered +chapterone +missionstatement +62 +newsandreviews +decisionmaking +41 +projectmanagement +community +metaanalysis +magazin +other +linearalgebra +online +exercises +languageteaching +address +radiology +foreward +heartfailure +editoriale +memorandum +editorialinformation +revieweracknowledgement +safety +21 +features +aboutauthors +messagefromtheprogramcochairs +health +security +literaturecited +meetingsandconferences +messagefromthechairs +feature +meetingsofinterest +introductiongenerale +pressrelease +lungcancer +6 +7 +institutenews +researchresearchers +profile +11 +5 +rehabilitation +documents +currentresearch +4 +originalarticle +committee +endnotes +chapter1 +originalarticles +guidelinesforcontributors +chapteroneintroduction +aboutthecover +membershipapplication +awardsappointmentsannouncements +materialsandmethods +symposium +abstractsofcommunications +oralabstracts +insidethisissue +abreviations +ear +editorialstaff +proceedings +commentaries +perspective +agradecimientos +iii +glossaryofterms +appendix1 +editorialeditorial +interview +fortherecord +abstractwithdrawn +highlightsfromthisissue +materials +keywords +editorscomment +editorscorner +readersforum +messagefromtheprogramchairs +authorguidelines +resumes +reviewessay +editorinchief +8 +frequentlyaskedquestions +aimsandscope +abouttheeditors +indice +editorialstatement +sommaire +inthestudy +listofreferees +background +bookreviewsandnotices +articlesofsignificantinterestselectedfromthisissuebytheeditors +fromthepresident +panorama +welcome +history +continuingeducation +livresrecus +acknowledgementtoreferees +equipment +coverimage +1 +summaries +positionsavailable +y +congresscalendar +conferencereport +upcomingevents +posterpresentations +editorsintroduction +rezension +appendixb +2 +generalinformation +theauthors +notesandcomments +subscriptions +w +listoffigures +communication +calendarofmeetings +workscited +n +o +d +editorialcomments +listofreviewers +notitle +editorschoice +lettertotheeditors +comments +sumario +editorialintroduction +resenas +introduccion +classifieds +tocorrespondents +notesforcontributors +forthcomingarticles +bibliografia +ii +editorialsoftwaresurveysection +listofparticipants +advertisersindex +generaldiscussion +paperstoappearinforthcomingissues +indexofauthors +letterfromtheeditor +chapter1introduction +organizingcommittee +noticeboard +guideforauthors +chapteri +resources +resume +u +p +addendum +impressum +copyrightnotice +r +associationnews +classified +s +l +corrigenda +presentacion +instructionsforauthors +resumen +editorialnote +comptesrendus +response +curriculumvitae +obituaries +meetings +education +e +m +a +forthcomingevents +reviewers +abouttheauthors +fromtheeditors +i +instructionstoauthors +newproducts +callforpapers +blankpage +apresentacao +publishersnote +newbooks +corrections +tabledesmatieres +calendarofevents +editorsnote +shorternotices +listofcontributors +notesandnews +editorialcomment +newsandnotes +comment +inmemoriam +presentation +executivesummary +guesteditorial +avantpropos +discussion +glossary +letters +notes +reply +contributors +acknowledgments +fromtheeditor +buchbesprechungen +obituary +inthisissue +notesoncontributors +conclusion +einleitung +inhalt +bibliography +titeleiinhaltsverzeichnis +booksreceived +calendar +inhaltsverzeichnis +corrigendum +subjectindex +errata +frontmatter +correction +abstracts +letterstotheeditor +foreword +tableofcontents +authorindex +index +erratum +editorialboard +editorial diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index ff8201a..5d1eaf5 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -35,7 +35,7 @@ object CrossrefScorable {              new MapFeatures(Scorable.NoSlug, json)            } else {              // bnewbold: not checking that titles(0) is non-null/non-empty; case would be, in JSON, "title": [ null ] -            val sf : ScorableFeatures = new ScorableFeatures(title=titles(0), doi=doi) +            val sf : ScorableFeatures = ScorableFeatures.create(title=titles(0), doi=doi)              new MapFeatures(sf.toSlug, sf.toString)            }          } else { diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 9a09e05..e510f75 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -24,10 +24,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions {      getSource(args)        .read        // Can't just "fromBytesWritable" because we have multiple types? -      .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "tei_json", "status_code")) -      .filter { case (_, tei_json, status_code) => tei_json != null && status_code != null } -      .map { case (key, tei_json, status_code) => -        (Bytes.toString(key.copyBytes()), Bytes.toString(tei_json.copyBytes()), Bytes.toLong(status_code.copyBytes())) +      .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code")) +      .filter { case (_, metadata, status_code) => metadata != null && status_code != null } +      .map { case (key, metadata, status_code) => +        (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes()))        }        // TODO: Should I combine next two stages for efficiency?        .collect { case (key, json, StatusOK) => (key, json) } @@ -37,7 +37,7 @@ class GrobidScorable extends Scorable with HBasePipeConversions {  object GrobidScorable {    def getHBaseSource(table : String, host : String) : HBaseSource = { -    HBaseBuilder.build(table, host, List("grobid0:tei_json", "grobid0:status_code"), SourceMode.SCAN_ALL) +    HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL)    }    def jsonToMapFeatures(key : String, json : String) : MapFeatures = { @@ -45,7 +45,7 @@ object GrobidScorable {        case None => MapFeatures(Scorable.NoSlug, json)        case Some(map) => {          if (map contains "title") { -          new ScorableFeatures(Scorable.getString(map, "title"), sha1=key).toMapFeatures +          ScorableFeatures.create(title=Scorable.getString(map, "title"), sha1=key).toMapFeatures          } else {            MapFeatures(Scorable.NoSlug, json)          } diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala new file mode 100644 index 0000000..468b68e --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala @@ -0,0 +1,62 @@ + +package sandcrawler + +import cascading.flow.FlowDef +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class GrobidScorableDumpJob(args: Args) extends JobBase(args) { + +  val grobidHbaseRows = Stat("hbase-rows-scanned", "hbase-grobid-dump") +  val filteredGrobidRows = Stat("grobid-rows-filtered", "hbase-grobid-dump") +  val parsedGrobidRows = Stat("grobid-rows-parsed", "hbase-grobid-dump") +  val validGrobidRows = Stat("grobid-rows-valid-slug", "hbase-grobid-dump") + +  val pipe = GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) +    .read +    // Can't just "fromBytesWritable" because we have multiple types? +    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code")) +    .filter { case (_, metadata, status_code) => +      grobidHbaseRows.inc +      metadata != null && status_code != null +    } +    .map { case (key, metadata, status_code) => +      (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes())) +    } +    // TODO: Should I combine next two stages for efficiency? +    .collect { case (key, json, 200) => +      filteredGrobidRows.inc +      (key, json) +    } +    .map { entry : (String, String) => +      parsedGrobidRows.inc +      GrobidScorable.jsonToMapFeatures(entry._1, entry._2) +    } +    .filter { entry => Scorable.isValidSlug(entry.slug) } +    .map { entry => +      validGrobidRows.inc +      entry +    } +    // XXX: this groupBy after the map? +    .groupBy { case MapFeatures(slug, json) => slug } +    .map { tuple => +      val (slug : String, features : MapFeatures) = tuple +      (slug, ReduceFeatures(features.json)) +    } + +  pipe +    .map { case (slug, features) => +      (slug, features.json) +    } +    .write(TypedTsv[(String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala new file mode 100644 index 0000000..20cc7a1 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala @@ -0,0 +1,37 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class HBaseColCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { + +  val output = args("output") + +  HBaseColCountJob.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"), +    args("column")) +    .read +    .debug +    .groupAll { _.size('count) } +    .write(Tsv(output)) +} + +object HBaseColCountJob { + +  // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" +  def getHBaseSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = { +    HBaseBuilder.build( +      hbaseTable, +      zookeeperHosts, +      List(col), +      SourceMode.SCAN_ALL) +  } +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala new file mode 100644 index 0000000..4d9880f --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala @@ -0,0 +1,32 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class HBaseStatusCodeCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { + +  val source = HBaseCountJob.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"), +    "grobid0:status_code") + +  val statusPipe : TypedPipe[Long] = source +    .read +    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code) +    .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } + +  statusPipe.groupBy { identity } +    .size +    .debug +    .write(TypedTsv[(Long,Long)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index fd0b4e2..f79d672 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -18,15 +18,15 @@ class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConver    val source = HBaseCountJob.getHBaseSource(      args("hbase-table"),      args("zookeeper-hosts"), -    "grobid0:status_code") +    "grobid0:status") -  val statusPipe : TypedPipe[Long] = source +  val statusPipe : TypedPipe[String] = source      .read -    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code) -    .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } +    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status) +    .map { case (key, raw_status) => Bytes.toString(raw_status.copyBytes()) }    statusPipe.groupBy { identity }      .size      .debug -    .write(TypedTsv[(Long,Long)](args("output"))) +    .write(TypedTsv[(String,Long)](args("output")))  } diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala index 8ed3369..0b9868a 100644 --- a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala +++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala @@ -1,31 +1,35 @@  package sandcrawler +import java.io.InputStream + +import scala.io.Source  import scala.util.parsing.json.JSONObject +object ScorableFeatures { +  // TODO: Add exception handling. +  val fileStream : InputStream = getClass.getResourceAsStream("/slug-blacklist.txt") +  val SlugBlacklist : Set[String] = Source.fromInputStream(fileStream).getLines.toSet +  fileStream.close + +  // Static factory method +  def create(title : String, year : Int = 0, doi : String = "", sha1 : String = "") : ScorableFeatures = { +    new ScorableFeatures( +      title=if (title == null) "" else title, +      year=year, +      doi=if (doi == null) "" else doi, +      sha1=if (sha1 == null) "" else sha1) +  } +}  // Contains features needed to make slug and to score (in combination -// with a second ScorableFeatures). -class ScorableFeatures(title : String, year: Int = 0, doi : String = "", sha1: String = "") { - -  val slugBlacklist = Set( "abbreviations", "abstract", "acknowledgements", -    "article", "authorreply", "authorsreply", "bookreview", "bookreviews", -    "casereport", "commentary", "commentaryon", "commenton", "commentto", -    "contents", "correspondence", "dedication", "editorialadvisoryboard", -    "focus", "hypothesis", "inbrief", "introduction", "introductiontotheissue", -    "lettertotheeditor", "listofabbreviations", "note", "overview", "preface", -    "references", "results", "review", "reviewarticle", "summary", "title", -    "name") - -  def toMap() : Map[String, Any] = { -    Map("title" -> (if (title == null) "" else title), -        "year" -> year, -        "doi" -> (if (doi == null) "" else doi), -        "sha1" -> (if (sha1 == null) "" else sha1)) -  } +// with a second ScorableFeatures). Create with above static factory method. +class ScorableFeatures private(title : String, year: Int = 0, doi : String = "", sha1: String = "") { -  override def toString() : String = { -    JSONObject(toMap()).toString -  } +  def toMap() : Map[String, Any] = +    Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1) + +  override def toString() : String = +    JSONObject(toMap).toString    def toSlug() : String = {      if (title == null) { @@ -34,11 +38,10 @@ class ScorableFeatures(title : String, year: Int = 0, doi : String = "", sha1: S        val unaccented = StringUtilities.removeAccents(title)        // Remove punctuation        val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "") -      if (slug.isEmpty || slug == null || (slugBlacklist contains slug)) Scorable.NoSlug else slug +      if (slug.isEmpty || slug == null || (ScorableFeatures.SlugBlacklist contains slug)) Scorable.NoSlug else slug      }    } -  def toMapFeatures = { +  def toMapFeatures : MapFeatures =      MapFeatures(toSlug, toString) -  }  } diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 75d45e9..28e9132 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -13,15 +13,18 @@ class ScoreJob(args: Args) extends JobBase(args) {    val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)    val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args) -  pipe1.join(pipe2).map { entry => -    val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry -    new ReduceOutput( +  pipe1 +    .addTrap(TypedTsv(args("output") + ".trapped")) +    .join(pipe2) +    .map { entry => +      val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry +      new ReduceOutput(        slug,        Scorable.computeSimilarity(features1, features2),        features1.json,        features2.json) -  } -  //TypedTsv doesn't work over case classes. +    } +    //TypedTsv doesn't work over case classes.      .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }      .write(TypedTsv[(String, Int, String, String)](args("output")))  } diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala index 2745875..e03b60d 100644 --- a/scalding/src/main/scala/sandcrawler/StringUtilities.scala +++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala @@ -36,7 +36,7 @@ object StringUtilities {    // Source: https://stackoverflow.com/a/30076541/631051    def removePunctuation(s: String) : String = { -    s.replaceAll("""[\p{Punct}]""", "") +    s.replaceAll("""[\p{Punct}’·“”‘’“”«»「」]""", "")    }    // Adapted from: https://stackoverflow.com/a/16018452/631051 | 
