 mapreduce/extraction_cdx_grobid.py                             |  8 ++++++++
 pig/hbase-count-rows.pig                                       | 15 +++++++++++++++
 scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala  |  2 +-
 scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala |  2 +-
 4 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index 7771e45..ed82a5e 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -36,6 +36,10 @@ from grobid2json import teixml2json
 
 # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
 sentry_client = raven.Client()
+# Specific poison-pill rows we should skip
+KEY_BLACKLIST = (
+    'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT',    # "failed to guess ARC header format"
+)
 
 
 class MRExtractCdxGrobid(MRJob):
@@ -203,6 +207,10 @@ class MRExtractCdxGrobid(MRJob):
             yield _, status
             return
         key = info['key']
+        if key in KEY_BLACKLIST:
+            self.increment_counter('lines', 'blacklist')
+            yield _, dict(status='blacklist', key=key)
+            return
 
         # Note: this may not get "cleared" correctly
         sentry_client.extra_context(dict(row_key=key))
diff --git a/pig/hbase-count-rows.pig b/pig/hbase-count-rows.pig
new file mode 100644
index 0000000..57e83a5
--- /dev/null
+++ b/pig/hbase-count-rows.pig
@@ -0,0 +1,15 @@
+
+REGISTER /usr/lib/hbase/lib/hbase-client-0.98.6-cdh5.3.1.jar
+REGISTER /usr/lib/hbase/lib/hbase-common-0.98.6-cdh5.3.1.jar
+REGISTER /usr/lib/hbase/lib/hbase-hadoop2-compat-0.98.6-cdh5.3.1.jar
+REGISTER /usr/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.1.jar
+
+set hbase.zookeeper.quorum 'mtrcs-zk1.us.archive.org,mtrcs-zk2.us.archive.org,mtrcs-zk3.us.archive.org'
+
+data = LOAD 'hbase://wbgrp-journal-extract-0-qa'
+       USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('grobid0:status_code', '-loadKey true')
+       AS (key:CHARARRAY, status:CHARARRAY);
+
+data_group = GROUP data ALL;
+data_count = FOREACH data_group GENERATE COUNT(data);
+DUMP data_count;
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index 27b3177..aabf9f8 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -2,4 +2,4 @@ package sandcrawler
 
 import com.twitter.scalding.Args
 
-class HBaseStatusCountJob(args: Args) extends HBaseCountJob(args, "grobid0:status")
+class HBaseStatusCountJob(args: Args) extends HBaseCountJob(args, "grobid0:status_code")
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
index 35a4177..fca9c69 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
@@ -44,7 +44,7 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
     .arg("hbase-table", testTable)
     .arg("zookeeper-hosts", testHost)
     .arg("debug", "true")
-    .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status"),
+    .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
       sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*)))
       .sink[Tuple](Tsv(output)) {
         outputBuffer =>
