diff options
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala | 10 |
1 files changed, 5 insertions, 5 deletions
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala index 5e06f9b..05e7074 100644 --- a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala +++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala @@ -26,13 +26,13 @@ class GrobidScorableDumpJob(args: Args) extends JobBase(args) { val pipe = GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) .read // Can't just "fromBytesWritable" because we have multiple types? - .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "tei_json", "status_code")) - .filter { case (_, tei_json, status_code) => + .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code")) + .filter { case (_, metadata, status_code) => grobidHbaseRows.inc - tei_json != null && status_code != null + metadata != null && status_code != null } - .map { case (key, tei_json, status_code) => - (Bytes.toString(key.copyBytes()), Bytes.toString(tei_json.copyBytes()), Bytes.toLong(status_code.copyBytes())) + .map { case (key, metadata, status_code) => + (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes())) } // TODO: Should I combine next two stages for efficiency? .collect { case (key, json, 200) => |