diff options
| author | Bryan Newbold <bnewbold@archive.org> | 2019-08-02 17:11:57 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@archive.org> | 2019-08-10 19:50:21 -0700 | 
| commit | ea9e8990139973d6f5fdf52a470bf6516c7d8c2f (patch) | |
| tree | 61e63509c28b7f280e7673c27276d28e8b0782ad /scalding/src/main/scala | |
| parent | ca725ffd9efe847905afb918ff324b421a4d8859 (diff) | |
| download | sandcrawler-ea9e8990139973d6f5fdf52a470bf6516c7d8c2f.tar.gz sandcrawler-ea9e8990139973d6f5fdf52a470bf6516c7d8c2f.zip | |
FatcatScorable and ScoreSelfFatcat job
Diffstat (limited to 'scalding/src/main/scala')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/FatcatScorable.scala | 131 | ||||
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/ScoreSelfFatcat.scala | 43 | 
2 files changed, 174 insertions, 0 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/FatcatScorable.scala b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala new file mode 100644 index 0000000..cffc2c0 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/FatcatScorable.scala @@ -0,0 +1,131 @@ +package sandcrawler + +import scala.math +import scala.util.parsing.json.JSON +import scala.util.parsing.json.JSONArray +import scala.util.parsing.json.JSONObject + +import cascading.flow.FlowDef +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.hbase.HBasePipeConversions + +class FatcatScorable extends Scorable with HBasePipeConversions { + +  def getSource(args : Args) : Source = { +    TextLine(args("fatcat-release-input")) +  } + +  def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[Option[MapFeatures]] = { +    getSource(args).read +      .toTypedPipe[String](new Fields("line")) +      .filter { FatcatScorable.keepRecord(_) } +      .map { FatcatScorable.jsonToMapFeatures(_) } +  } +} + +object FatcatScorable { + +  // Note; removed ReleaseType filtering + +  def keepRecord(json : String) : Boolean = { +    Scorable.jsonToMap(json) match { +      case None => false +      case Some(map) => { +        mapToTitle(map) match { +          case None => false +          case Some(title) => title.length <= Scorable.MaxTitleLength +        } +      } +    } +  } + +  // Returns None if title is null, empty, or too long. +  def mapToTitle(map : Map[String, Any]) : Option[String] = { +    def getTitle : Option[String] = { +      if (map contains "title") { +        val title = map("title").asInstanceOf[String] +        if (title == null || title.isEmpty) None else Some(title) +      } else { +        None +      } +    } + +    def getSubtitle : Option[String] = { +      if (map contains "subtitle") { +        val subtitle = map("subtitle").asInstanceOf[String] +        if (subtitle == null || subtitle.isEmpty) { +          None +        } else { +          Some(subtitle) +        } +      } else { +        None +      } +    } + +    getTitle match { +      case None => None +      case Some(baseTitle) => { +        if (baseTitle == null) { +          None +        } else { +          getSubtitle match { +            case None => Some(baseTitle) +            case Some(baseSubtitle) => Some(baseTitle.concat(":".concat(baseSubtitle))) +          } +        } +      } +    } +  } + +  def mapToAuthorList(map : Map[String, Any]) : List[String] = { +    if (map contains "contribs") { +      val objArray = map("contribs").asInstanceOf[List[Any]].map(e => e.asInstanceOf[Map[String,Any]]) +      // TODO(bnewbold): better name stuff... contrib.surname, creator.surname, +      // or raw_name split to last +      objArray +        .filter(e => e contains "raw_name") +        .map(e => e.get("raw_name").get.asInstanceOf[String]) +    } else { +      List() +    } +  } + +  def mapToYear(map : Map[String, Any]) : Option[Int] = { +    map.get("release_year") match { +      case None => None +      case Some(year) => { +        Some(year.asInstanceOf[Double].toInt) +      } +    } +  } + +  def jsonToMapFeatures(json : String) : Option[MapFeatures] = { +    def makeMapFeatures(title : String, doi : String, fatcat_release: String, fatcat_work : String, authors : List[String], year : Int, contentType : String) : Option[MapFeatures] = { +      // NOTE: not doing any filtering here! +      val sf : ScorableFeatures = ScorableFeatures.create(title=title, authors=authors, doi=doi, fatcat_release=fatcat_release, fatcat_work=fatcat_work, year=year) +      sf.toSlug match { +        case None => None +        case Some(slug) => Some(MapFeatures(slug, sf.toString)) +      } +    } +    Scorable.jsonToMap(json) match { +      case None => None +      case Some(map) => +        mapToTitle(map) match { +          case None => None +          case Some(title) => makeMapFeatures( +            title=title, +            // TODO: doi=Scorable.getString(map, "doi"), +            doi=null, +            fatcat_release=Scorable.getString(map, "ident"), +            fatcat_work=Scorable.getString(map, "work_id"), +            authors=mapToAuthorList(map), +            year=mapToYear(map).getOrElse(0), +            contentType=map.get("type").map(e => e.asInstanceOf[String]).getOrElse("MISSING-CONTENT-TYPE")) +        } +    } +  } +} diff --git a/scalding/src/main/scala/sandcrawler/ScoreSelfFatcat.scala b/scalding/src/main/scala/sandcrawler/ScoreSelfFatcat.scala new file mode 100644 index 0000000..d1a94fe --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/ScoreSelfFatcat.scala @@ -0,0 +1,43 @@ +package sandcrawler + +import cascading.pipe.Pipe +import com.twitter.scalding.Args +import com.twitter.scalding.Stat +import com.twitter.scalding.TypedPipe +import com.twitter.scalding.TypedTsv +import parallelai.spyglass.base.JobBase + +class ScoreSelfFatcatJob(args: Args) extends JobBase(args) { + +  val fatcatRowCount = Stat("fatcat-rows-filtered", "sandcrawler") +  val joinedRowCount = Stat("joined-rows", "sandcrawler") + +  val fatcatScorable : Scorable = new FatcatScorable() +  val fatcatPipe : TypedPipe[(String, ReduceFeatures)] = fatcatScorable +    .getInputPipe(args) +    .map { r => +      fatcatRowCount.inc +      r +    } + +  val joinedPipe = fatcatPipe +    .addTrap(TypedTsv(args("output") + ".trapped")) +    .join(fatcatPipe) + +  // TypedTsv doesn't work over case classes. +  joinedPipe +    // filter out trivial self-matches (releases are identical) +    .filter { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => +      Scorable.selfMatchable(fatcatFeaturesLeft, fatcatFeaturesRight) +    } +    .map { case (slug, (fatcatFeaturesLeft, fatcatFeaturesRight)) => +      joinedRowCount.inc +      new ReduceOutput( +        slug, +        Scorable.computeSimilarity(fatcatFeaturesLeft, fatcatFeaturesRight), +        fatcatFeaturesLeft.json, +        fatcatFeaturesRight.json) +    } +    .map { entry => (entry.slug, entry.score, entry.json1, entry.json2) } +    .write(TypedTsv[(String, Int, String, String)](args("output"))) +} | 
