diff options
| author | Ellen Spertus <ellen.spertus@gmail.com> | 2018-07-24 16:15:42 -0700 | 
|---|---|---|
| committer | Ellen Spertus <ellen.spertus@gmail.com> | 2018-07-24 16:15:42 -0700 | 
| commit | a950d5d5c61fb77b2ba83703ef853ef951ac94af (patch) | |
| tree | 38b34c904ad9d21c0d51ab6401d485e71ce4fcf3 /scalding/src/main/scala | |
| parent | 07edf1ccad9c3268324926471dd0c8a7433f0c08 (diff) | |
| download | sandcrawler-a950d5d5c61fb77b2ba83703ef853ef951ac94af.tar.gz sandcrawler-a950d5d5c61fb77b2ba83703ef853ef951ac94af.zip | |
WIP. I'm having problems converting between ImmutableBytesWritable and String.
Diffstat (limited to 'scalding/src/main/scala')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala | 58 | 
1 files changed, 40 insertions, 18 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala index 12660e8..1360af0 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCrossrefScoreJob.scala @@ -8,75 +8,97 @@ import cascading.tuple.Fields  import com.twitter.scalding._  import com.twitter.scalding.typed.TDsl._  import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes  import parallelai.spyglass.base.JobBase  import parallelai.spyglass.hbase.HBaseConstants.SourceMode  import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource  class HBaseCrossrefScoreJob(args: Args) extends JobBase(args) with HBasePipeConversions {    // key is SHA1 -  val grobidSource = HBaseBuilder.build( -    args("grobid-table"), -    args("zookeeper-hosts"), -    List("grobid0:tei_json"), -    sourceMode = SourceMode.SCAN_ALL) - +  val grobidSource = HBaseCrossrefScore.getHBaseSource( +    args("hbase-table"), +    args("zookeeper-hosts"))    val grobidPipe = grobidSource      .read      .map('tei_json -> 'slug) { -      json : String => HBaseCrossrefScore.grobidToSlug(json)} +      json : ImmutableBytesWritable => { +        HBaseCrossrefScore.grobidToSlug(json.toString) match { +          case Some(slug) => slug +          case None => "nothing" +        } +      } +    } +    .debug +    .map('key -> 'sha1) { sha1 : String => sha1 } -  val crossrefSource = TextLine(args("input")) +  val crossrefSource = TextLine(args("crossref-input"))    val crossrefPipe = crossrefSource      .read      .map('line -> 'slug) {        json : String => HBaseCrossrefScore.crossrefToSlug(json)} - -/* -  statusPipe.groupBy { identity } -    .size      .debug -    .write(TypedTsv[(Long,Long)](args("output"))) -   */ + +  val innerJoinPipe = grobidPipe.joinWithSmaller('slug -> 'slug, crossrefPipe) +  innerJoinPipe +    .mapTo(('tei_json, 'line, 'sha1) -> ('sha1, 'doi, 'score)) { +      x : (String, String, String) => HBaseCrossrefScore.performJoin(x._1, x._2, x._3)} +    .write(TypedTsv[(String, String, String)](args("output")))  }  object HBaseCrossrefScore { +  def getHBaseSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = HBaseBuilder.build( +    hbaseTable,      // HBase Table Name +    zookeeperHosts,  // HBase Zookeeper server (to get runtime config info; can be array?) +    List("grobid0:tei_json"), +    SourceMode.SCAN_ALL) + +  def performJoin(grobidJson : String, crossRefJson : String, sha1 : String) : (String, String, String) = { +    (sha1, "1.2.3.4", "100") +  } +    def jsonToMap(json : String) : Map[String, Any] = {      // https://stackoverflow.com/a/32717262/631051      val jsonObject = JSON.parseFull(json)      if (jsonObject == None) {        // Empty map for malformed JSON -      Map[String, Any]() +      Map[String, Any]("foo" -> json)      } else {        jsonObject.get.asInstanceOf[Map[String, Any]]      }    } -    def grobidToSlug(json : String) : Option[String] = { +    throw new Exception(json)      val map = jsonToMap(json)      if (map contains "title") {        titleToSlug(map("title").asInstanceOf[String])      } else { -      None +      Some("grobidToSlug None: " + map("foo"))      }    }    def crossrefToSlug(json : String) : Option[String] = {      val map = jsonToMap(json)      if (map contains "title") { +      // TODO: Don't ignore titles after the first.        titleToSlug(map("title").asInstanceOf[List[String]](0))      } else { -      None +      Some("crossRefToSlug None")      }    }    def titleToSlug(title : String) : Option[String] = { +    Some(title) +    /*      val slug = title.split(":")(0).toLowerCase() +    println("title: " + title + ", slug: " + slug)      if (slug.isEmpty) {        None      } else {        Some(slug)      } +     */    }  } | 
