diff options
| author | Ellen Spertus <ellen.spertus@gmail.com> | 2018-07-24 16:15:42 -0700 | 
|---|---|---|
| committer | Ellen Spertus <ellen.spertus@gmail.com> | 2018-07-24 16:15:42 -0700 | 
| commit | a950d5d5c61fb77b2ba83703ef853ef951ac94af (patch) | |
| tree | 38b34c904ad9d21c0d51ab6401d485e71ce4fcf3 /scalding | |
| parent | 07edf1ccad9c3268324926471dd0c8a7433f0c08 (diff) | |
| download | sandcrawler-a950d5d5c61fb77b2ba83703ef853ef951ac94af.tar.gz sandcrawler-a950d5d5c61fb77b2ba83703ef853ef951ac94af.zip | |
WIP. I'm having problems converting between ImmutableBytesWritable and String.
Diffstat (limited to 'scalding')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala | 58 | ||||
| -rw-r--r-- | scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala | 49 | 
2 files changed, 84 insertions, 23 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 12660e8..1360af0 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -8,75 +8,97 @@ import cascading.tuple.Fields  import com.twitter.scalding._  import com.twitter.scalding.typed.TDsl._  import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes  import parallelai.spyglass.base.JobBase  import parallelai.spyglass.hbase.HBaseConstants.SourceMode  import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource  class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions {    // key is SHA1 -  val grobidSource = HBaseBuilder.build( -    args("grobid-table"), -    args("zookeeper-hosts"), -    List("grobid0:tei_json"), -    sourceMode = SourceMode.SCAN_ALL) - +  val grobidSource = HBaseCrossrefScore.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"))    val grobidPipe = grobidSource      .read      .map('tei_json -> 'slug) { -      json : String => HBaseCrossrefScore.grobidToSlug(json)} +      json : ImmutableBytesWritable => { +        HBaseCrossrefScore.grobidToSlug(json.toString) match { +          case Some(slug) => slug +          case None => "nothing" +        } +      } +    } +    .debug +    .map('key -> 'sha1) { sha1 : String => sha1 } -  val crossrefSource = TextLine(args("input")) +  val crossrefSource = TextLine(args("crossref-input"))    val crossrefPipe = crossrefSource      .read      .map('line -> 'slug) {        json : String => HBaseCrossrefScore.crossrefToSlug(json)} - -/* -  statusPipe.groupBy { identity } -    .size      .debug -    .write(TypedTsv[(Long,Long)](args("output"))) -   */ + +  val innerJoinPipe = grobidPipe.joinWithSmaller('slug -> 'slug, crossrefPipe) +  innerJoinPipe +    .mapTo(('tei_json, 'line, 'sha1) -> ('sha1, 'doi, 'score)) { +      x : (String, String, String) => HBaseCrossrefScore.performJoin(x._1, x._2, x._3)} +    .write(TypedTsv[(String, String, String)](args("output")))  }  object HBaseCrossrefScore { +  def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build( +    hbaseTable,      // HBase Table Name +    zookeeperHosts,  // HBase Zookeeper server (to get runtime config info; can be array?) +    List("grobid0:tei_json"), +    SourceMode.SCAN_ALL) + +  def performJoin(grobidJson : String, crossRefJson : String, sha1 : String) : (String, String, String) = { +    (sha1, "1.2.3.4", "100") +  } +    def jsonToMap(json : String) : Map[String, Any] = {      // https://stackoverflow.com/a/32717262/631051      val jsonObject = JSON.parseFull(json)      if (jsonObject == None) {        // Empty map for malformed JSON -      Map[String, Any]() +      Map[String, Any]("foo" -> json)      } else {        jsonObject.get.asInstanceOf[Map[String, Any]]      }    } -    def grobidToSlug(json : String) : Option[String] = { +    throw new Exception(json)      val map = jsonToMap(json)      if (map contains "title") {        titleToSlug(map("title").asInstanceOf[String])      } else { -      None +      Some("grobidToSlug None: " + map("foo"))      }    }    def crossrefToSlug(json : String) : Option[String] = {      val map = jsonToMap(json)      if (map contains "title") { +      // TODO: Don't ignore titles after the first.        titleToSlug(map("title").asInstanceOf[List[String]](0))      } else { -      None +      Some("crossRefToSlug None")      }    }    def titleToSlug(title : String) : Option[String] = { +    Some(title) +    /*      val slug = title.split(":")(0).toLowerCase() +    println("title: " + title + ", slug: " + slug)      if (slug.isEmpty) {        None      } else {        Some(slug)      } +     */    }  } diff --git a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala index a59b278..f52c5b4 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseCrossrefScoreTest.scala @@ -1,13 +1,17 @@  package sandcrawler  import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.{JobTest, TextLine, TypedTsv, TupleConversions} +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes  import org.scalatest._  import parallelai.spyglass.hbase.HBaseConstants.SourceMode -class HBaseCrossrefScoreTest extends FlatSpec with Matchers { +class HBaseCrossrefScoreTest extends FunSpec with TupleConversions {    val GrobidString = """  { -  "title": "Dummy Example File", +  "title": "<<TITLE>>",    "authors": [      {"name": "Brewster Kahle"},      {"name": "J Doe"} @@ -50,6 +54,7 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {    "annex": null  }  """ +  val GrobidStringWithTitle = GrobidString.replace("<<TITLE>>", "Dummy Example File")    val GrobidStringWithoutTitle = GrobidString.replace("title", "nottitle")    val MalformedGrobidString = GrobidString.replace("}", "") @@ -69,7 +74,7 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {                                  "delay-in-days" : 0, "content-version" : "tdm" }],    "content-domain" : { "domain" : [], "crossmark-restriction" : false },     "published-print" : { "date-parts" : [ [ 1996 ] ] },  -  "DOI" : "10.1016/0987-7983(96)87729-2",  +  "DOI" : "<<DOI>>",    "type" : "journal-article",     "created" : { "date-parts" : [ [ 2002, 7, 25 ] ],       "date-time" : "2002-07-25T15:09:41Z",  @@ -77,7 +82,7 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {    "page" : "186-187",     "source" : "Crossref",     "is-referenced-by-count" : 0,  -  "title" : [ "les ferments lactiques: classification, propriétés, utilisations agroalimentaires" ],  +  "title" : [ "<<TITLE>>" ],    "prefix" : "10.1016",     "volume" : "9",     "author" : [ { "given" : "W", "family" : "Gaier", "affiliation" : [] } ],  @@ -105,9 +110,10 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {    "subject" : [ "Pediatrics, Perinatology, and Child Health" ]  }  """ +  val CrossrefStringWithTitle = CrossrefString.replace("<<TITLE>>", "SomeTitle")    val CrossrefStringWithoutTitle = CrossrefString.replace("title", "nottitle")    val MalformedCrossrefString = CrossrefString.replace("}", "") - +/*    "titleToSlug()" should "extract the parts of titles before a colon" in {      val slug = HBaseCrossrefScore.titleToSlug("HELLO:there")      slug should contain ("hello") @@ -147,4 +153,37 @@ class HBaseCrossrefScoreTest extends FlatSpec with Matchers {      val slug = HBaseCrossrefScore.grobidToSlug(MalformedCrossrefString)       slug shouldBe None    } + */ +   +  val output = "/tmp/testOutput" +  val input = "/tmp/testInput" +  val (testTable, testHost) = ("test-table", "dummy-host:2181") + +  val grobidSampleData = List( +    List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title1"))), +    List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title2: TNG"))), +    List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title3: The Sequel"))), +    List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), Bytes.toBytes(GrobidString.replace("<<TITLE>>", "Title4")))) + +  JobTest("sandcrawler.HBaseCrossrefScoreJob") +    .arg("test", "") +    .arg("app.conf.path", "app.conf") +    .arg("output", output) +    .arg("hbase-table", testTable) +    .arg("zookeeper-hosts", testHost) +    .arg("crossref-input", input) +    .arg("debug", "true") +    .source[Tuple](HBaseCrossrefScore.getHBaseSource(testTable, testHost), +      grobidSampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) +    .source(TextLine(input), List(( +      "0" -> CrossrefString.replace("<<TITLE>>", "Title 1: TNG").replace("<<DOI>>", "DOI-0"), +      "1" -> CrossrefString.replace("<<TITLE>>", "Title 2: Rebooted").replace("<<DOI>>", "DOI-1")))) +    .sink[Tuple](TypedTsv[(String, String, String)](output)) { +      outputBuffer => +      it("should return a 2-element list.") { +        assert(outputBuffer.size === 2) +      } +    } +    .run +    .finish  } | 
