diff options
Diffstat (limited to 'scalding')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 12 | ||||
-rw-r--r-- | scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala | 16 |
2 files changed, 13 insertions, 15 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala index a0d3e37..5ff667d 100644 --- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala +++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala @@ -41,10 +41,11 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions import CdxBackfillJob._ val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) - // XXX: val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts")) + val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts")) // Parse CDX lines from text file to typed pipe val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path"))) + val cdxLines : TypedPipe[CdxLine] = lines .filter { isCdxLine } .map { lineToCdxLine } @@ -55,7 +56,9 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions val existingKeys : TypedPipe[String] = hbaseSource .read + .fromBytesWritable( new Fields("key") ) .toTypedPipe[String]('key) + //.debug // filters out all the lines that have an existing SHA1 key in HBase // the groupBy statements are to select key values to join on @@ -63,19 +66,14 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions .groupBy( identity ) .rightJoin(cdxRows.groupBy(_._1)) .toTypedPipe - .debug .collect { case (_, (None, row)) => row } - .debug + //.debug // convert to tuple form and write out into HBase - /* newRows .toPipe('key, 'c, 'cdx, 'mime) .toBytesWritable( new Fields("key", "c", "cdx", "mime") ) .write(hbaseSink) - */ - newRows - .write(TypedTsv[(String, String, String, String)]("dummy-output")) } diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala index a8a638d..f0a024a 100644 --- a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala +++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala @@ -76,13 +76,15 @@ class CdxBackfillJobTest extends FunSpec with TupleConversions { ) val sampleCdxLines = List( // clean line - "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", + "0" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", // has existing SHA1 - "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", + "1" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", // HTTP status code - "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", + "2" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", // not CDX (prefixed with hash) - "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz" + "3" -> "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", + // not PDF + "4" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/film 200 AAAAAEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz" ) JobTest("sandcrawler.CdxBackfillJob") @@ -94,13 +96,11 @@ class CdxBackfillJobTest extends FunSpec with TupleConversions { .arg("debug", "true") .source[Tuple](CdxBackfillJob.getHBaseSource(testTable, testHost), sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) - .source[String](TextLine(testCdxFile), sampleCdxLines) - //.sink[Tuple](CdxBackfillJob.getHBaseSink(testTable, testHost)) { - .sink[(String, String, String, String)](TypedTsv[(String, String, String, String)]("dummy-output")) { + .source(TextLine(testCdxFile), sampleCdxLines) + .sink[(ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable)](CdxBackfillJob.getHBaseSink(testTable, testHost)) { outputBuffer => it("should return a 1-element list (after join).") { - // XXX: assert(outputBuffer.size === 1) } |