aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala12
-rw-r--r--scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala16
2 files changed, 13 insertions, 15 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index a0d3e37..5ff667d 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -41,10 +41,11 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
import CdxBackfillJob._
val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
- // XXX: val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
+ val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
// Parse CDX lines from text file to typed pipe
val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))
+
val cdxLines : TypedPipe[CdxLine] = lines
.filter { isCdxLine }
.map { lineToCdxLine }
@@ -55,7 +56,9 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
val existingKeys : TypedPipe[String] = hbaseSource
.read
+ .fromBytesWritable( new Fields("key") )
.toTypedPipe[String]('key)
+ //.debug
// filters out all the lines that have an existing SHA1 key in HBase
// the groupBy statements are to select key values to join on
@@ -63,19 +66,14 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
.groupBy( identity )
.rightJoin(cdxRows.groupBy(_._1))
.toTypedPipe
- .debug
.collect { case (_, (None, row)) => row }
- .debug
+ //.debug
// convert to tuple form and write out into HBase
- /*
newRows
.toPipe('key, 'c, 'cdx, 'mime)
.toBytesWritable( new Fields("key", "c", "cdx", "mime") )
.write(hbaseSink)
- */
- newRows
- .write(TypedTsv[(String, String, String, String)]("dummy-output"))
}
diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
index a8a638d..f0a024a 100644
--- a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
@@ -76,13 +76,15 @@ class CdxBackfillJobTest extends FunSpec with TupleConversions {
)
val sampleCdxLines = List(
// clean line
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+ "0" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
// has existing SHA1
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+ "1" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
// HTTP status code
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+ "2" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
// not CDX (prefixed with hash)
- "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
+ "3" -> "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+ // not PDF
+ "4" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/film 200 AAAAAEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
)
JobTest("sandcrawler.CdxBackfillJob")
@@ -94,13 +96,11 @@ class CdxBackfillJobTest extends FunSpec with TupleConversions {
.arg("debug", "true")
.source[Tuple](CdxBackfillJob.getHBaseSource(testTable, testHost),
sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
- .source[String](TextLine(testCdxFile), sampleCdxLines)
- //.sink[Tuple](CdxBackfillJob.getHBaseSink(testTable, testHost)) {
- .sink[(String, String, String, String)](TypedTsv[(String, String, String, String)]("dummy-output")) {
+ .source(TextLine(testCdxFile), sampleCdxLines)
+ .sink[(ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable)](CdxBackfillJob.getHBaseSink(testTable, testHost)) {
outputBuffer =>
it("should return a 1-element list (after join).") {
- // XXX:
assert(outputBuffer.size === 1)
}