From 8ae41461f901a8142af06610a4b9f500ce4fe47f Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Thu, 19 Jul 2018 13:10:02 -0700
Subject: variant of CdxBackfillJob that writes to TSV

Has the same test failure ("java.lang.IndexOutOfBoundsException: Index: 1, Size: 1")
---
 .../main/scala/sandcrawler/CdxBackfillJob.scala    | 173 +++++++++++++++++++++
 .../test/scala/sandcrawler/CdxBackfillJob.scala    | 113 ++++++++++++++
 2 files changed, 286 insertions(+)
 create mode 100644 scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
 create mode 100644 scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala

diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
new file mode 100644
index 0000000..a0d3e37
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -0,0 +1,173 @@
+package sandcrawler
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import cascading.pipe.joiner._
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import java.util.Properties
+import cascading.tap.SinkMode
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
+
+// Type that represents a raw parsed CDX line
+case class CdxLine(surt: String,
+                   datetime: String,
+                   url: String,
+                   mime: String,
+                   http_status: String,
+                   sha1: String,
+                   c_size: String,
+                   offset: String,
+                   warc: String)
+
+
+/**
+ * CDX backfill:
+ * 1. parse CDX (all columns)
+ * 2. filter CDX (pdf, HTTP 200, etc)
+ * 3. source HBase (key column only)
+ * 4. left join CDX to HBase
+ * 5. filter to only those with null HBase key column
+ * 6. convert CDX fields to HBase columns
+ * 7. sink results to HBase
+ *
+ * TODO: I really mixed the Scalding "field-based" and "type-based" APIs here.
+ * Should decide on a best practice.
+ */
+class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+  import CdxBackfillJob._
+
+  val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+  // XXX: val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
+
+  // Parse CDX lines from text file to typed pipe
+  val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))
+  val cdxLines : TypedPipe[CdxLine] = lines
+    .filter { isCdxLine }
+    .map { lineToCdxLine }
+    .filter { CdxBackfillJob.keepCdx(_) }
+
+  val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines
+    .map { CdxBackfillJob.cdxLineToRow }
+
+  val existingKeys : TypedPipe[String] = hbaseSource
+    .read
+    .toTypedPipe[String]('key)
+
+  // filters out all the lines that have an existing SHA1 key in HBase
+  // the groupBy statements are to select key values to join on
+  val newRows : TypedPipe[(String, String, String, String)] = existingKeys
+    .groupBy( identity )
+    .rightJoin(cdxRows.groupBy(_._1))
+    .toTypedPipe
+    .debug
+    .collect { case (_, (None, row)) => row }
+    .debug
+
+  // convert to tuple form and write out into HBase
+  /*
+  newRows
+    .toPipe('key, 'c, 'cdx, 'mime)
+    .toBytesWritable( new Fields("key", "c", "cdx", "mime") )
+    .write(hbaseSink)
+  */
+  newRows
+    .write(TypedTsv[(String, String, String, String)]("dummy-output"))
+
+}
+
+object CdxBackfillJob {
+
+  def getHBaseSource(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
+    return HBaseBuilder.build(
+      hbase_table,
+      zookeeper_hosts,
+      List("file:size"), // not actually needed
+      SourceMode.SCAN_ALL)
+  }
+
+  def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
+    return HBaseBuilder.buildSink(
+      hbase_table,
+      zookeeper_hosts,
+      List("f:c", "file:cdx", "file:mime"),
+      SinkMode.UPDATE)
+  }
+
+  def normalizeMime(raw: String) : String = {
+
+    val NORMAL_MIME = List("application/pdf",
+                           "application/postscript",
+                           "text/html",
+                           "text/xml")
+
+    val lower = raw.toLowerCase()
+    NORMAL_MIME.foreach(norm =>
+      if (lower.startsWith(norm)) {
+        return norm
+      }
+    )
+
+    // Common special cases
+    if (lower.startsWith("application/xml")) {
+      return "text/xml"
+    }
+    if (lower.startsWith("application/x-pdf")) {
+      return "application/pdf"
+    }
+    return lower
+
+  }
+
+  def isCdxLine(line: String) : Boolean = {
+    // malformated or non-CDX11 lines
+    !(line.startsWith("#") || line.startsWith(" ") || line.startsWith("filedesc") ||
+      line.split(" ").size != 11)
+  }
+
+  def keepCdx(line: CdxLine) : Boolean = {
+    // TODO: sha1.isalnum() and c_size.isdigit() and offset.isdigit() and dt.isdigit()
+    if (line.http_status != "200" || line.sha1.size != 32) {
+      return false
+    }
+    // TODO: '-' in (line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc)
+    return true
+  }
+
+  // Returns (key, f:c, file:cdx, file:mime), all as strings, which is close to
+  // how they will be inserted into HBase
+  def cdxLineToRow(line: CdxLine) : (String, String, String, String) = {
+
+    val key = "sha1:" + line.sha1
+
+    val warcFile = line.warc.split('/')(1)
+
+    // Read CDX-style datetime and convert to ISO 8601 with second resolution
+    val dtFormat = new java.text.SimpleDateFormat("yyyyMMddHHmmss")
+    val isoFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
+    // TODO: timezones? UTC to UTC, so I don't think so.
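+    // e.g. a raw CDX datetime of "20170705062202" becomes "2017-07-05T06:22:02Z"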
+    val dtIso = isoFormat.format(dtFormat.parse(line.datetime))
+
+    // warc_file = warc.split('/')[-1]
+    // dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+    // f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
+
+    // This is the "f:c" field. 'i' intentionally not set
+    val heritrixInfo = ""
+
+    // file:cdx = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
+    //   offset=int(offset), warc=warc)
+    val fileCdx = ""
+    (key, heritrixInfo, fileCdx, line.mime)
+  }
+
+  def lineToCdxLine(line: String) : CdxLine = {
+    val raw = line.split("\\s+")
+    // surt, datetime, url, mime, http_status, sha1, SKIP, SKIP, c_size, offset, warc
+    CdxLine(raw(0), raw(1), raw(2), raw(3), raw(4), raw(5), raw(8), raw(9), raw(10))
+  }
+
+}
diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
new file mode 100644
index 0000000..a8a638d
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
@@ -0,0 +1,113 @@
+
+package sandcrawler
+
+import org.scalatest._
+import cascading.tuple.{Tuple, Fields}
+import com.twitter.scalding.{JobTest, Tsv, TypedTsv, TupleConversions, TextLine}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.junit.runner.RunWith
+import org.scalatest.FunSpec
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+import parallelai.spyglass.hbase.HBaseSource
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class CdxBackfillTest extends FlatSpec with Matchers {
+
+  import CdxBackfillJob._
+
+  it should "normalize mimetypes" in {
+    assert(CdxBackfillJob.normalizeMime("asdf") === "asdf")
+    assert(CdxBackfillJob.normalizeMime("application/pdf") === "application/pdf")
+    assert(CdxBackfillJob.normalizeMime("application/pdf+journal") === "application/pdf")
+    assert(CdxBackfillJob.normalizeMime("Application/PDF") === "application/pdf")
+    assert(CdxBackfillJob.normalizeMime("application/p") === "application/p")
+    assert(CdxBackfillJob.normalizeMime("application/xml+stuff") === "text/xml")
+    assert(CdxBackfillJob.normalizeMime("application/x-pdf") === "application/pdf")
+    assert(CdxBackfillJob.normalizeMime("application/x-html") === "application/x-html")
+  }
+
+  it should "filter CDX lines" in {
+    assert(true === keepCdx(lineToCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
+    // redirect
+    assert(false === keepCdx(lineToCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
+  }
+
+  it should "know what CDX lines are" in {
+    assert(true === isCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+    assert(false === isCdxLine(""))
+    assert(false === isCdxLine(
+      " edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+    assert(false === isCdxLine(
+      "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+    // missing two fields
+    assert(false === isCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+    // extra field
+    assert(false === isCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz -"))
+  }
+
+  it should "execute lineToRow" in {
+    cdxLineToRow(lineToCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+  }
+
+}
+
+@RunWith(classOf[JUnitRunner])
+class CdxBackfillJobTest extends FunSpec with TupleConversions {
+
+  val (testTable, testHost, testCdxFile) = ("test-table", "dummy-host:2181", "test_file.cdx")
+
+  val log = LoggerFactory.getLogger(this.getClass.getName)
+
+  val dummySizeBytes = Bytes.toBytes(100)
+
+  val sampleData = List(
+    List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), dummySizeBytes),
+    List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), dummySizeBytes),
+    List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), dummySizeBytes),
+    List(Bytes.toBytes("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT"), dummySizeBytes)
+  )
+  val sampleCdxLines = List(
+    // clean line
+    "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    // has existing SHA1
+    "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    // HTTP status code
+    "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    // not CDX (prefixed with hash)
+    "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
+  )
+
+  JobTest("sandcrawler.CdxBackfillJob")
+    .arg("test", "")
+    .arg("app.conf.path", "app.conf")
+    .arg("hbase-table", testTable)
+    .arg("zookeeper-hosts", testHost)
+    .arg("cdx-input-path", testCdxFile)
+    .arg("debug", "true")
+    .source[Tuple](CdxBackfillJob.getHBaseSource(testTable, testHost),
+      sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+    .source[String](TextLine(testCdxFile), sampleCdxLines)
+    //.sink[Tuple](CdxBackfillJob.getHBaseSink(testTable, testHost)) {
+    .sink[(String, String, String, String)](TypedTsv[(String, String, String, String)]("dummy-output")) {
+      outputBuffer =>
+
+      it("should return a 1-element list (after join).") {
+        // XXX:
+        assert(outputBuffer.size === 1)
+      }
+
+      it("should insert the valid, new CDX line") {
+        assert(outputBuffer(0)._1 == Bytes.toBytes("sha1:WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G"))
+      }
+    }
+    .run
+    .finish
+}
-- 
cgit v1.2.3

From 5883c665b9c6d1d27fe4b0cfd98098f671a5f5dd Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Thu, 19 Jul 2018 15:23:29 -0700
Subject: CdxBackfillJob back to HBase; tests work

---
 scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 12 +++++-------
 scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala | 16 ++++++++--------
 2 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index a0d3e37..5ff667d 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -41,10 +41,11 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
   import CdxBackfillJob._
 
   val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
-  // XXX: val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
+  val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
 
   // Parse CDX lines from text file to typed pipe
   val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))
+
   val cdxLines : TypedPipe[CdxLine] = lines
     .filter { isCdxLine }
     .map { lineToCdxLine }
@@ -55,7 +56,9 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
 
   val existingKeys : TypedPipe[String] = hbaseSource
     .read
+    .fromBytesWritable( new Fields("key") )
     .toTypedPipe[String]('key)
+    //.debug
 
   // filters out all the lines that have an existing SHA1 key in HBase
   // the groupBy statements are to select key values to join on
@@ -63,19 +66,14 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
     .groupBy( identity )
     .rightJoin(cdxRows.groupBy(_._1))
     .toTypedPipe
-    .debug
     .collect { case (_, (None, row)) => row }
-    .debug
+    //.debug
 
   // convert to tuple form and write out into HBase
-  /*
   newRows
     .toPipe('key, 'c, 'cdx, 'mime)
     .toBytesWritable( new Fields("key", "c", "cdx", "mime") )
     .write(hbaseSink)
-  */
-  newRows
-    .write(TypedTsv[(String, String, String, String)]("dummy-output"))
 
 }
 
diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
index a8a638d..f0a024a 100644
--- a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
@@ -76,13 +76,15 @@ class CdxBackfillJobTest extends FunSpec with TupleConversions {
   )
   val sampleCdxLines = List(
     // clean line
-    "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    "0" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
     // has existing SHA1
-    "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    "1" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
     // HTTP status code
-    "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    "2" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
     // not CDX (prefixed with hash)
-    "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
+    "3" -> "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    // not PDF
+    "4" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/film 200 AAAAAEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
   )
 
   JobTest("sandcrawler.CdxBackfillJob")
@@ -94,13 +96,11 @@ class CdxBackfillJobTest extends FunSpec with TupleConversions {
     .arg("debug", "true")
     .source[Tuple](CdxBackfillJob.getHBaseSource(testTable, testHost),
       sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
-    .source[String](TextLine(testCdxFile), sampleCdxLines)
-    //.sink[Tuple](CdxBackfillJob.getHBaseSink(testTable, testHost)) {
-    .sink[(String, String, String, String)](TypedTsv[(String, String, String, String)]("dummy-output")) {
+    .source(TextLine(testCdxFile), sampleCdxLines)
+    .sink[(ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable)](CdxBackfillJob.getHBaseSink(testTable, testHost)) {
       outputBuffer =>
 
       it("should return a 1-element list (after join).") {
-        // XXX:
         assert(outputBuffer.size === 1)
       }
 
-- 
cgit v1.2.3

From c36e59bf03e692d22d6d72aa5ae37977e3a13524 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Thu, 19 Jul 2018 16:26:00 -0700
Subject: CdxBackfillJob: implement other fields

---
 .../main/scala/sandcrawler/CdxBackfillJob.scala    | 34 +++++++----
 .../test/scala/sandcrawler/CdxBackfillJob.scala    | 69 +++++++++++++++++++---
 2 files changed, 84 insertions(+), 19 deletions(-)

diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index 5ff667d..0af3c9c 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -10,6 +10,7 @@ import cascading.tap.SinkMode
 import parallelai.spyglass.base.JobBase
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
 import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
+import scala.util.parsing.json.JSONObject
 
 // Type that represents a raw parsed CDX line
 case class CdxLine(surt: String,
@@ -53,6 +54,7 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
 
   val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines
     .map { CdxBackfillJob.cdxLineToRow }
+    .debug
 
   val existingKeys : TypedPipe[String] = hbaseSource
     .read
@@ -67,7 +69,7 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
     .rightJoin(cdxRows.groupBy(_._1))
     .toTypedPipe
     .collect { case (_, (None, row)) => row }
-    //.debug
+    .debug
 
   // convert to tuple form and write out into HBase
   newRows
@@ -149,17 +151,29 @@ object CdxBackfillJob {
     // TODO: timezones? UTC to UTC, so I don't think so.
     val dtIso = isoFormat.format(dtFormat.parse(line.datetime))
 
-    // warc_file = warc.split('/')[-1]
-    // dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
-    // f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
-
     // This is the "f:c" field. 'i' intentionally not set
-    val heritrixInfo = ""
-
-    // file:cdx = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
+    // python: f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
+    // python: warc_file = warc.split('/')[-1]
+    // python: dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+    val heritrixInfo = JSONObject(Map(
+      "u" -> line.url,
+      "d" -> dtIso,
+      "f" -> warcFile,
+      "o" -> line.offset.toInt,
+      "c" -> line.c_size.toInt
+    ))
+
+    // python: dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
     //   offset=int(offset), warc=warc)
-    val fileCdx = ""
-    (key, heritrixInfo, fileCdx, line.mime)
+    val fileCdx = JSONObject(Map(
+      "surt" -> line.surt,
+      "dt" -> line.datetime,
+      "url" -> line.url,
+      "c_size" -> line.c_size.toInt,
+      "offset" -> line.offset.toInt,
+      "warc" -> line.warc
+    ))
+    (key, heritrixInfo.toString(), fileCdx.toString(), line.mime)
   }
 
   def lineToCdxLine(line: String) : CdxLine = {
diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
index f0a024a..fb5b162 100644
--- a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
@@ -12,6 +12,7 @@ import org.scalatest.junit.JUnitRunner
 import org.slf4j.LoggerFactory
 import parallelai.spyglass.hbase.HBaseSource
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import scala.util.parsing.json.JSON
 
 class CdxBackfillTest extends FlatSpec with Matchers {
 
@@ -30,7 +31,7 @@ class CdxBackfillTest extends FlatSpec with Matchers {
 
   it should "filter CDX lines" in {
     assert(true === keepCdx(lineToCdxLine(
-      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
+      """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""")))
     // redirect
     assert(false === keepCdx(lineToCdxLine(
       "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
@@ -53,8 +54,33 @@ class CdxBackfillTest extends FlatSpec with Matchers {
   }
 
   it should "execute lineToRow" in {
-    cdxLineToRow(lineToCdxLine(
-      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+    // this particular test copied from python test_backfill_hbase_from_cdx.py
+    val row = cdxLineToRow(lineToCdxLine(
+      "eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+
+    assert(row._1 == "sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J")
+    JSON.parseFull(row._2) match {
+      case Some(obj: Map[String, Any]) => {
+        assert(obj("u") == "http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1")
+        assert(obj("f") == "CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+        assert(obj("c") == 854156)
+        assert(obj("o") == 328850624)
+        assert(obj("d") == "2017-08-28T23:31:54Z")
+      }
+      case other => assert(false)
+    }
+    JSON.parseFull(row._3) match {
+      case Some(obj: Map[String, Any]) => {
+        assert(obj("surt") == "eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1")
+        assert(obj("dt") == "20170705062202")
+        assert(obj("url") == "http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1")
+        assert(obj("c_size") == 854156)
+        assert(obj("offset") == 328850624)
+        assert(obj("warc") == "CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+      }
+      case other => assert(false)
+    }
+    assert(row._3 == "application/pdf")
   }
 
 }
@@ -76,15 +102,15 @@ class CdxBackfillJobTest extends FunSpec with TupleConversions {
   )
   val sampleCdxLines = List(
     // clean line
-    "0" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    "0" -> """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""",
     // has existing SHA1
-    "1" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    "1" -> """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""",
     // HTTP status code
-    "2" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    "2" -> """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""",
     // not CDX (prefixed with hash)
-    "3" -> "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    "3" -> """#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""",
     // not PDF
-    "4" -> "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/film 200 AAAAAEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
+    "4" -> """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/film 200 AAAAAEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"""
   )
 
   JobTest("sandcrawler.CdxBackfillJob")
@@ -100,12 +126,37 @@ class CdxBackfillJobTest extends FunSpec with TupleConversions {
     .sink[(ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable)](CdxBackfillJob.getHBaseSink(testTable, testHost)) {
       outputBuffer =>
 
+      val buf0 = outputBuffer(0)
+      val row0 = List(buf0._1, buf0._2, buf0._3, buf0._4).map(b => Bytes.toString(b.copyBytes()))
+
       it("should return a 1-element list (after join).") {
         assert(outputBuffer.size === 1)
       }
 
       it("should insert the valid, new CDX line") {
-        assert(outputBuffer(0)._1 == Bytes.toBytes("sha1:WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G"))
+        assert(row0(0) == "sha1:WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G")
+        JSON.parseFull(row0(1)) match {
+          case Some(obj: Map[String, Any]) => {
+            assert(obj("u") == "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf")
+            assert(obj("f") == "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")
+            assert(obj("c") == 210251)
+            assert(obj("o") == 931661233)
+            assert(obj("d") == "2017-08-28T23:31:54Z")
+          }
+          case other => assert(false)
+        }
+        JSON.parseFull(row0(2)) match {
+          case Some(obj: Map[String, Any]) => {
+            assert(obj("surt") == "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf")
+            assert(obj("dt") == "20170828233154")
+            assert(obj("url") == "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf")
+            assert(obj("c_size") == 210251)
+            assert(obj("offset") == 931661233)
+            assert(obj("warc") == "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")
+          }
+          case other => assert(false)
+        }
+        assert(row0(3) == "application/pdf")
       }
     }
     .run
-- 
cgit v1.2.3

From f6c88b66cea8919fe8a0a438e60841ad682aa71d Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Thu, 19 Jul 2018 16:40:16 -0700
Subject: some scalastyle on CdxBackfillJob

---
 scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index 0af3c9c..4a2eaba 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -1,15 +1,17 @@
 package sandcrawler
 
+import java.util.Properties
+
 import cascading.property.AppProps
+import cascading.tap.SinkMode
 import cascading.tuple.Fields
 import cascading.pipe.joiner._
 import com.twitter.scalding._
 import com.twitter.scalding.typed.TDsl._
-import java.util.Properties
-import cascading.tap.SinkMode
 import parallelai.spyglass.base.JobBase
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
 import scala.util.parsing.json.JSONObject
 
@@ -82,7 +84,7 @@ object CdxBackfillJob {
 
   def getHBaseSource(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
-    return HBaseBuilder.build(
+    HBaseBuilder.build(
       hbase_table,
       zookeeper_hosts,
       List("file:size"), // not actually needed
@@ -90,7 +92,7 @@ object CdxBackfillJob {
   }
 
   def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
-    return HBaseBuilder.buildSink(
+    HBaseBuilder.buildSink(
       hbase_table,
       zookeeper_hosts,
       List("f:c", "file:cdx", "file:mime"),
@@ -118,8 +120,7 @@ object CdxBackfillJob {
     if (lower.startsWith("application/x-pdf")) {
       return "application/pdf"
     }
-    return lower
-
+    lower
   }
 
-- 
cgit v1.2.3

From 92650c4663bba7d8d9914e2bc120a4b923a7a94b Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Thu, 19 Jul 2018 16:56:51 -0700
Subject: fix CdxBackfillJob tests

---
 scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 15 +++++++++++----
 scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala |  4 ++--
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index 4a2eaba..36e017e 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -1,6 +1,8 @@
 package sandcrawler
 
 import java.util.Properties
+import scala.util.parsing.json.JSONObject
+import scala.util.Try
 
 import cascading.property.AppProps
 import cascading.tap.SinkMode
@@ -12,7 +14,6 @@ import parallelai.spyglass.base.JobBase
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
 import parallelai.spyglass.hbase.HBasePipeConversions
 import parallelai.spyglass.hbase.HBaseSource
-import scala.util.parsing.json.JSONObject
 
 // Type that represents a raw parsed CDX line
 case class CdxLine(surt: String,
@@ -130,11 +131,17 @@ object CdxBackfillJob {
   }
 
   def keepCdx(line: CdxLine) : Boolean = {
-    // TODO: sha1.isalnum() and c_size.isdigit() and offset.isdigit() and dt.isdigit()
+    if (List(line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc).contains("-")) {
+      println("DASHLINE")
+      return false
+    }
+    // TODO: sha1.isalnum()
     if (line.http_status != "200" || line.sha1.size != 32) {
       return false
     }
-    // TODO: '-' in (line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc)
+    if (List(line.c_size, line.offset, line.datetime).map(s => Try(s.toLong).toOption).contains(None)) {
+      return false
+    }
     return true
   }
 
@@ -174,7 +181,7 @@ object CdxBackfillJob {
       "offset" -> line.offset.toInt,
       "warc" -> line.warc
     ))
-    (key, heritrixInfo.toString(), fileCdx.toString(), line.mime)
+    (key, heritrixInfo.toString(), fileCdx.toString(), normalizeMime(line.mime))
   }
 
diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
index fb5b162..a6107fc 100644
--- a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
@@ -65,7 +65,7 @@ class CdxBackfillTest extends FlatSpec with Matchers {
         assert(obj("f") == "CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
         assert(obj("c") == 854156)
         assert(obj("o") == 328850624)
-        assert(obj("d") == "2017-08-28T23:31:54Z")
+        assert(obj("d") == "2017-07-05T06:22:02Z")
       }
       case other => assert(false)
     }
@@ -80,7 +80,7 @@ class CdxBackfillTest extends FlatSpec with Matchers {
       }
       case other => assert(false)
     }
-    assert(row._3 == "application/pdf")
+    assert(row._4 == "application/pdf")
   }
 
 }
-- 
cgit v1.2.3

From 9a00e7beb5f1024ea77fc42196733a0447cf6d12 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Mon, 23 Jul 2018 13:56:51 -0700
Subject: reference TDsl note in docs

---
 scalding/scalding-debugging.md | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/scalding/scalding-debugging.md b/scalding/scalding-debugging.md
index 9143698..bd9dd36 100644
--- a/scalding/scalding-debugging.md
+++ b/scalding/scalding-debugging.md
@@ -45,6 +45,22 @@ resolved by ensuring that the `HBaseSource` constructors had exactly identical
 names and arguments (eg, table names and zookeeper quorums have to be exact
 matches).
 
+If you get:
+
+    value toTypedPipe is not a member of cascading.pipe.Pipe
+
+You probably need to [import some types][tdsl] from:
+
+    import com.twitter.scalding.typed.TDsl._
+
+[tdsl]: https://github.com/twitter/scalding/wiki/Type-safe-api-reference#interoperating-between-fields-api-and-type-safe-api
+
+## Running Individual Tests
+
+You can run a single test matching a string glob pattern like:
+
+    sbt:sandcrawler> testOnly *CdxBackfill*
+
 ## Fields
 
 Values of type `List[Fields]` are not printed in the expected way:
-- 
cgit v1.2.3

From 39353c37193ccd96c2e1f6cade50dcf9a3a34678 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Mon, 23 Jul 2018 14:03:50 -0700
Subject: address some (but not all) review comments

---
 .../main/scala/sandcrawler/CdxBackfillJob.scala    | 41 +++++++++++-----------
 1 file changed, 21 insertions(+), 20 deletions(-)

diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index 36e017e..84c19b8 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -1,5 +1,7 @@
 package sandcrawler
 
+// TODO: fix import order to satisfy scala style
+
 import java.util.Properties
 import scala.util.parsing.json.JSONObject
 import scala.util.Try
@@ -20,7 +22,7 @@ case class CdxLine(surt: String,
                    datetime: String,
                    url: String,
                    mime: String,
-                   http_status: String,
+                   httpStatus: String,
                    sha1: String,
                    c_size: String,
                    offset: String,
@@ -55,6 +57,7 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
     .map { lineToCdxLine }
     .filter { CdxBackfillJob.keepCdx(_) }
 
+  // (key, f:c, file:cdx, file:mime)
   val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines
     .map { CdxBackfillJob.cdxLineToRow }
     .debug
@@ -66,7 +69,8 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
     //.debug
 
   // filters out all the lines that have an existing SHA1 key in HBase
-  // the groupBy statements are to select key values to join on
+  // the groupBy statements are to select key values to join on.
+  // (key, f:c, file:cdx, file:mime)
   val newRows : TypedPipe[(String, String, String, String)] = existingKeys
     .groupBy( identity )
     .rightJoin(cdxRows.groupBy(_._1))
@@ -102,41 +106,38 @@ object CdxBackfillJob {
 
   def normalizeMime(raw: String) : String = {
 
-    val NORMAL_MIME = List("application/pdf",
-                           "application/postscript",
-                           "text/html",
-                           "text/xml")
+    val normalMime = Map(
+      "application/pdf" -> "application/pdf",
+      "application/x-pdf" -> "application/pdf",
+      "application/postscript" -> "application/postscript",
+      "text/html" -> "text/html",
+      "text/xml" -> "text/xml",
+      "application/xml" -> "text/xml"
+    )
 
+    // TODO: improvement of control flow
     val lower = raw.toLowerCase()
-    NORMAL_MIME.foreach(norm =>
-      if (lower.startsWith(norm)) {
-        return norm
+    normalMime.foreach { case (key, value) =>
+      if (lower.startsWith(key)) {
+        return value
       }
-    )
-
-    // Common special cases
-    if (lower.startsWith("application/xml")) {
-      return "text/xml"
-    }
-    if (lower.startsWith("application/x-pdf")) {
-      return "application/pdf"
-    }
+    }
     lower
   }
 
   def isCdxLine(line: String) : Boolean = {
-    // malformated or non-CDX11 lines
+    // malformatted or non-CDX11 lines
     !(line.startsWith("#") || line.startsWith(" ") || line.startsWith("filedesc") ||
       line.split(" ").size != 11)
   }
 
   def keepCdx(line: CdxLine) : Boolean = {
     if (List(line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc).contains("-")) {
-      println("DASHLINE")
+      // TODO: hadoop counter (was: "DASHLINE")
       return false
     }
     // TODO: sha1.isalnum()
-    if (line.http_status != "200" || line.sha1.size != 32) {
+    if (line.httpStatus != "200" || line.sha1.size != 32) {
       return false
     }
     if (List(line.c_size, line.offset, line.datetime).map(s => Try(s.toLong).toOption).contains(None)) {
-- 
cgit v1.2.3

From b235a60a7a9e701997d3e9f3673538cf294d8b5f Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 24 Jul 2018 11:33:57 -0700
Subject: CdxBackfillJob: scalastyle

---
 .../main/scala/sandcrawler/CdxBackfillJob.scala    | 36 +++++++++-------------
 1 file changed, 14 insertions(+), 22 deletions(-)

diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index 84c19b8..03db3cf 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -3,13 +3,14 @@ package sandcrawler
 // TODO: fix import order to satisfy scala style
 
 import java.util.Properties
-import scala.util.parsing.json.JSONObject
+
 import scala.util.Try
+import scala.util.parsing.json.JSONObject
 
+import cascading.pipe.joiner._
 import cascading.property.AppProps
 import cascading.tap.SinkMode
 import cascading.tuple.Fields
-import cascading.pipe.joiner._
 import com.twitter.scalding._
 import com.twitter.scalding.typed.TDsl._
 import parallelai.spyglass.base.JobBase
@@ -18,16 +19,7 @@ import parallelai.spyglass.hbase.HBasePipeConversions
 import parallelai.spyglass.hbase.HBaseSource
 
 // Type that represents a raw parsed CDX line
-case class CdxLine(surt: String,
-                   datetime: String,
-                   url: String,
-                   mime: String,
-                   httpStatus: String,
-                   sha1: String,
-                   c_size: String,
-                   offset: String,
-                   warc: String)
-
+case class CdxLine(surt: String, datetime: String, url: String, mime: String, httpStatus: String, sha1: String, c_size: String, offset: String, warc: String)
 
 /**
  * CDX backfill:
@@ -107,8 +99,9 @@ object CdxBackfillJob {
     val lower = raw.toLowerCase()
     normalMime.foreach { case (key, value) =>
       if (lower.startsWith(key)) {
-        return value
+        lower = value
+        break
       }
     }
     lower
@@ -123,15 +116,14 @@ object CdxBackfillJob {
   def keepCdx(line: CdxLine) : Boolean = {
     if (List(line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc).contains("-")) {
       // TODO: hadoop counter (was: "DASHLINE")
-      return false
-    }
-    // TODO: sha1.isalnum()
-    if (line.httpStatus != "200" || line.sha1.size != 32) {
-      return false
-    }
-    if (List(line.c_size, line.offset, line.datetime).map(s => Try(s.toLong).toOption).contains(None)) {
-      return false
+      false
+    } else if (line.httpStatus != "200" || line.sha1.size != 32) {
+      // TODO: sha1.isalnum()
+      false
+    } else if (List(line.c_size, line.offset, line.datetime).map(s => Try(s.toLong).toOption).contains(None)) {
+      false
+    } else {
+      true
     }
-    return true
   }
-- 
cgit v1.2.3

From 7802970c3d42cd3872ff0a0e8d0ffbbbae56ff80 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 24 Jul 2018 11:35:14 -0700
Subject: CdxBackfillJob: comment cleanup

---
 scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index 03db3cf..eb168ac 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -1,7 +1,5 @@
 package sandcrawler
 
-// TODO: fix import order to satisfy scala style
-
 import java.util.Properties
 
 import scala.util.Try
@@ -30,9 +28,6 @@ case class CdxLine(surt: String, datetime: String, url: String, mime: String, ht
  * 5. filter to only those with null HBase key column
 * 6. convert CDX fields to HBase columns
 * 7. sink results to HBase
- *
- * TODO: I really mixed the Scalding "field-based" and "type-based" APIs here.
- * Should decide on a best practice.
  */
 class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions {
-- 
cgit v1.2.3

From 4080ea26892c6155eb9239b94102b32d7237678e Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 24 Jul 2018 12:16:00 -0700
Subject: more PDF mimetypes; fix return refactor

---
 scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index eb168ac..f98b6e9 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -96,6 +96,10 @@ object CdxBackfillJob {
     val normalMime = Map(
       "application/pdf" -> "application/pdf",
       "application/x-pdf" -> "application/pdf",
+      "('application/pdf'" -> "application/pdf",
+      "image/pdf" -> "application/pdf",
+      "text/pdf" -> "application/pdf",
+      "\"application/pdf\"" -> "application/pdf",
       "application/postscript" -> "application/postscript",
       "text/html" -> "text/html",
       "text/xml" -> "text/xml",
@@ -105,8 +109,7 @@ object CdxBackfillJob {
     val lower = raw.toLowerCase()
     normalMime.foreach { case (key, value) =>
       if (lower.startsWith(key)) {
-        lower = value
-        break
+        return value
       }
     }
     lower
-- 
cgit v1.2.3

From 18bdcfd362a190dd36d3b86996808366b83bbbda Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 24 Jul 2018 12:16:21 -0700
Subject: do sha1 pattern match correctly

---
 scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 10 +++++++---
 scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala | 11 +++++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index f98b6e9..389a96a 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -3,6 +3,7 @@ package sandcrawler
 import java.util.Properties
 
 import scala.util.Try
+import scala.util.matching.Regex
 import scala.util.parsing.json.JSONObject
 
@@ -122,11 +123,14 @@ object CdxBackfillJob {
   }
 
   def keepCdx(line: CdxLine) : Boolean = {
+    val sha1Pattern = """[A-Z2-7]{32}""".r
     if (List(line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc).contains("-")) {
       false
-    } else if (line.httpStatus != "200" || line.sha1.size != 32) {
-      // TODO: sha1.isalnum()
+    } else if (line.httpStatus != "200") {
+      false
+    } else if (line.mime != "application/pdf") {
+      false
+    } else if (sha1Pattern.unapplySeq(line.sha1).isEmpty) {
       false
     } else if (List(line.c_size, line.offset, line.datetime).map(s => Try(s.toLong).toOption).contains(None)) {
       false
     } else {
       true
     }
diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
index a6107fc..c092f7f 100644
--- a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
@@ -35,6 +35,17 @@ class CdxBackfillTest extends FlatSpec with Matchers {
     // redirect
     assert(false === keepCdx(lineToCdxLine(
       "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
+    // not PDF
+    assert(false === keepCdx(lineToCdxLine(
+      """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf text/plain 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""")))
+    // invalid base32 SHA1
+    assert(false === keepCdx(lineToCdxLine(
+      """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FE010101010101010101VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""")))
+    assert(false === keepCdx(lineToCdxLine(
+      """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL33FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""")))
+    // dashed field
+    assert(false === keepCdx(lineToCdxLine(
+      """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 - application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""")))
   }
 
   it should "know what CDX lines are" in {
-- 
cgit v1.2.3

From cd0769abe2fc7e0a91942e618dd00809fd058a70 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 24 Jul 2018 13:39:48 -0700
Subject: small CdxBackfillJob refactor (code quality)

---
 scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index 389a96a..963fb10 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -108,12 +108,12 @@ object CdxBackfillJob {
     )
 
     val lower = raw.toLowerCase()
-    normalMime.foreach { case (key, value) =>
-      if (lower.startsWith(key)) {
-        return value
-      }
+    normalMime.find { case (key, _) =>
+      lower.startsWith(key)
+    } match {
+      case Some((_, value)) => value
+      case None => lower
     }
-    lower
   }
 
-- 
cgit v1.2.3

From d71cc4e6cd7381f5f0596af1ce33c1bc744c8644 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 24 Jul 2018 18:01:14 -0700
Subject: temporary please option for scala backfill

---
 please | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/please b/please
index a244b80..119790d 100755
--- a/please
+++ b/please
@@ -116,6 +116,23 @@ def run_statuscount(args):
         env=args.env)
     subprocess.call(cmd, shell=True)
 
+def run_sbackfill(args):
+    if args.rebuild:
+        rebuild_scalding()
+    print("Starting scalding backfill job...")
+    cmd = """hadoop jar \
+        scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+        com.twitter.scalding.Tool sandcrawler.CdxBackfillJob \
+        --hdfs \
+        --app.conf.path scalding/ia_cluster.conf \
+        --hbase-table wbgrp-journal-extract-0-{env} \
+        --zookeeper-hosts {zookeeper_hosts} \
+        --cdx-input-path {input_cdx}""".format(
+            input_cdx=args.input_cdx,
+            zookeeper_hosts=ZOOKEEPER_HOSTS,
+            env=args.env)
+    subprocess.call(cmd, shell=True)
+
 def main():
     parser = argparse.ArgumentParser()
 
@@ -135,6 +152,11 @@ def main():
     sub_backfill.add_argument('input_cdx',
         help="full HDFS path of CDX file to backfill")
 
+    sub_sbackfill = subparsers.add_parser('sbackfill')
+    sub_sbackfill.set_defaults(func=run_sbackfill)
+    sub_sbackfill.add_argument('input_cdx',
+        help="full HDFS path of CDX file to backfill")
+
     sub_extract = subparsers.add_parser('extract')
     sub_extract.set_defaults(func=run_extract)
     sub_extract.add_argument('input_cdx',
-- 
cgit v1.2.3
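
A sketch of how the new subcommand would be invoked, assuming the script's
existing global arguments (`--rebuild` and the qa/prod environment flag used
by the other subcommands); the HDFS path here is hypothetical:

    ./please sbackfill /user/bnewbold/example_crawl.cdx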