diff options
Diffstat (limited to 'scalding/src/main')
4 files changed, 2 insertions, 23 deletions
diff --git a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala index cdd598f..0d26d75 100644 --- a/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala +++ b/scalding/src/main/scala/sandcrawler/BibjsonScorable.scala @@ -8,9 +8,6 @@ import cascading.flow.FlowDef import cascading.tuple.Fields import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ -// XXX: import parallelai.spyglass.hbase.HBasePipeConversions - -// XXX: class BibjsonScorable extends Scorable with HBasePipeConversions { class BibjsonScorable extends Scorable { diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala index c55cb40..899ce66 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala @@ -16,20 +16,19 @@ class GrobidScorable extends Scorable with HBasePipeConversions { val StatusOK = 200 def getSource(args : Args) : Source = { - // TODO: Generalize args so there can be multiple grobid pipes in one job. + // TODO: Generalize args so there can be multiple grobid pipes in one job GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) } def getFeaturesPipe(args : Args)(implicit mode : Mode, flowDef : FlowDef) : TypedPipe[MapFeatures] = { getSource(args) .read - // Can't just "fromBytesWritable" because we have multiple types? + // Can't just "fromBytesWritable" because we have multiple types .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code")) .filter { case (_, metadata, status_code) => metadata != null && status_code != null } .map { case (key, metadata, status_code) => (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes())) } - // TODO: Should I combine next two stages for efficiency? .collect { case (key, json, StatusOK) => (key, json) } .filter { case (key, json) => GrobidScorable.keepRecord(json) } .map { entry : (String, String) => GrobidScorable.jsonToMapFeatures(entry._1, entry._2) } diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala index f4e84fe..19b257f 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala @@ -45,7 +45,6 @@ class GrobidScorableDumpJob(args: Args) extends JobBase(args) { validGrobidRows.inc entry } - // XXX: this groupBy after the map? .groupBy { case MapFeatures(slug, json) => slug } .map { tuple => val (slug : String, features : MapFeatures) = tuple diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala index 107f504..ccb9b76 100644 --- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala +++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala @@ -12,10 +12,6 @@ class ScoreJob(args: Args) extends JobBase(args) { val grobidRowCount = Stat("grobid-rows-filtered", "sandcrawler") val crossrefRowCount = Stat("crossref-rows-filtered", "sandcrawler") val joinedRowCount = Stat("joined-rows", "sandcrawler") - /* TODO: - val uniqueDoiCount = Stat("unique-doi-count", "sandcrawler") - val uniqueSha1Count = Stat("unique-sha1-count", "sandcrawler") - */ val grobidScorable : Scorable = new GrobidScorable() val crossrefScorable : Scorable = new CrossrefScorable() @@ -36,18 +32,6 @@ class ScoreJob(args: Args) extends JobBase(args) { .addTrap(TypedTsv(args("output") + ".trapped")) .join(crossrefPipe) - /* TODO: - // Reduces to count unique SHA1 and DOI - joinedPipe - .map { case (_, (grobidFeatures, _)) => grobidFeatures.sha } - .distinct - .map { _ => uniqueSha1Count.inc } - joinedPipe - .map { case (_, (_, crossrefFeatures)) => crossrefFeatures.doi } - .distinct - .map { _ => uniqueDoiCount.inc } - */ - // TypedTsv doesn't work over case classes. joinedPipe .map { case (slug, (grobidFeatures, crossrefFeatures)) => |