diff options
Diffstat (limited to 'scalding')
14 files changed, 443 insertions, 65 deletions
| diff --git a/scalding/build.sbt b/scalding/build.sbt index d477399..01f55ca 100644 --- a/scalding/build.sbt +++ b/scalding/build.sbt @@ -42,7 +42,7 @@ lazy val root = (project in file(".")).      libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion,      libraryDependencies += "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion classifier "tests",      libraryDependencies += "org.apache.hbase" % "hbase-common" % hbaseVersion, -    libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.11_0.17.2_cdh5.3.1", +    libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.11_0.17.2_cdh5.3.1-p1",      // cargo-culted from twitter/scalding's build.sbt      // hint via https://stackoverflow.com/questions/23280494/sbt-assembly-error-deduplicate-different-file-contents-found-in-the-following#23280952 diff --git a/scalding/src/main/resources/slug-blacklist.txt b/scalding/src/main/resources/slug-blacklist.txt new file mode 100644 index 0000000..7dc701f --- /dev/null +++ b/scalding/src/main/resources/slug-blacklist.txt @@ -0,0 +1,34 @@ +abbreviations +abstract +acknowledgements +article +authorreply +authorsreply +bookreview +bookreviews +casereport +commentary +commentaryon +commenton +commentto +contents +correspondence +dedication +editorialadvisoryboard +focus +hypothesis +inbrief +introduction +introductiontotheissue +lettertotheeditor +listofabbreviations +note +overview +preface +references +results +review +reviewarticle +summary +title +name diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala index ff8201a..5d1eaf5 100644 --- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala +++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala @@ -35,7 +35,7 @@ object CrossrefScorable {              new MapFeatures(Scorable.NoSlug, json)            } else {              // bnewbold: not checking that titles(0) is non-null/non-empty; case would be, in JSON, "title": [ null ] -            val sf : ScorableFeatures = new ScorableFeatures(title=titles(0), doi=doi) +            val sf : ScorableFeatures = ScorableFeatures.create(title=titles(0), doi=doi)              new MapFeatures(sf.toSlug, sf.toString)            }          } else { diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index 9a09e05..e510f75 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -24,10 +24,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions {      getSource(args)        .read        // Can't just "fromBytesWritable" because we have multiple types? -      .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "tei_json", "status_code")) -      .filter { case (_, tei_json, status_code) => tei_json != null && status_code != null } -      .map { case (key, tei_json, status_code) => -        (Bytes.toString(key.copyBytes()), Bytes.toString(tei_json.copyBytes()), Bytes.toLong(status_code.copyBytes())) +      .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code")) +      .filter { case (_, metadata, status_code) => metadata != null && status_code != null } +      .map { case (key, metadata, status_code) => +        (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes()))        }        // TODO: Should I combine next two stages for efficiency?        .collect { case (key, json, StatusOK) => (key, json) } @@ -37,7 +37,7 @@ class GrobidScorable extends Scorable with HBasePipeConversions {  object GrobidScorable {    def getHBaseSource(table : String, host : String) : HBaseSource = { -    HBaseBuilder.build(table, host, List("grobid0:tei_json", "grobid0:status_code"), SourceMode.SCAN_ALL) +    HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL)    }    def jsonToMapFeatures(key : String, json : String) : MapFeatures = { @@ -45,7 +45,7 @@ object GrobidScorable {        case None => MapFeatures(Scorable.NoSlug, json)        case Some(map) => {          if (map contains "title") { -          new ScorableFeatures(Scorable.getString(map, "title"), sha1=key).toMapFeatures +          ScorableFeatures.create(title=Scorable.getString(map, "title"), sha1=key).toMapFeatures          } else {            MapFeatures(Scorable.NoSlug, json)          } diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala new file mode 100644 index 0000000..468b68e --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala @@ -0,0 +1,62 @@ + +package sandcrawler + +import cascading.flow.FlowDef +import cascading.pipe.Pipe +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class GrobidScorableDumpJob(args: Args) extends JobBase(args) { + +  val grobidHbaseRows = Stat("hbase-rows-scanned", "hbase-grobid-dump") +  val filteredGrobidRows = Stat("grobid-rows-filtered", "hbase-grobid-dump") +  val parsedGrobidRows = Stat("grobid-rows-parsed", "hbase-grobid-dump") +  val validGrobidRows = Stat("grobid-rows-valid-slug", "hbase-grobid-dump") + +  val pipe = GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) +    .read +    // Can't just "fromBytesWritable" because we have multiple types? +    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code")) +    .filter { case (_, metadata, status_code) => +      grobidHbaseRows.inc +      metadata != null && status_code != null +    } +    .map { case (key, metadata, status_code) => +      (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes())) +    } +    // TODO: Should I combine next two stages for efficiency? +    .collect { case (key, json, 200) => +      filteredGrobidRows.inc +      (key, json) +    } +    .map { entry : (String, String) => +      parsedGrobidRows.inc +      GrobidScorable.jsonToMapFeatures(entry._1, entry._2) +    } +    .filter { entry => Scorable.isValidSlug(entry.slug) } +    .map { entry => +      validGrobidRows.inc +      entry +    } +    // XXX: this groupBy after the map? +    .groupBy { case MapFeatures(slug, json) => slug } +    .map { tuple => +      val (slug : String, features : MapFeatures) = tuple +      (slug, ReduceFeatures(features.json)) +    } + +  pipe +    .map { case (slug, features) => +      (slug, features.json) +    } +    .write(TypedTsv[(String, String)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala new file mode 100644 index 0000000..20cc7a1 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala @@ -0,0 +1,37 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class HBaseColCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { + +  val output = args("output") + +  HBaseColCountJob.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"), +    args("column")) +    .read +    .debug +    .groupAll { _.size('count) } +    .write(Tsv(output)) +} + +object HBaseColCountJob { + +  // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" +  def getHBaseSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = { +    HBaseBuilder.build( +      hbaseTable, +      zookeeperHosts, +      List(col), +      SourceMode.SCAN_ALL) +  } +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala new file mode 100644 index 0000000..4d9880f --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala @@ -0,0 +1,32 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +class HBaseStatusCodeCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { + +  val source = HBaseCountJob.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"), +    "grobid0:status_code") + +  val statusPipe : TypedPipe[Long] = source +    .read +    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code) +    .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } + +  statusPipe.groupBy { identity } +    .size +    .debug +    .write(TypedTsv[(Long,Long)](args("output"))) +} diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index fd0b4e2..f79d672 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -18,15 +18,15 @@ class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConver    val source = HBaseCountJob.getHBaseSource(      args("hbase-table"),      args("zookeeper-hosts"), -    "grobid0:status_code") +    "grobid0:status") -  val statusPipe : TypedPipe[Long] = source +  val statusPipe : TypedPipe[String] = source      .read -    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code) -    .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } +    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status) +    .map { case (key, raw_status) => Bytes.toString(raw_status.copyBytes()) }    statusPipe.groupBy { identity }      .size      .debug -    .write(TypedTsv[(Long,Long)](args("output"))) +    .write(TypedTsv[(String,Long)](args("output")))  } diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala index 8ed3369..0b9868a 100644 --- a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala +++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala @@ -1,31 +1,35 @@  package sandcrawler +import java.io.InputStream + +import scala.io.Source  import scala.util.parsing.json.JSONObject +object ScorableFeatures { +  // TODO: Add exception handling. +  val fileStream : InputStream = getClass.getResourceAsStream("/slug-blacklist.txt") +  val SlugBlacklist : Set[String] = Source.fromInputStream(fileStream).getLines.toSet +  fileStream.close + +  // Static factory method +  def create(title : String, year : Int = 0, doi : String = "", sha1 : String = "") : ScorableFeatures = { +    new ScorableFeatures( +      title=if (title == null) "" else title, +      year=year, +      doi=if (doi == null) "" else doi, +      sha1=if (sha1 == null) "" else sha1) +  } +}  // Contains features needed to make slug and to score (in combination -// with a second ScorableFeatures). -class ScorableFeatures(title : String, year: Int = 0, doi : String = "", sha1: String = "") { - -  val slugBlacklist = Set( "abbreviations", "abstract", "acknowledgements", -    "article", "authorreply", "authorsreply", "bookreview", "bookreviews", -    "casereport", "commentary", "commentaryon", "commenton", "commentto", -    "contents", "correspondence", "dedication", "editorialadvisoryboard", -    "focus", "hypothesis", "inbrief", "introduction", "introductiontotheissue", -    "lettertotheeditor", "listofabbreviations", "note", "overview", "preface", -    "references", "results", "review", "reviewarticle", "summary", "title", -    "name") - -  def toMap() : Map[String, Any] = { -    Map("title" -> (if (title == null) "" else title), -        "year" -> year, -        "doi" -> (if (doi == null) "" else doi), -        "sha1" -> (if (sha1 == null) "" else sha1)) -  } +// with a second ScorableFeatures). Create with above static factory method. +class ScorableFeatures private(title : String, year: Int = 0, doi : String = "", sha1: String = "") { -  override def toString() : String = { -    JSONObject(toMap()).toString -  } +  def toMap() : Map[String, Any] = +    Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1) + +  override def toString() : String = +    JSONObject(toMap).toString    def toSlug() : String = {      if (title == null) { @@ -34,11 +38,10 @@ class ScorableFeatures(title : String, year: Int = 0, doi : String = "", sha1: S        val unaccented = StringUtilities.removeAccents(title)        // Remove punctuation        val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "") -      if (slug.isEmpty || slug == null || (slugBlacklist contains slug)) Scorable.NoSlug else slug +      if (slug.isEmpty || slug == null || (ScorableFeatures.SlugBlacklist contains slug)) Scorable.NoSlug else slug      }    } -  def toMapFeatures = { +  def toMapFeatures : MapFeatures =      MapFeatures(toSlug, toString) -  }  } diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala new file mode 100644 index 0000000..12e13dc --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala @@ -0,0 +1,124 @@ + +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.TextLine +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.scalatest._ +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class GrobidScorableDumpJobTest extends FlatSpec with Matchers { +  //scalastyle:off +  val JsonString = """ +{ +  "title": "<<TITLE>>", +  "authors": [ +    {"name": "Brewster Kahle"}, +    {"name": "J Doe"} +  ], +  "journal": { +    "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678", +    "eissn": null, +    "issn": null, +    "issue": null, +    "publisher": null, +    "volume": null +  }, +  "date": "2000", +  "doi": null, +  "citations": [ +    { "authors": [{"name": "A Seaperson"}], +      "date": "2001", +      "id": "b0", +      "index": 0, +      "issue": null, +      "journal": "Letters in the Alphabet", +      "publisher": null, +      "title": "Everything is Wonderful", +      "url": null, +      "volume": "20"}, +    { "authors": [], +      "date": "2011-03-28", +      "id": "b1", +      "index": 1, +      "issue": null, +      "journal": "The Dictionary", +      "publisher": null, +      "title": "All about Facts", +      "url": null, +      "volume": "14"} +  ], +  "abstract": "Everything you ever wanted to know about nothing", +  "body": "Introduction \nEverything starts somewhere, as somebody [1]  once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.", +  "acknowledgement": null, +  "annex": null +} +""" +  // scalastyle:on +  val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File") +  val JsonStringWithoutTitle = JsonString.replace("title", "nottitle") +  val MalformedJsonString = JsonString.replace("}", "") + +  //  Pipeline tests +  val output = "/tmp/testOutput" +  val input = "/tmp/testInput" +  val (testTable, testHost) = ("test-table", "dummy-host:2181") + +  val Sha1Strings : List[String] = List( +    "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q",  // good +    "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU",  // good +    "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT",  // good +    "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56",  // bad status +    "sha1:93187A85273589347598473894839443",  // malformed +    "sha1:024937534094897039547e9824382943")  // bad status + +  val JsonStrings : List[String] = List( +    JsonString.replace("<<TITLE>>", "Title 1"), +    JsonString.replace("<<TITLE>>", "Title 2: TNG"), +    JsonString.replace("<<TITLE>>", "Title 3: The Sequel"), +    // This will have bad status. +    JsonString.replace("<<TITLE>>", "Title 1"), +    MalformedJsonString, +    // This will have bad status. +    JsonString.replace("<<TITLE>>", "Title 2") +  ) + +  // bnewbold: status codes aren't strings, they are uint64 +  val Ok : Long = 200 +  val Bad : Long = 400 +  val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad) + +  val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes) +    .zipped +    .toList +    .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) } +    .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } + +  // scalastyle:off null +  // Add example of lines without GROBID data +  val SampleData = SampleDataHead :+ new Tuple( +    new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null) +  // scalastyle:on null + +  JobTest("sandcrawler.GrobidScorableDumpJob") +    .arg("test", "") +    .arg("app.conf.path", "app.conf") +    .arg("output", output) +    .arg("hbase-table", testTable) +    .arg("zookeeper-hosts", testHost) +    .arg("debug", "true") +    .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData) +    .sink[(String, String)](TypedTsv[(String, String)](output)) { +      outputBuffer => +      "The pipeline" should "return correct-length list" in { +        outputBuffer should have length 3 +      } +    } +    .run +    .finish +} diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala new file mode 100644 index 0000000..d2cf9de --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala @@ -0,0 +1,71 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.junit.runner.RunWith +import org.scalatest.FunSpec +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource +import scala._ + +@RunWith(classOf[JUnitRunner]) +class HBaseStatusCodeCountTest extends FunSpec with TupleConversions { + +  val output = "/tmp/testOutput" +  val (testTable, testHost) = ("test-table", "dummy-host:2181") + +  val log = LoggerFactory.getLogger(this.getClass.getName) + +  val statusType1 : Long = 200 +  val statusType2 : Long = 404 +  val statusType1Bytes = Bytes.toBytes(statusType1) +  val statusType2Bytes = Bytes.toBytes(statusType2) + +  // TODO(bnewbold): now to express a null (empty value) in this list? +    val sampleData : List[List[Array[Byte]]] = List( +      ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1Bytes), +      ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1Bytes), +      ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2Bytes), +      ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2Bytes), +      ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2Bytes), +      ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2Bytes), +      ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1Bytes), +      ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2Bytes)) +        .map(pair => List(Bytes.toBytes(pair._1), pair._2)) + +  val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes) +  val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes) + +  JobTest("sandcrawler.HBaseStatusCodeCountJob") +    .arg("test", "") +    .arg("app.conf.path", "app.conf") +    .arg("output", output) +    .arg("hbase-table", testTable) +    .arg("zookeeper-hosts", testHost) +    .arg("debug", "true") +    .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"), +      sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) +    .sink[Tuple](TypedTsv[(Long, Long)](output)) { +      outputBuffer => +      it("should return a correct number of elements.") { +        assert(outputBuffer.size === 2) +      } + +      // Convert List[Tuple] to Map[Long, Long]. +      val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap +      it("should have the appropriate number of each status type") { +        assert(counts(statusType1) == statusType1Count) +        assert(counts(statusType2) == statusType2Count) +      } +    } +    .run +    .finish +} diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala index 0da0b9c..7e91af3 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala @@ -24,22 +24,20 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {    val log = LoggerFactory.getLogger(this.getClass.getName) -  val statusType1 : Long = 200 -  val statusType2 : Long = 404 -  val statusType1Bytes = Bytes.toBytes(statusType1) -  val statusType2Bytes = Bytes.toBytes(statusType2) - -  val sampleData : List[List[Array[Byte]]] = List( -    // TODO(bnewbold): now to express a null (empty value) in this list? -    List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), statusType1Bytes), -    List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), statusType1Bytes), -    List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), statusType2Bytes), -    List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), statusType2Bytes), -    List(Bytes.toBytes("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ"), statusType2Bytes), -    List(Bytes.toBytes("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6"), statusType2Bytes), -    List(Bytes.toBytes("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ"), statusType1Bytes), -    List(Bytes.toBytes("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT"), statusType2Bytes) -  ) +  val statusType1Bytes = Bytes.toBytes("""{"status": "success"}""") +  val statusType2Bytes = Bytes.toBytes("""{"status": "partial"}""") + +  // TODO(bnewbold): now to express a null (empty value) in this list? +    val sampleData : List[List[Array[Byte]]] = List( +      ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1Bytes), +      ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1Bytes), +      ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2Bytes), +      ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2Bytes), +      ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2Bytes), +      ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2Bytes), +      ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1Bytes), +      ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2Bytes)) +        .map(pair => List(Bytes.toBytes(pair._1), pair._2))    val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes)    val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes) @@ -51,20 +49,13 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {      .arg("hbase-table", testTable)      .arg("zookeeper-hosts", testHost)      .arg("debug", "true") -    .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"), +    .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status"),        sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) -    .sink[Tuple](TypedTsv[(Long, Long)](output)) { +    .sink[Tuple](TypedTsv[(String, Long)](output)) {        outputBuffer =>        it("should return a 2-element list.") {          assert(outputBuffer.size === 2)        } - -      // Convert List[Tuple] to Map[Long, Long]. -      val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap -      it("should have the appropriate number of each status type") { -        assert(counts(statusType1) == statusType1Count) -        assert(counts(statusType2) == statusType2Count) -      }      }      .run      .finish diff --git a/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala index 80d92aa..5a22ef8 100644 --- a/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala @@ -1,16 +1,37 @@  package sandcrawler +import java.io.InputStream + +import scala.io.Source +  import org.scalatest._  // scalastyle:off null  class ScorableFeaturesTest extends FlatSpec with Matchers { + +  // TODO: Remove this when we're convinced that our file-reading code +  // works. (I'm already convinced. --Ellen) +  "read slugs" should "work" in { +    val SlugBlacklist = Set( "abbreviations", "abstract", "acknowledgements", +      "article", "authorreply", "authorsreply", "bookreview", "bookreviews", +      "casereport", "commentary", "commentaryon", "commenton", "commentto", +      "contents", "correspondence", "dedication", "editorialadvisoryboard", +      "focus", "hypothesis", "inbrief", "introduction", "introductiontotheissue", +      "lettertotheeditor", "listofabbreviations", "note", "overview", "preface", +      "references", "results", "review", "reviewarticle", "summary", "title", +      "name") + +    ScorableFeatures.SlugBlacklist.size shouldBe SlugBlacklist.size +    for (s <- ScorableFeatures.SlugBlacklist) SlugBlacklist should contain (s) +  } +    private def titleToSlug(s : String) : String = { -    new ScorableFeatures(title = s).toSlug +    ScorableFeatures.create(title = s).toSlug    }    "toMapFeatures()" should "work with gnarly inputs" in { -    new ScorableFeatures(title = null).toMapFeatures -    new ScorableFeatures(title = "something", doi = null, sha1 = null, year = 123).toMapFeatures +    ScorableFeatures.create(title = null).toMapFeatures +    ScorableFeatures.create(title = "something", doi = null, sha1 = null, year = 123).toMapFeatures    }    "mapToSlug()" should "extract the parts of titles before a colon" in { diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala index c397103..35c31e5 100644 --- a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala +++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala @@ -161,9 +161,12 @@ class ScoreJobTest extends FlatSpec with Matchers {      .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) }      .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) } +  // scalastyle:off null    // Add example of lines without GROBID data +  // scalastyle:off null    val SampleData = SampleDataHead :+ new Tuple(      new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null) +  // scalastyle:on null    JobTest("sandcrawler.ScoreJob")      .arg("test", "") | 
