 mapreduce/extraction_cdx_grobid.py                             |  8 ++++++++
 pig/hbase-count-rows.pig                                       | 15 +++++++++++++++
 scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala  |  2 +-
 scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala |  2 +-
 4 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index 7771e45..ed82a5e 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -36,6 +36,10 @@ from grobid2json import teixml2json
 # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
 sentry_client = raven.Client()
 
+# Specific poison-pill rows we should skip
+KEY_BLACKLIST = (
+    'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT',  # "failed to guess ARC header format"
+)
 
 class MRExtractCdxGrobid(MRJob):
 
@@ -203,6 +207,10 @@ class MRExtractCdxGrobid(MRJob):
             yield _, status
             return
         key = info['key']
+        if key in KEY_BLACKLIST:
+            self.increment_counter('lines', 'blacklist')
+            yield _, dict(status='blacklist', key=key)
+            return
 
         # Note: this may not get "cleared" correctly
         sentry_client.extra_context(dict(row_key=key))
diff --git a/pig/hbase-count-rows.pig b/pig/hbase-count-rows.pig
new file mode 100644
index 0000000..57e83a5
--- /dev/null
+++ b/pig/hbase-count-rows.pig
@@ -0,0 +1,15 @@
+
+REGISTER /usr/lib/hbase/lib/hbase-client-0.98.6-cdh5.3.1.jar
+REGISTER /usr/lib/hbase/lib/hbase-common-0.98.6-cdh5.3.1.jar
+REGISTER /usr/lib/hbase/lib/hbase-hadoop2-compat-0.98.6-cdh5.3.1.jar
+REGISTER /usr/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.1.jar
+
+set hbase.zookeeper.quorum 'mtrcs-zk1.us.archive.org,mtrcs-zk2.us.archive.org,mtrcs-zk3.us.archive.org'
+
+data = LOAD 'hbase://wbgrp-journal-extract-0-qa'
+       USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('grobid0:status_code', '-loadKey true')
+       AS (key:CHARARRAY, status:CHARARRAY);
+
+data_group = GROUP data ALL;
+data_count = FOREACH data_group GENERATE COUNT(data);
+DUMP data_count;
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index 27b3177..aabf9f8 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -2,4 +2,4 @@ package sandcrawler
 
 import com.twitter.scalding.Args
 
-class HBaseStatusCountJob(args: Args) extends HBaseCountJob(args, "grobid0:status")
+class HBaseStatusCountJob(args: Args) extends HBaseCountJob(args, "grobid0:status_code")
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
index 35a4177..fca9c69 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
@@ -44,7 +44,7 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
       .arg("hbase-table", testTable)
       .arg("zookeeper-hosts", testHost)
       .arg("debug", "true")
-      .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status"),
+      .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
        sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*)))
       .sink[Tuple](Tsv(output)) { outputBuffer =>
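
For reference, one way the new row-count script could be submitted (a sketch, not part of this commit; assumes the `pig` client is installed on a cluster gateway and the registered HBase jar paths above exist on that machine):

    # Sketch: run the count as a MapReduce job (invocation is an assumption, not from this commit)
    pig -x mapreduce pig/hbase-count-rows.pig

The script itself uses the standard Pig idiom for counting rows: GROUP ... ALL collapses the whole relation into a single group so that COUNT can produce one total, with '-loadKey true' pulling in the HBase row key alongside the grobid0:status_code column.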