-rwxr-xr-x  mapreduce/extraction_cdx_grobid.py                               8
-rw-r--r--  pig/hbase-count-rows.pig                                        15
-rw-r--r--  scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala    2
-rw-r--r--  scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala   2
4 files changed, 25 insertions, 2 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index 7771e45..ed82a5e 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -36,6 +36,10 @@ from grobid2json import teixml2json
 # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
 sentry_client = raven.Client()
 
+# Specific poison-pill rows we should skip
+KEY_BLACKLIST = (
+    'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT',  # "failed to guess ARC header format"
+)
 
 
 class MRExtractCdxGrobid(MRJob):
@@ -203,6 +207,10 @@ class MRExtractCdxGrobid(MRJob):
             yield _, status
             return
         key = info['key']
+        if key in KEY_BLACKLIST:
+            self.increment_counter('lines', 'blacklist')
+            yield _, dict(status='blacklist', key=key)
+            return
 
         # Note: this may not get "cleared" correctly
         sentry_client.extra_context(dict(row_key=key))
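
For context, the guard added above short-circuits known poison-pill rows before any GROBID work is attempted: the row is tallied under a 'blacklist' counter and a status record is emitted instead of an extraction result. A minimal, self-contained sketch of the same pattern outside of mrjob (the process_rows() helper, the counters dict, and the second sample key are hypothetical, for illustration only):

    KEY_BLACKLIST = (
        'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT',  # "failed to guess ARC header format"
    )

    def process_rows(rows, counters):
        """Yield one status dict per row, short-circuiting blacklisted keys."""
        for info in rows:
            key = info['key']
            if key in KEY_BLACKLIST:
                # stand-in for self.increment_counter('lines', 'blacklist')
                counters['blacklist'] = counters.get('blacklist', 0) + 1
                yield dict(status='blacklist', key=key)
                continue
            # ... normal GROBID extraction would go here ...
            yield dict(status='success', key=key)

    counters = {}
    rows = [
        dict(key='sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT'),
        dict(key='sha1:22222222222222222222222222222222'),  # hypothetical key
    ]
    for status in process_rows(rows, counters):
        print(status)
    print(counters)  # {'blacklist': 1}
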
diff --git a/pig/hbase-count-rows.pig b/pig/hbase-count-rows.pig
new file mode 100644
index 0000000..57e83a5
--- /dev/null
+++ b/pig/hbase-count-rows.pig
@@ -0,0 +1,15 @@
+
+REGISTER /usr/lib/hbase/lib/hbase-client-0.98.6-cdh5.3.1.jar
+REGISTER /usr/lib/hbase/lib/hbase-common-0.98.6-cdh5.3.1.jar
+REGISTER /usr/lib/hbase/lib/hbase-hadoop2-compat-0.98.6-cdh5.3.1.jar
+REGISTER /usr/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.1.jar
+
+set hbase.zookeeper.quorum 'mtrcs-zk1.us.archive.org,mtrcs-zk2.us.archive.org,mtrcs-zk3.us.archive.org'
+
+data = LOAD 'hbase://wbgrp-journal-extract-0-qa'
+ USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('grobid0:status_code', '-loadKey true')
+ AS (key:CHARARRAY, status:CHARARRAY);
+
+data_group = GROUP data ALL;
+data_count = FOREACH data_group GENERATE COUNT(data);
+DUMP data_count;
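
As a rough cross-check of the Pig job above, a similar count can be produced from Python with the happybase client library (an assumption: happybase is not used in this repo, and the Thrift gateway host below is hypothetical). A scan like this runs through a single Thrift connection, so it is only practical for small QA tables; the Pig job runs as MapReduce and scales to the full table:

    import happybase

    # Hypothetical Thrift gateway host; the real cluster gateway may differ.
    connection = happybase.Connection('hbase-thrift-gateway.example.org')
    table = connection.table('wbgrp-journal-extract-0-qa')

    # Scan only the status_code column; as with the Pig LOAD above, only
    # rows that actually contain this column are returned.
    count = sum(1 for _ in table.scan(columns=[b'grobid0:status_code']))
    print(count)
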
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index 27b3177..aabf9f8 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -2,4 +2,4 @@ package sandcrawler
 
 import com.twitter.scalding.Args
 
-class HBaseStatusCountJob(args: Args) extends HBaseCountJob(args, "grobid0:status")
+class HBaseStatusCountJob(args: Args) extends HBaseCountJob(args, "grobid0:status_code")
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
index 35a4177..fca9c69 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
@@ -44,7 +44,7 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
       .arg("hbase-table", testTable)
       .arg("zookeeper-hosts", testHost)
       .arg("debug", "true")
-      .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status"),
+      .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
         sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*)))
       .sink[Tuple](Tsv(output)) {
         outputBuffer =>