21 files changed, 1016 insertions, 62 deletions
diff --git a/CONTRIBUTORS b/CONTRIBUTORS
new file mode 100644
index 0000000..f6dea1c
--- /dev/null
+++ b/CONTRIBUTORS
@@ -0,0 +1,4 @@
+Bryan Newbold
+
+Ellen Spertus transfers copyright of all of her contributions to the
+repository in exchange for one Internet Archive Sticker, received.
\ No newline at end of file
@@ -95,6 +95,27 @@ def run_rowcount(args):
         env=args.env)
     subprocess.call(cmd, shell=True)
 
+def run_statuscodecount(args):
+    if args.rebuild:
+        rebuild_scalding()
+    print("Starting statuscodecount job...")
+    output = "{}/output-{}/{}-statuscodecount".format(
+        HDFS_DIR,
+        args.env,
+        datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+    cmd = """hadoop jar \
+        scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+        com.twitter.scalding.Tool sandcrawler.HBaseStatusCodeCountJob \
+        --hdfs \
+        --app.conf.path scalding/ia_cluster.conf \
+        --hbase-table wbgrp-journal-extract-0-{env} \
+        --zookeeper-hosts {zookeeper_hosts} \
+        --output {output}""".format(
+        output=output,
+        zookeeper_hosts=ZOOKEEPER_HOSTS,
+        env=args.env)
+    subprocess.call(cmd, shell=True)
+
 def run_statuscount(args):
     if args.rebuild:
         rebuild_scalding()
@@ -126,10 +147,15 @@ def run_matchcrossref(args):
         datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
     # Notes: -D options must come after Tool but before class name
     # https://github.com/twitter/scalding/wiki/Frequently-asked-questions#how-do-i-pass-parameters-to-my-hadoop-job-number-of-reducers--memory-options--etc-
+    # Compression: changed due to errors in production
+    # https://stackoverflow.com/a/11336820/4682349
     cmd = """hadoop jar \
         scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
         com.twitter.scalding.Tool \
         -Dmapred.reduce.tasks={reducers} \
+        -Dcascading.spill.list.threshold=500000 \
+        -D mapred.output.compress=false \
+        -Dmapred.compress.map.output=true \
         sandcrawler.ScoreJob \
         --hdfs \
         --app.conf.path scalding/ia_cluster.conf \
@@ -144,6 +170,51 @@ def run_matchcrossref(args):
         crossref_input=args.crossref_input)
     subprocess.call(cmd, shell=True)
 
+def run_grobidscorabledump(args):
+    if args.rebuild:
+        rebuild_scalding()
+    print("Starting grobid-scorable-dump job...")
+    output = "{}/output-{}/{}-grobidscorabledump".format(
+        HDFS_DIR,
+        args.env,
+        datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+    cmd = """hadoop jar \
+        scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+        com.twitter.scalding.Tool sandcrawler.GrobidScorableDumpJob \
+        --hdfs \
+        --app.conf.path scalding/ia_cluster.conf \
+        --hbase-table wbgrp-journal-extract-0-{env} \
+        --zookeeper-hosts {zookeeper_hosts} \
+        --output {output}""".format(
+        output=output,
+        zookeeper_hosts=ZOOKEEPER_HOSTS,
+        env=args.env)
+    subprocess.call(cmd, shell=True)
+
+def run_colcount(args):
+    if args.rebuild:
+        rebuild_scalding()
+    print("Starting colcount job...")
+    output = "{}/output-{}/{}-colcount-{}".format(
+        HDFS_DIR,
+        args.env,
+        datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"),
+        args.column.replace(':', '_'))
+    cmd = """hadoop jar \
+        scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+        com.twitter.scalding.Tool sandcrawler.HBaseColCountJob \
+        --hdfs \
+        --app.conf.path scalding/ia_cluster.conf \
+        --hbase-table wbgrp-journal-extract-0-{env} \
+        --zookeeper-hosts {zookeeper_hosts} \
+        --column {column} \
+        --output {output}""".format(
+        column=args.column,
+        output=output,
+        zookeeper_hosts=ZOOKEEPER_HOSTS,
+        env=args.env)
+    subprocess.call(cmd, shell=True)
+
 def main():
     parser = argparse.ArgumentParser()
@@ -174,6 +245,9 @@ def main():
     sub_statuscount = subparsers.add_parser('status-count')
     sub_statuscount.set_defaults(func=run_statuscount)
 
+    sub_statuscodecount = subparsers.add_parser('status-code-count')
+    sub_statuscodecount.set_defaults(func=run_statuscodecount)
+
     sub_matchcrossref = subparsers.add_parser('match-crossref')
     sub_matchcrossref.set_defaults(func=run_matchcrossref)
     sub_matchcrossref.add_argument('crossref_input',
@@ -182,6 +256,13 @@ def main():
                                    help="number of reducers to run",
                                    type=int,
                                    default=30)
+    sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump')
+    sub_grobidscorabledump.set_defaults(func=run_grobidscorabledump)
+
+    sub_colcount = subparsers.add_parser('col-count')
+    sub_colcount.set_defaults(func=run_colcount)
+    sub_colcount.add_argument('column',
+        help="column name to use in count")
 
     args = parser.parse_args()
     if not args.__dict__.get("func"):
diff --git a/scalding/build.sbt b/scalding/build.sbt
index d477399..01f55ca 100644
--- a/scalding/build.sbt
+++ b/scalding/build.sbt
@@ -42,7 +42,7 @@ lazy val root = (project in file(".")).
     libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
     libraryDependencies += "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion classifier "tests",
     libraryDependencies += "org.apache.hbase" % "hbase-common" % hbaseVersion,
-    libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.11_0.17.2_cdh5.3.1",
+    libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.11_0.17.2_cdh5.3.1-p1",
 
     // cargo-culted from twitter/scalding's build.sbt
     // hint via https://stackoverflow.com/questions/23280494/sbt-assembly-error-deduplicate-different-file-contents-found-in-the-following#23280952
diff --git a/scalding/src/main/resources/slug-blacklist.txt b/scalding/src/main/resources/slug-blacklist.txt
new file mode 100644
index 0000000..6bc947b
--- /dev/null
+++ b/scalding/src/main/resources/slug-blacklist.txt
@@ -0,0 +1,427 @@
+abbreviations
+abstract
+acknowledgements
+article
+authorreply
+authorsreply
+bookreview
+bookreviews
+casereport
+commentary
+commentaryon
+commenton
+commentto
+contents
+correspondence
+dedication
+editorialadvisoryboard
+focus
+hypothesis
+inbrief
+introduction
+introductiontotheissue
+lettertotheeditor
+listofabbreviations
+note
+overview
+preface
+references
+results
+review
+reviewarticle
+summary
+title
+name
+website
+technicalreport
+abstractsofaapaposterandpodiumpresentations
+affiliation
+authorindexforvolume81
+bookreviewssection
+links
+ll
+description
+indicegeneral
+genealogy
+summariesofkeyjournalarticles
+summer
+0
+jobdescription
+socialengineering
+approximation
+thetimes
+atribute
+components
+entscheidungsverzeichnis
+indexofauthorsandtitles
+inventions
+newlyelectedmembersofthecollege
+summaryofproceedings
+appointmentsandstaffchanges
+classes
+finalexam
+partone
+referenciasbibliograficas
+abstractsofthesesfromthescandinaviancountries
+acknowledgementsvii
+messagefromgeneralcochairs
+mm
+redaktorensforord
+109
+place
+computerscience
+medicinalchemistry
+homework
+sun
+dedicatoria
+outline
+affect
+section
+informationtoauthors
+theeditorsdesk
+entrevista
+thefirstauthorreplies
+councilminutes
+no
+furtherreadings
+printing
+glosario
+lucina
+mrsnews
+85
+bookofabstracts
+literaturrundschau
+thismonthin
+listedestableaux
+sommario
+specialsection
+revieweracknowledgement2013
+abstractnotsubmittedforonlinepublication
+essays
+tableofcontentsandprologue
+76
+callforarticles
+ataglance
+fichatecnica
+literacy
+theyearinreview
+print
+symptom
+66
+additionalresources
+unitednations
+meetingabstracts
+annualacknowledgementofmanuscriptreviewers
+specifications
+boardoftrustees
+timemanagement
+papers
+importantnotice
+annexa
+linearregression
+chaptertwo
+finalreport
+suggestedreadings
+essay
+copyrightform
+resumo
+bigdata
+chapter10
+publication
+changes
+presentacio
+ai
+materialsafetydatasheet
+home
+dearreaders
+acknowledgmentofreferees
+maintenance
+question
+listofpublications
+responsetothelettertotheeditor
+communiquedepresse
+58
+noii
+litteraturverzeichniss
+whatshappening
+chairmansopeningremarks
+editorscorrespondence
+award
+thebasics
+56
+conservation
+blood
+chapter7
+chaos
+citation
+memoranda
+figure3
+distribution
+regression
+synthese
+context
+editorialconsultants
+theoreticalbackground
+lecture
+electrophoresis
+editorspicks
+introductorycomments
+collaborateurs
+recensions
+personalandmiscellaneous
+issuesandevents
+theworldbank
+51
+reviewsanddescriptionsoftablesandbooks
+congratulations
+definition
+yourquestionsanswered
+chapterone
+missionstatement
+62
+newsandreviews
+decisionmaking
+41
+projectmanagement
+community
+metaanalysis
+magazin
+other
+linearalgebra
+online
+exercises
+languageteaching
+address
+radiology
+foreward
+heartfailure
+editoriale
+memorandum
+editorialinformation
+revieweracknowledgement
+safety
+21
+features
+aboutauthors
+messagefromtheprogramcochairs
+health
+security
+literaturecited
+meetingsandconferences
+messagefromthechairs
+feature
+meetingsofinterest
+introductiongenerale
+pressrelease
+lungcancer
+6
+7
+institutenews
+researchresearchers
+profile
+11
+5
+rehabilitation
+documents
+currentresearch
+4
+originalarticle
+committee
+endnotes
+chapter1
+originalarticles
+guidelinesforcontributors
+chapteroneintroduction
+aboutthecover
+membershipapplication
+awardsappointmentsannouncements
+materialsandmethods
+symposium
+abstractsofcommunications
+oralabstracts
+insidethisissue
+abreviations
+ear
+editorialstaff
+proceedings
+commentaries
+perspective
+agradecimientos
+iii
+glossaryofterms
+appendix1
+editorialeditorial
+interview
+fortherecord
+abstractwithdrawn
+highlightsfromthisissue
+materials
+keywords
+editorscomment
+editorscorner
+readersforum
+messagefromtheprogramchairs
+authorguidelines
+resumes
+reviewessay
+editorinchief
+8
+frequentlyaskedquestions
+aimsandscope
+abouttheeditors
+indice
+editorialstatement
+sommaire
+inthestudy
+listofreferees
+background
+bookreviewsandnotices
+articlesofsignificantinterestselectedfromthisissuebytheeditors
+fromthepresident
+panorama
+welcome
+history
+continuingeducation
+livresrecus
+acknowledgementtoreferees
+equipment
+coverimage
+1
+summaries
+positionsavailable
+y
+congresscalendar
+conferencereport
+upcomingevents
+posterpresentations
+editorsintroduction
+rezension
+appendixb
+2
+generalinformation
+theauthors
+notesandcomments
+subscriptions
+w
+listoffigures
+communication
+calendarofmeetings
+workscited
+n
+o
+d
+editorialcomments
+listofreviewers
+notitle
+editorschoice
+lettertotheeditors
+comments
+sumario
+editorialintroduction
+resenas
+introduccion
+classifieds
+tocorrespondents
+notesforcontributors
+forthcomingarticles
+bibliografia
+ii
+editorialsoftwaresurveysection
+listofparticipants
+advertisersindex
+generaldiscussion
+paperstoappearinforthcomingissues
+indexofauthors
+letterfromtheeditor
+chapter1introduction
+organizingcommittee
+noticeboard
+guideforauthors
+chapteri
+resources
+resume
+u
+p
+addendum
+impressum
+copyrightnotice
+r
+associationnews
+classified
+s
+l
+corrigenda
+presentacion
+instructionsforauthors
+resumen
+editorialnote
+comptesrendus
+response
+curriculumvitae
+obituaries
+meetings
+education
+e
+m
+a
+forthcomingevents
+reviewers
+abouttheauthors
+fromtheeditors
+i
+instructionstoauthors
+newproducts
+callforpapers
+blankpage
+apresentacao
+publishersnote
+newbooks
+corrections
+tabledesmatieres
+calendarofevents
+editorsnote
+shorternotices
+listofcontributors
+notesandnews
+editorialcomment
+newsandnotes
+comment
+inmemoriam
+presentation
+executivesummary
+guesteditorial
+avantpropos
+discussion
+glossary
+letters
+notes
+reply
+contributors
+acknowledgments
+fromtheeditor
+buchbesprechungen
+obituary
+inthisissue
+notesoncontributors
+conclusion
+einleitung
+inhalt
+bibliography
+titeleiinhaltsverzeichnis
+booksreceived
+calendar
+inhaltsverzeichnis
+corrigendum
+subjectindex
+errata
+frontmatter
+correction
+abstracts
+letterstotheeditor
+foreword
+tableofcontents
+authorindex
+index
+erratum
+editorialboard
+editorial
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index 5d1eaf5..ab33d03 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -19,29 +19,55 @@ class CrossrefScorable extends Scorable with HBasePipeConversions {
   def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = {
     getSource(args).read
       .toTypedPipe[String](new Fields("line"))
+      .filter { CrossrefScorable.keepRecord(_) }
       .map { CrossrefScorable.jsonToMapFeatures(_) }
   }
 }
 
 object CrossrefScorable {
+  def keepRecord(json : String) : Boolean = {
+    Scorable.jsonToMap(json) match {
+      case None => false
+      case Some(map) => {
+        mapToTitle(map) match {
+          case None => false
+          case Some(title) => title.length <= Scorable.MaxTitleLength
+        }
+      }
+    }
+  }
+
+  // Returns None if title is null, empty, or too long.
+  def mapToTitle(map : Map[String, Any]) : Option[String] = {
+    if (map contains "title") {
+      val titles = map("title").asInstanceOf[List[String]]
+      if (titles.isEmpty || titles == null) {
+        None
+      } else {
+        val title = titles(0)
+        if (title == null || title.isEmpty || title.length > Scorable.MaxTitleLength) None else Some(title)
+      }
+    } else {
+      None
+    }
+  }
+
   def jsonToMapFeatures(json : String) : MapFeatures = {
     Scorable.jsonToMap(json) match {
       case None => MapFeatures(Scorable.NoSlug, json)
-      case Some(map) => {
-        if ((map contains "title") && (map contains "DOI")) {
-          val titles = map("title").asInstanceOf[List[String]]
-          val doi = Scorable.getString(map, "DOI")
-          if (titles.isEmpty || titles == null || doi.isEmpty || doi == null) {
-            new MapFeatures(Scorable.NoSlug, json)
-          } else {
-            // bnewbold: not checking that titles(0) is non-null/non-empty; case would be, in JSON, "title": [ null ]
-            val sf : ScorableFeatures = ScorableFeatures.create(title=titles(0), doi=doi)
-            new MapFeatures(sf.toSlug, sf.toString)
+      case Some(map) =>
+        mapToTitle(map) match {
+          case None => MapFeatures(Scorable.NoSlug, json)
+          case Some(title) => {
+            val doi = Scorable.getString(map, "DOI")
+            if (doi.isEmpty || doi == null) {
+              MapFeatures(Scorable.NoSlug, json)
+            } else {
+              val sf : ScorableFeatures = ScorableFeatures.create(title=title, doi=doi)
+              MapFeatures(sf.toSlug, sf.toString)
+            }
           }
-        } else {
-          new MapFeatures(Scorable.NoSlug, json)
         }
-      }
     }
   }
 }
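For context on the new filter: keepRecord() admits a record only when its first title entry is a non-null, non-empty string no longer than Scorable.MaxTitleLength. A hypothetical spot-check, following the keepRecord()/mapToTitle() definitions above (these JSON literals are illustrative, not fixtures from this commit):

    // Assumed results, per the code above:
    CrossrefScorable.keepRecord("""{"title": ["A Reasonable Title"], "DOI": "10.123/abc"}""") // true
    CrossrefScorable.keepRecord("""{"title": [null], "DOI": "10.123/abc"}""")                 // false: null title
    CrossrefScorable.keepRecord("""{"title": [], "DOI": "10.123/abc"}""")                     // false: no title

Note that keepRecord() deliberately ignores the DOI; the DOI check still happens later, in jsonToMapFeatures().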
"fromBytesWritable" because we have multiple types? - .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "tei_json", "status_code")) - .filter { case (_, tei_json, status_code) => tei_json != null && status_code != null } - .map { case (key, tei_json, status_code) => - (Bytes.toString(key.copyBytes()), Bytes.toString(tei_json.copyBytes()), Bytes.toLong(status_code.copyBytes())) + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code")) + .filter { case (_, metadata, status_code) => metadata != null && status_code != null } + .map { case (key, metadata, status_code) => + (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes())) } // TODO: Should I combine next two stages for efficiency? .collect { case (key, json, StatusOK) => (key, json) } + .filter { case (key, json) => GrobidScorable.keepRecord(json) } .map { entry : (String, String) => GrobidScorable.jsonToMapFeatures(entry._1, entry._2) } } } object GrobidScorable { + def keepRecord(json : String) : Boolean = { + Scorable.jsonToMap(json) match { + case None => false + case Some(map) => { + if (map contains "title") { + val title = Scorable.getString(map, "title") + title != null && title.length <= Scorable.MaxTitleLength + } else { + false + } + } + } + } + + def getHBaseSource(table : String, host : String) : HBaseSource = { - HBaseBuilder.build(table, host, List("grobid0:tei_json", "grobid0:status_code"), SourceMode.SCAN_ALL) + HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL) } def jsonToMapFeatures(key : String, json : String) : MapFeatures = { diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala new file mode 100644 index 0000000..468b68e --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala @@ -0,0 +1,62 @@ + +package sandcrawler + +import cascading.flow.FlowDef +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class GrobidScorableDumpJob(args: Args) extends JobBase(args) { + + val grobidHbaseRows = Stat("hbase-rows-scanned", "hbase-grobid-dump") + val filteredGrobidRows = Stat("grobid-rows-filtered", "hbase-grobid-dump") + val parsedGrobidRows = Stat("grobid-rows-parsed", "hbase-grobid-dump") + val validGrobidRows = Stat("grobid-rows-valid-slug", "hbase-grobid-dump") + + val pipe = GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) + .read + // Can't just "fromBytesWritable" because we have multiple types? 
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
new file mode 100644
index 0000000..468b68e
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
@@ -0,0 +1,62 @@
+
+package sandcrawler
+
+import cascading.flow.FlowDef
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class GrobidScorableDumpJob(args: Args) extends JobBase(args) {
+
+  val grobidHbaseRows = Stat("hbase-rows-scanned", "hbase-grobid-dump")
+  val filteredGrobidRows = Stat("grobid-rows-filtered", "hbase-grobid-dump")
+  val parsedGrobidRows = Stat("grobid-rows-parsed", "hbase-grobid-dump")
+  val validGrobidRows = Stat("grobid-rows-valid-slug", "hbase-grobid-dump")
+
+  val pipe = GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+    .read
+    // Can't just "fromBytesWritable" because we have multiple types?
+    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code"))
+    .filter { case (_, metadata, status_code) =>
+      grobidHbaseRows.inc
+      metadata != null && status_code != null
+    }
+    .map { case (key, metadata, status_code) =>
+      (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes()))
+    }
+    // TODO: Should I combine next two stages for efficiency?
+    .collect { case (key, json, 200) =>
+      filteredGrobidRows.inc
+      (key, json)
+    }
+    .map { entry : (String, String) =>
+      parsedGrobidRows.inc
+      GrobidScorable.jsonToMapFeatures(entry._1, entry._2)
+    }
+    .filter { entry => Scorable.isValidSlug(entry.slug) }
+    .map { entry =>
+      validGrobidRows.inc
+      entry
+    }
+    // XXX: this groupBy after the map?
+    .groupBy { case MapFeatures(slug, json) => slug }
+    .map { tuple =>
+      val (slug : String, features : MapFeatures) = tuple
+      (slug, ReduceFeatures(features.json))
+    }
+
+  pipe
+    .map { case (slug, features) =>
+      (slug, features.json)
+    }
+    .write(TypedTsv[(String, String)](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala
new file mode 100644
index 0000000..20cc7a1
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala
@@ -0,0 +1,37 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class HBaseColCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+  val output = args("output")
+
+  HBaseColCountJob.getHBaseSource(
+    args("hbase-table"),
+    args("zookeeper-hosts"),
+    args("column"))
+    .read
+    .debug
+    .groupAll { _.size('count) }
+    .write(Tsv(output))
+}
+
+object HBaseColCountJob {
+
+  // eg, "wbgrp-journal-extract-0-qa", "mtrcs-zk1.us.archive.org:2181"
+  def getHBaseSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = {
+    HBaseBuilder.build(
+      hbaseTable,
+      zookeeperHosts,
+      List(col),
+      SourceMode.SCAN_ALL)
+  }
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala
new file mode 100644
index 0000000..4d9880f
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala
@@ -0,0 +1,32 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class HBaseStatusCodeCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+  val source = HBaseCountJob.getHBaseSource(
+    args("hbase-table"),
+    args("zookeeper-hosts"),
+    "grobid0:status_code")
+
+  val statusPipe : TypedPipe[Long] = source
+    .read
+    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)
+    .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) }
+
+  statusPipe.groupBy { identity }
+    .size
+    .debug
+    .write(TypedTsv[(Long,Long)](args("output")))
+}
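Both count jobs use the same counting idiom: key each record by its own value with groupBy(identity), then take the group size. An in-memory sketch (the values are made up):

    import com.twitter.scalding.typed.TypedPipe

    val codes: TypedPipe[Long] = TypedPipe.from(List(200L, 200L, 400L, 200L))

    // Yields (200, 3) and (400, 1) as (status_code, count) pairs.
    val counts = codes.groupBy { identity }.size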
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index fd0b4e2..f79d672 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -18,15 +18,15 @@ class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
   val source = HBaseCountJob.getHBaseSource(
     args("hbase-table"),
     args("zookeeper-hosts"),
-    "grobid0:status_code")
+    "grobid0:status")
 
-  val statusPipe : TypedPipe[Long] = source
+  val statusPipe : TypedPipe[String] = source
     .read
-    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)
-    .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) }
+    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status)
+    .map { case (key, raw_status) => Bytes.toString(raw_status.copyBytes()) }
 
   statusPipe.groupBy { identity }
     .size
     .debug
-    .write(TypedTsv[(Long,Long)](args("output")))
+    .write(TypedTsv[(String,Long)](args("output")))
 }
diff --git a/scalding/src/main/scala/sandcrawler/Scorable.scala b/scalding/src/main/scala/sandcrawler/Scorable.scala
index 9b9c633..c704ed9 100644
--- a/scalding/src/main/scala/sandcrawler/Scorable.scala
+++ b/scalding/src/main/scala/sandcrawler/Scorable.scala
@@ -30,6 +30,7 @@ abstract class Scorable {
 }
 
 object Scorable {
+  val MaxTitleLength = 255
   val NoSlug = "NO SLUG" // Used for slug if title is empty or unparseable
 
   def isValidSlug(slug : String) : Boolean = {
diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
index e71abfa..0b9868a 100644
--- a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
+++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
@@ -1,8 +1,16 @@
 package sandcrawler
 
+import java.io.InputStream
+
+import scala.io.Source
 import scala.util.parsing.json.JSONObject
 
 object ScorableFeatures {
+  // TODO: Add exception handling.
+  val fileStream : InputStream = getClass.getResourceAsStream("/slug-blacklist.txt")
+  val SlugBlacklist : Set[String] = Source.fromInputStream(fileStream).getLines.toSet
+  fileStream.close
+
   // Static factory method
   def create(title : String, year : Int = 0, doi : String = "", sha1 : String = "") : ScorableFeatures = {
     new ScorableFeatures(
@@ -16,14 +24,6 @@ object ScorableFeatures {
 // Contains features needed to make slug and to score (in combination
 // with a second ScorableFeatures). Create with above static factory method.
 class ScorableFeatures private(title : String, year: Int = 0, doi : String = "", sha1: String = "") {
-  val SlugBlacklist = Set( "abbreviations", "abstract", "acknowledgements",
-    "article", "authorreply", "authorsreply", "bookreview", "bookreviews",
-    "casereport", "commentary", "commentaryon", "commenton", "commentto",
-    "contents", "correspondence", "dedication", "editorialadvisoryboard",
-    "focus", "hypothesis", "inbrief", "introduction", "introductiontotheissue",
-    "lettertotheeditor", "listofabbreviations", "note", "overview", "preface",
-    "references", "results", "review", "reviewarticle", "summary", "title",
-    "name")
 
   def toMap() : Map[String, Any] =
     Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1)
@@ -38,7 +38,7 @@ class ScorableFeatures private(title : String, year: Int = 0, doi : String = "", sha1: String = "") {
     val unaccented = StringUtilities.removeAccents(title)
     // Remove punctuation
     val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "")
-    if (slug.isEmpty || slug == null || (SlugBlacklist contains slug)) Scorable.NoSlug else slug
+    if (slug.isEmpty || slug == null || (ScorableFeatures.SlugBlacklist contains slug)) Scorable.NoSlug else slug
  }
}
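The TODO above asks for exception handling around the blacklist resource load. One possible shape: close the stream even if reading throws, and fail soft when the resource is missing. The object and method names here are hypothetical, not part of the commit:

    import java.io.InputStream
    import scala.io.Source

    object SlugBlacklistLoader {
      def load(path : String = "/slug-blacklist.txt") : Set[String] = {
        val stream : InputStream = getClass.getResourceAsStream(path)
        if (stream == null) {
          Set.empty[String]  // resource missing; caller decides whether that is fatal
        } else {
          try {
            Source.fromInputStream(stream).getLines.toSet
          } finally {
            stream.close()
          }
        }
      }
    }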
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 75d45e9..28e9132 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -13,15 +13,18 @@ class ScoreJob(args: Args) extends JobBase(args) {
   val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
   val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
 
-  pipe1.join(pipe2).map { entry =>
-    val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
-    new ReduceOutput(
+  pipe1
+    .addTrap(TypedTsv(args("output") + ".trapped"))
+    .join(pipe2)
+    .map { entry =>
+      val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
+      new ReduceOutput(
         slug,
         Scorable.computeSimilarity(features1, features2),
         features1.json,
         features2.json)
-  }
-    //TypedTsv doesn't work over case classes.
+    }
+    //TypedTsv doesn't work over case classes.
    .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
    .write(TypedTsv[(String, Int, String, String)](args("output")))
}
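The new addTrap() call wires in a Cascading failure trap: records that throw in downstream operations are written to the ".trapped" sink instead of failing the whole job, which is why ScoreJobTest (below) now also declares a sink for output + ".trapped". A minimal sketch of the pattern under that assumption (the job and paths are illustrative):

    import com.twitter.scalding._

    class TrapDemoJob(args : Args) extends Job(args) {
      TypedPipe.from(TextLine(args("input")))
        .addTrap(TypedTsv[String](args("output") + ".trapped"))
        .map { line => line.toInt }  // a non-numeric line is diverted to the trap
        .write(TypedTsv[Int](args("output")))
    }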
diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
index 2745875..e03b60d 100644
--- a/scalding/src/main/scala/sandcrawler/StringUtilities.scala
+++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
@@ -36,7 +36,7 @@ object StringUtilities {
   // Source: https://stackoverflow.com/a/30076541/631051
   def removePunctuation(s: String) : String = {
-    s.replaceAll("""[\p{Punct}]""", "")
+    s.replaceAll("""[\p{Punct}’·“”‘’“”«»「」]""", "")
   }
 
   // Adapted from: https://stackoverflow.com/a/16018452/631051
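The widened character class now also strips typographic quotes, the middle dot, guillemets, and CJK corner brackets, which the ASCII-only \p{Punct} class misses. An illustrative call (not a test from this commit):

    // Assuming the updated removePunctuation() above:
    StringUtilities.removePunctuation("“Title” «quoted» 「index」")
    // => "Title quoted index"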
diff --git a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala
index 1789d1a..3d18a21 100644
--- a/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala
+++ b/scalding/src/test/scala/sandcrawler/CrossrefScorableTest.scala
@@ -66,7 +66,10 @@ class CrossrefScorableTest extends FlatSpec with Matchers {
   }
 """
   // scalastyle:on
-  val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "Some Title")
+  val CrossrefStringWithGoodTitle = CrossrefString.replace("<<TITLE>>", "Some Title")
+  val CrossrefStringWithMaximumTitle = CrossrefString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength)
+  val CrossrefStringWithExcessiveTitle = CrossrefString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength + "0")
+  val CrossrefStringWithNullTitle = CrossrefString.replace("\"<<TITLE>>\"", "null")
   val CrossrefStringWithEmptyTitle = CrossrefString.replace("<<TITLE>>", "")
   val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle")
   val MalformedCrossrefString = CrossrefString.replace("}", "")
@@ -82,13 +85,18 @@ class CrossrefScorableTest extends FlatSpec with Matchers {
     result.slug shouldBe Scorable.NoSlug
   }
 
+  it should "handle null title" in {
+    val result = CrossrefScorable.jsonToMapFeatures(CrossrefStringWithNullTitle)
+    result.slug shouldBe Scorable.NoSlug
+  }
+
   it should "handle empty title" in {
     val result = CrossrefScorable.jsonToMapFeatures(CrossrefStringWithEmptyTitle)
     result.slug shouldBe Scorable.NoSlug
   }
 
   it should "handle valid input" in {
-    val result = CrossrefScorable.jsonToMapFeatures(CrossrefStringWithTitle)
+    val result = CrossrefScorable.jsonToMapFeatures(CrossrefStringWithGoodTitle)
     result.slug shouldBe "sometitle"
     Scorable.jsonToMap(result.json) match {
       case None => fail()
@@ -97,4 +105,28 @@ class CrossrefScorableTest extends FlatSpec with Matchers {
     }
   }
+
+  "CrossrefScorable.keepRecord()" should "return true for valid JSON with title" in {
+    CrossrefScorable.keepRecord(CrossrefStringWithGoodTitle) shouldBe true
+  }
+
+  it should "return true for valid JSON with a title of maximum permitted length" in {
+    CrossrefScorable.keepRecord(CrossrefStringWithMaximumTitle) shouldBe true
+  }
+
+  it should "return false for valid JSON with excessively long title" in {
+    CrossrefScorable.keepRecord(CrossrefStringWithExcessiveTitle) shouldBe false
+  }
+
+  it should "return false for valid JSON with null title" in {
+    CrossrefScorable.keepRecord(CrossrefStringWithNullTitle) shouldBe false
+  }
+
+  it should "return false for valid JSON with no title" in {
+    CrossrefScorable.keepRecord(CrossrefStringWithoutTitle) shouldBe false
+  }
+
+  it should "return false for invalid JSON" in {
+    CrossrefScorable.keepRecord(MalformedCrossrefString) shouldBe false
+  }
 }
+ JsonString.replace("<<TITLE>>", "Title 2") + ) + + // bnewbold: status codes aren't strings, they are uint64 + val Ok : Long = 200 + val Bad : Long = 400 + val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad) + + val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes) + .zipped + .toList + .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) } + .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + + // scalastyle:off null + // Add example of lines without GROBID data + val SampleData = SampleDataHead :+ new Tuple( + new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null) + // scalastyle:on null + + JobTest("sandcrawler.GrobidScorableDumpJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("debug", "true") + .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData) + .sink[(String, String)](TypedTsv[(String, String)](output)) { + outputBuffer => + "The pipeline" should "return correct-length list" in { + outputBuffer should have length 3 + } + } + .run + .finish +} diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala index 661824b..6c45cc5 100644 --- a/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala +++ b/scalding/src/test/scala/sandcrawler/GrobidScorableTest.scala @@ -57,7 +57,10 @@ class GrobidScorableTest extends FlatSpec with Matchers { "annex": null } """ - val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File") + val GrobidStringWithGoodTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File") + val GrobidStringWithMaximumTitle = GrobidString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength) + val GrobidStringWithExcessiveTitle = GrobidString.replace("<<TITLE>>", "T" * Scorable.MaxTitleLength + "0") + val GrobidStringWithNullTitle = GrobidString.replace("\"<<TITLE>>\"", "null") val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle") val MalformedGrobidString = GrobidString.replace("}", "") val Key = "Dummy Key" @@ -69,13 +72,18 @@ class GrobidScorableTest extends FlatSpec with Matchers { result.slug shouldBe Scorable.NoSlug } + it should "handle null title" in { + val result = GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithNullTitle) + result.slug shouldBe Scorable.NoSlug + } + it should "handle missing title" in { val result = GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithoutTitle) result.slug shouldBe Scorable.NoSlug } it should "handle valid input" in { - val result = GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithTitle) + val result = GrobidScorable.jsonToMapFeatures(Key, GrobidStringWithGoodTitle) result.slug shouldBe "dummyexamplefile" Scorable.jsonToMap(result.json) match { case None => fail() @@ -85,4 +93,28 @@ class GrobidScorableTest extends FlatSpec with Matchers { } } } + + "GrobidScorable.keepRecord()" should "return true for valid JSON with title" in { + GrobidScorable.keepRecord(GrobidStringWithGoodTitle) shouldBe true + } + + it should "return true for valid JSON with a title of maximum permitted length" in { + GrobidScorable.keepRecord(GrobidStringWithMaximumTitle) shouldBe true + } + + it should "return false for valid JSON with excessively long title" in { + GrobidScorable.keepRecord(GrobidStringWithExcessiveTitle) shouldBe false + } + + it 
should "return false for valid JSON with null title" in { + GrobidScorable.keepRecord(GrobidStringWithNullTitle) shouldBe false + } + + it should "return false for valid JSON with no title" in { + GrobidScorable.keepRecord(GrobidStringWithoutTitle) shouldBe false + } + + it should "return false for invalid JSON" in { + GrobidScorable.keepRecord(GrobidStringWithoutTitle) shouldBe false + } } diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala new file mode 100644 index 0000000..d2cf9de --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala @@ -0,0 +1,71 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.junit.runner.RunWith +import org.scalatest.FunSpec +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource +import scala._ + +@RunWith(classOf[JUnitRunner]) +class HBaseStatusCodeCountTest extends FunSpec with TupleConversions { + + val output = "/tmp/testOutput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val log = LoggerFactory.getLogger(this.getClass.getName) + + val statusType1 : Long = 200 + val statusType2 : Long = 404 + val statusType1Bytes = Bytes.toBytes(statusType1) + val statusType2Bytes = Bytes.toBytes(statusType2) + + // TODO(bnewbold): now to express a null (empty value) in this list? + val sampleData : List[List[Array[Byte]]] = List( + ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1Bytes), + ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1Bytes), + ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2Bytes), + ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2Bytes), + ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2Bytes), + ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2Bytes), + ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1Bytes), + ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2Bytes)) + .map(pair => List(Bytes.toBytes(pair._1), pair._2)) + + val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes) + val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes) + + JobTest("sandcrawler.HBaseStatusCodeCountJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("debug", "true") + .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"), + sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + .sink[Tuple](TypedTsv[(Long, Long)](output)) { + outputBuffer => + it("should return a correct number of elements.") { + assert(outputBuffer.size === 2) + } + + // Convert List[Tuple] to Map[Long, Long]. 
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala
new file mode 100644
index 0000000..d2cf9de
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala
@@ -0,0 +1,71 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.Tsv
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.junit.runner.RunWith
+import org.scalatest.FunSpec
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
+import scala._
+
+@RunWith(classOf[JUnitRunner])
+class HBaseStatusCodeCountTest extends FunSpec with TupleConversions {
+
+  val output = "/tmp/testOutput"
+  val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+  val log = LoggerFactory.getLogger(this.getClass.getName)
+
+  val statusType1 : Long = 200
+  val statusType2 : Long = 404
+  val statusType1Bytes = Bytes.toBytes(statusType1)
+  val statusType2Bytes = Bytes.toBytes(statusType2)
+
+  // TODO(bnewbold): how to express a null (empty value) in this list?
+  val sampleData : List[List[Array[Byte]]] = List(
+    ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1Bytes),
+    ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1Bytes),
+    ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2Bytes),
+    ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2Bytes),
+    ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2Bytes),
+    ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2Bytes),
+    ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1Bytes),
+    ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2Bytes))
+    .map(pair => List(Bytes.toBytes(pair._1), pair._2))
+
+  val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes)
+  val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes)
+
+  JobTest("sandcrawler.HBaseStatusCodeCountJob")
+    .arg("test", "")
+    .arg("app.conf.path", "app.conf")
+    .arg("output", output)
+    .arg("hbase-table", testTable)
+    .arg("zookeeper-hosts", testHost)
+    .arg("debug", "true")
+    .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
+      sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+    .sink[Tuple](TypedTsv[(Long, Long)](output)) {
+      outputBuffer =>
+      it("should return a correct number of elements.") {
+        assert(outputBuffer.size === 2)
+      }
+
+      // Convert List[Tuple] to Map[Long, Long].
+      val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
+      it("should have the appropriate number of each status type") {
+        assert(counts(statusType1) == statusType1Count)
+        assert(counts(statusType2) == statusType2Count)
+      }
+    }
+    .run
+    .finish
+}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
index 3291670..7e91af3 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
@@ -24,10 +24,8 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
 
   val log = LoggerFactory.getLogger(this.getClass.getName)
 
-  val statusType1 : Long = 200
-  val statusType2 : Long = 404
-  val statusType1Bytes = Bytes.toBytes(statusType1)
-  val statusType2Bytes = Bytes.toBytes(statusType2)
+  val statusType1Bytes = Bytes.toBytes("""{"status": "success"}""")
+  val statusType2Bytes = Bytes.toBytes("""{"status": "partial"}""")
 
   // TODO(bnewbold): how to express a null (empty value) in this list?
   val sampleData : List[List[Array[Byte]]] = List(
@@ -51,20 +49,13 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
     .arg("hbase-table", testTable)
     .arg("zookeeper-hosts", testHost)
     .arg("debug", "true")
-    .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
+    .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status"),
       sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
-    .sink[Tuple](TypedTsv[(Long, Long)](output)) {
+    .sink[Tuple](TypedTsv[(String, Long)](output)) {
       outputBuffer =>
       it("should return a 2-element list.") {
         assert(outputBuffer.size === 2)
       }
-
-      // Convert List[Tuple] to Map[Long, Long].
-      val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
-      it("should have the appropriate number of each status type") {
-        assert(counts(statusType1) == statusType1Count)
-        assert(counts(statusType2) == statusType2Count)
-      }
     }
     .run
     .finish
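For reference, status values travel through HBase as raw bytes; the jobs and tests above rely on the standard org.apache.hadoop.hbase.util.Bytes round-trip:

    import org.apache.hadoop.hbase.util.Bytes

    val raw : Array[Byte] = Bytes.toBytes("""{"status": "success"}""")
    Bytes.toString(raw)  // back to the original JSON string

    val code : Array[Byte] = Bytes.toBytes(200L)
    Bytes.toLong(code)   // back to 200L, as in HBaseStatusCodeCountJob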
diff --git a/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala
index 8a293fe..d742384 100644
--- a/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala
+++ b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala
@@ -1,9 +1,14 @@
 package sandcrawler
 
+import java.io.InputStream
+
+import scala.io.Source
+
 import org.scalatest._
 
 // scalastyle:off null
 class ScorableFeaturesTest extends FlatSpec with Matchers {
+
   private def titleToSlug(s : String) : String = {
     ScorableFeatures.create(title = s).toSlug
   }
@@ -44,7 +49,7 @@ class ScorableFeaturesTest extends FlatSpec with Matchers {
   }
 
   it should "strip special characters" in {
-    titleToSlug(":;!',|\"\'`.#?!-@*/\\=+~%$^{}()[]<>-_") shouldBe Scorable.NoSlug
+    titleToSlug(":;!',|\"\'`.#?!-@*/\\=+~%$^{}()[]<>-_’·“”‘’“”«»「」") shouldBe Scorable.NoSlug
     // TODO: titleToSlug("©™₨№…") shouldBe Scorable.NoSlug
     // TODO: titleToSlug("πµΣσ") shouldBe Scorable.NoSlug
   }
diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
index 55ae614..85d141a 100644
--- a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
+++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
@@ -117,6 +117,7 @@ class ScoreJobTest extends FlatSpec with Matchers {
   }
 """
   // scalastyle:on
+  val TooLongOfTitle = "X" * Scorable.MaxTitleLength + "Y" // arbitrary long string
   val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle")
   val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle")
   val MalformedCrossrefString = CrossrefString.replace("}", "")
@@ -124,7 +125,8 @@ class ScoreJobTest extends FlatSpec with Matchers {
     CrossrefString.replace("<<TITLE>>", "Title 2: TNG").replace("<<DOI>>", "DOI-0"),
     CrossrefString.replace("<<TITLE>>", "Title 1: TNG 2A").replace("<<DOI>>", "DOI-0.5"),
     CrossrefString.replace("<<TITLE>>", "Title 1: TNG 3").replace("<<DOI>>", "DOI-0.75"),
-    CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"))
+    CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1"),
+    CrossrefString.replace("<<TITLE>>", TooLongOfTitle).replace("<<DOI>>", "DOI-1"))
 
   // Pipeline tests
   val output = "/tmp/testOutput"
@@ -137,7 +139,8 @@ class ScoreJobTest extends FlatSpec with Matchers {
     "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT",
     "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56",
     "sha1:93187A85273589347598473894839443",
-    "sha1:024937534094897039547e9824382943")
+    "sha1:024937534094897039547e9824382943",
+    "sha1:93229759932857982837892347893892")
 
   val JsonStrings : List[String] = List(
     JsonString.replace("<<TITLE>>", "Title 1"),
@@ -147,13 +150,15 @@ class ScoreJobTest extends FlatSpec with Matchers {
     JsonString.replace("<<TITLE>>", "Title 1"),
     MalformedJsonString,
     // This will have bad status.
-    JsonString.replace("<<TITLE>>", "Title 2")
+    JsonString.replace("<<TITLE>>", "Title 2"),
+    // This is in both sources but too long.
+ JsonString.replace("<<TITLE>>", TooLongOfTitle) ) // bnewbold: status codes aren't strings, they are uint64 val Ok : Long = 200 val Bad : Long = 400 - val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad) + val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad, Ok) val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes) .zipped @@ -163,6 +168,7 @@ class ScoreJobTest extends FlatSpec with Matchers { // scalastyle:off null // Add example of lines without GROBID data + // scalastyle:off null val SampleData = SampleDataHead :+ new Tuple( new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null) // scalastyle:on null @@ -180,17 +186,21 @@ class ScoreJobTest extends FlatSpec with Matchers { 0 -> CrossrefStrings(0), 1 -> CrossrefStrings(1), 2 -> CrossrefStrings(2), - 3 -> CrossrefStrings(3))) + 3 -> CrossrefStrings(3), + 4 -> CrossrefStrings(4))) + .sink[(String, ReduceFeatures)](TypedTsv[(String, ReduceFeatures)](output + ".trapped")) { _ => () } .sink[(String, Int, String, String)](TypedTsv[(String, Int, String, String)](output)) { // Grobid titles and slugs (in parentheses): // Title 1 (title1) // Title 2: TNG (title2tng) // Title 3: The Sequel (title3thesequel) + // <too long of a title> // crossref titles and slugs (in parentheses): // Title 2: TNG (title2tng) // Title 1: TNG 2A (title1tng2a) // Title 1: TNG 3 (title1tng3) // Title 2: Rebooted (title2rebooted) + // <too long of a title> // XXX: Join should have 3 "title1" slugs and 1 "title2tng" slug outputBuffer => "The pipeline" should "return a 1-element list" in { |