| author    | Ellen Spertus <ellen.spertus@gmail.com> | 2018-07-19 15:43:07 -0700 |
|-----------|------------------------------------------|---------------------------|
| committer | Ellen Spertus <ellen.spertus@gmail.com> | 2018-07-19 15:43:07 -0700 |
| commit    | 05fdfcd052e06934f8313743ba60629b3b694c2f | |
| tree      | 4ba28803b6eb659cf48caecb42f4ae5acb2a9ae6 | |
| parent    | def7233b5d5aaf3622c7d368a7dbb35459e38f13 | |
| parent    | 6e998633c6949c66bb6f9c5a1da7d7cf58511a39 | |
Cleanup after merge master. Note: I am not fixing warnings in CdxBackfillJob.scala.
| -rw-r--r-- | scalding/build.sbt                                             |   2 |
| -rw-r--r-- | scalding/scalastyle-config.xml                                 |  18 |
| -rw-r--r-- | scalding/scalding-debugging.md                                 |  14 |
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala       | 173 |
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseBuilder.scala         |  10 |
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala  |  15 |
| -rw-r--r-- | scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala       | 117 |
| -rw-r--r-- | scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala |  22 |

8 files changed, 341 insertions, 30 deletions
diff --git a/scalding/build.sbt b/scalding/build.sbt
index ba2a825..980418c 100644
--- a/scalding/build.sbt
+++ b/scalding/build.sbt
@@ -19,6 +19,7 @@ lazy val root = (project in file(".")).
       val dirNameToExclude = "/example/"
       scalaSourceFiles.filterNot(_.getAbsolutePath.contains(dirNameToExclude))
     },
+
     name := "sandcrawler",
     resolvers += "conjars.org" at "http://conjars.org/repo",
@@ -53,4 +54,5 @@ lazy val root = (project in file(".")).
         case s if s.endsWith("xml") => MergeStrategy.last
         case x => (assemblyMergeStrategy in assembly).value(x)
     },
+
   )
diff --git a/scalding/scalastyle-config.xml b/scalding/scalastyle-config.xml
index 23ec993..b184293 100644
--- a/scalding/scalastyle-config.xml
+++ b/scalding/scalastyle-config.xml
@@ -62,7 +62,7 @@
    <parameter name="maxParameters"><![CDATA[8]]></parameter>
   </parameters>
  </check>
- <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true">
+ <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="false">
   <parameters>
    <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
   </parameters>
@@ -118,14 +118,14 @@
  <check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
  <check enabled="true" class="org.scalastyle.scalariform.BlockImportChecker" level="warning"/>
  <check enabled="true" class="org.scalastyle.scalariform.ImportOrderChecker" level="warning">
- <parameters>
-  <parameter name="groups">java,scala,others</parameter>
-  <parameter name="group.java">javax?\..+</parameter>
-  <parameter name="group.scala">scala\..+</parameter>
-  <parameter name="group.others">.+</parameter>
- </parameters>
-</check>
- <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
+  <parameters>
+   <parameter name="groups">java,scala,others</parameter>
+   <parameter name="group.java">javax?\..+</parameter>
+   <parameter name="group.scala">scala\..+</parameter>
+   <parameter name="group.others">.+</parameter>
+  </parameters>
+ </check>
+<check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
   <parameters>
    <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
   </parameters>
diff --git a/scalding/scalding-debugging.md b/scalding/scalding-debugging.md
index 9143698..404fb4d 100644
--- a/scalding/scalding-debugging.md
+++ b/scalding/scalding-debugging.md
@@ -45,6 +45,20 @@ resolved by ensuring that the `HBaseSource` constructors had exactly identical
 names and arguments (eg, table names and zookeeper quorums have to be exact
 matches).
 
+If you get:
+
+    value toTypedPipe is not a member of cascading.pipe.Pipe
+
+You probably need to:
+
+    import com.twitter.scalding.typed.TDsl._
+
+## Running Individual Tests
+
+You can run a single test matching a string glob pattern like:
+
+    sbt:sandcrawler> testOnly *CdxBackfill*
+
 ## Fields
 
 Values of type `List[Fields]` are not printed in the expected way:
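The `toTypedPipe` note added to scalding-debugging.md above comes up whenever typed-pipe methods are called on a field-based `cascading.pipe.Pipe` without the Scalding `TDsl` implicits in scope. A minimal sketch of the fix, using a hypothetical job and input/output arguments that are not part of this commit:

    import com.twitter.scalding._
    // Without this import, "value toTypedPipe is not a member of cascading.pipe.Pipe"
    import com.twitter.scalding.typed.TDsl._

    class ToTypedPipeExample(args: Args) extends Job(args) {
      // Field-based API: a Pipe with two named fields
      val fieldPipe = Tsv(args("input"), ('key, 'count)).read
      // Typed API: the TDsl implicits add toTypedPipe to Pipe
      val typed: TypedPipe[(String, Long)] =
        fieldPipe.toTypedPipe[(String, Long)]('key, 'count)
      typed.write(TypedTsv[(String, Long)](args("output")))
    }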
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
new file mode 100644
index 0000000..0251e07
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -0,0 +1,173 @@
+package sandcrawler
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import cascading.pipe.joiner._
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import java.util.Properties
+import cascading.tap.SinkMode
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
+
+// Type that represents a raw parsed CDX line
+case class CdxLine(surt: String,
+                   datetime: String,
+                   url: String,
+                   mime: String,
+                   http_status: String,
+                   sha1: String,
+                   c_size: String,
+                   offset: String,
+                   warc: String)
+
+
+/**
+ *  CDX backfill:
+ *  1. parse CDX (all columns)
+ *  2. filter CDX (pdf, HTTP 200, etc)
+ *  3. source HBase (key column only)
+ *  4. left join CDX to HBase
+ *  5. filter to only those with null HBase key column
+ *  6. convert CDX fields to HBase columns
+ *  7. sink results to HBase
+ *
+ * TODO: I really mixed the Scalding "field-base" and "type-based" APIs here.
+ * Should decide on a best practice.
+ */
+class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+  import CdxBackfillJob._
+
+  val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+  val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
+
+  // Parse CDX lines from text file to typed pipe
+  val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))
+  val cdxLines : TypedPipe[CdxLine] = lines
+    .filter { isCdxLine }
+    .map { lineToCdxLine }
+    .filter { CdxBackfillJob.keepCdx(_) }
+
+  val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines
+    .map { CdxBackfillJob.cdxLineToRow }
+
+  val existingKeys : TypedPipe[String] = hbaseSource
+    .read
+    .toTypedPipe[String]('key)
+
+  // filters out all the lines that have an existing SHA1 key in HBase
+  // the groupBy statements are to select key values to join on
+  val newRows : TypedPipe[(String, String, String, String)] = existingKeys
+    .groupBy( identity )
+    .rightJoin(cdxRows.groupBy(_._1))
+    .toTypedPipe
+    .debug
+    .collect { case (_, (None, row)) => row }
+    .debug
+
+  // convert to tuple form and write out into HBase
+  newRows
+    .toPipe('key, 'c, 'cdx, 'mime)
+    .toBytesWritable( new Fields("key", "c", "cdx", "mime") )
+    .write(hbaseSink)
+
+    // XXX:
+    //.toPipe("all")
+    //.mapTo('all -> ('key, 'c, 'cdx, 'mime)) { x : (String, String, String, String) => x }
+
+}
+
+object CdxBackfillJob {
+
+  def getHBaseSource(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
+    return HBaseBuilder.build(
+      hbase_table,
+      zookeeper_hosts,
+      List("file:size"), // not actually needed
+      SourceMode.SCAN_ALL)
+  }
+
+  def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
+    return HBaseBuilder.buildSink(
+      hbase_table,
+      zookeeper_hosts,
+      List("f:c", "file:cdx", "file:mime"),
+      SinkMode.UPDATE)
+  }
+
+  def normalizeMime(raw: String) : String = {
+
+    val NORMAL_MIME = List("application/pdf",
+                           "application/postscript",
+                           "text/html",
+                           "text/xml")
+
+    val lower = raw.toLowerCase()
+    NORMAL_MIME.foreach(norm =>
+      if (lower.startsWith(norm)) {
+        return norm
+      }
+    )
+
+    // Common special cases
+    if (lower.startsWith("application/xml")) {
+      return "text/xml"
+    }
+    if (lower.startsWith("application/x-pdf")) {
+      return "application/pdf"
+    }
+    return lower
+
+  }
+
+  def isCdxLine(line: String) : Boolean = {
+    // malformated or non-CDX11 lines
+    !(line.startsWith("#") || line.startsWith(" ") || line.startsWith("filedesc") ||
+      line.split(" ").size != 11)
+  }
+
+  def keepCdx(line: CdxLine) : Boolean = {
+    // TODO: sha1.isalnum() and c_size.isdigit() and offset.isdigit() and dt.isdigit()
+    if (line.http_status != "200" || line.sha1.size != 32) {
+      return false
+    }
+    // TODO: '-' in (line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc)
+    return true
+  }
+
+  // Returns (key, f:c, file:cdx, file:mime), all as strings, which is close to
+  // how they will be inserted into HBase
+  def cdxLineToRow(line: CdxLine) : (String, String, String, String) = {
+
+    val key = "sha1:" + line.sha1
+
+    val warcFile = line.warc.split('/')(1)
+
+    // Read CDX-style datetime and conver to ISO 8601 with second resolution
+    val dtFormat = new java.text.SimpleDateFormat("yyyyMMddHHmmss")
+    val isoFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
+    // TODO: timezones? UTC to UTC, so I don't think so.
+    val dtIso = isoFormat.format(dtFormat.parse(line.datetime))
+
+    // warc_file = warc.split('/')[-1]
+    // dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+    // f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
+
+    // This is the "f:c" field. 'i' intentionally not set
+    val heritrixInfo = ""
+
+    // file:cdx = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
+    //                 offset=int(offset), warc=warc)
+    val fileCdx = ""
+    (key, heritrixInfo, fileCdx, line.mime)
+  }
+
+  def lineToCdxLine(line: String) : CdxLine = {
+    val raw = line.split("\\s+")
+    // surt, datetime, url, mime, http_status, sha1, SKIP, SKIP, c_size, offset, warc
+    CdxLine(raw(0), raw(1), raw(2), raw(3), raw(4), raw(5), raw(8), raw(9), raw(10))
+  }
+
+}
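The core of the new job is the anti-join in steps 4-5 of the header comment: a CDX row is kept only when its SHA-1 key is absent from HBase, implemented as a `rightJoin` of grouped HBase keys against grouped CDX rows followed by a `collect` on the `None` side. A standalone sketch of that pattern with hypothetical names and no HBase dependency, in case the typed-API idiom is unfamiliar:

    import com.twitter.scalding.typed.TypedPipe

    object AntiJoinSketch {
      // Keep only rows whose key does NOT appear in existingKeys
      // (the equivalent of the newRows pipe in CdxBackfillJob above).
      def newRowsOnly(existingKeys: TypedPipe[String],
                      rows: TypedPipe[(String, String)]): TypedPipe[(String, String)] =
        existingKeys
          .groupBy(identity)             // key -> key
          .rightJoin(rows.groupBy(_._1)) // key -> (Option[existing key], row)
          .toTypedPipe
          .collect { case (_, (None, row)) => row } // None means no match on the HBase side
    }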
diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
index b4ade24..19df99d 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
@@ -1,9 +1,12 @@
 package sandcrawler
 
+import scala._
+
+import cascading.tap.SinkMode
 import cascading.tuple.Fields
+import parallelai.spyglass.base.JobBase
 import parallelai.spyglass.hbase.HBaseConstants.SourceMode
 import parallelai.spyglass.hbase.HBaseSource
-import scala._
 
 object HBaseBuilder {
   // map from column families to column names
@@ -48,4 +51,9 @@ object HBaseBuilder {
     val (families, fields) = parseColSpecs(colSpecs)
     new HBaseSource(table, server, new Fields("key"), families, fields, sourceMode = sourceMode, keyList = keyList)
   }
+
+  def buildSink(table: String, server: String, colSpecs: List[String], sinkMode: SinkMode, keyList: List[String] = List("key")) : HBaseSource = {
+    val (families, fields) = parseColSpecs(colSpecs)
+    new HBaseSource(table, server, new Fields("key"), families, fields, sinkMode = sinkMode)
+  }
 }
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index f88f9db..fd0b4e2 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -15,21 +15,18 @@ import parallelai.spyglass.hbase.HBaseSource
 
 class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
 
-  val colSpec = "grobid0:status_code"
-  val output = args("output")
-  HBaseBuilder.parseColSpec(colSpec)
-  val Col: String = colSpec.split(":")(1)
-
-  val source : TypedPipe[Long] = HBaseCountJob.getHBaseSource(
+  val source = HBaseCountJob.getHBaseSource(
     args("hbase-table"),
     args("zookeeper-hosts"),
-    colSpec)
+    "grobid0:status_code")
+
+  val statusPipe : TypedPipe[Long] = source
     .read
     .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)
     .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) }
 
-  source.groupBy { identity }
+  statusPipe.groupBy { identity }
     .size
     .debug
-    .write(TypedTsv[(Long,Long)](output))
+    .write(TypedTsv[(Long,Long)](args("output")))
 }
diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
new file mode 100644
index 0000000..de94494
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
@@ -0,0 +1,117 @@
+
+package sandcrawler
+
+import org.scalatest._
+import cascading.tuple.{Tuple, Fields}
+import com.twitter.scalding.{JobTest, Tsv, TypedTsv, TupleConversions, TextLine}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.junit.runner.RunWith
+import org.scalatest.FunSpec
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+import parallelai.spyglass.hbase.HBaseSource
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class CdxBackfillTest extends FlatSpec with Matchers {
+
+  import CdxBackfillJob._
+
+  it should "normalize mimetypes" in {
+    assert(CdxBackfillJob.normalizeMime("asdf") === "asdf")
+    assert(CdxBackfillJob.normalizeMime("application/pdf") === "application/pdf")
+    assert(CdxBackfillJob.normalizeMime("application/pdf+journal") === "application/pdf")
+    assert(CdxBackfillJob.normalizeMime("Application/PDF") === "application/pdf")
+    assert(CdxBackfillJob.normalizeMime("application/p") === "application/p")
+    assert(CdxBackfillJob.normalizeMime("application/xml+stuff") === "text/xml")
+    assert(CdxBackfillJob.normalizeMime("application/x-pdf") === "application/pdf")
+    assert(CdxBackfillJob.normalizeMime("application/x-html") === "application/x-html")
+  }
+
+  it should "filter CDX lines" in {
+    assert(true === keepCdx(lineToCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
+    // redirect
+    assert(false === keepCdx(lineToCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
+  }
+
+  it should "know what CDX lines are" in {
+    assert(true === isCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+    assert(false === isCdxLine(""))
+    assert(false === isCdxLine(
+      " edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+    assert(false === isCdxLine(
+      "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+    // missing two fields
+    assert(false === isCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+    // extra field
+    assert(false === isCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz -"))
+  }
+
+  it should "execute lineToRow" in {
+    cdxLineToRow(lineToCdxLine(
+      "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+  }
+
+}
+
+@RunWith(classOf[JUnitRunner])
+class CdxBackfillJobTest extends FunSpec with TupleConversions {
+
+  val output = "/tmp/testOutput"
+  val (testTable, testHost, testCdxFile) = ("test-table", "dummy-host:2181", "test_file.cdx")
+
+  val log = LoggerFactory.getLogger(this.getClass.getName)
+
+  val dummySizeBytes = Bytes.toBytes(100)
+
+  val sampleData = List(
+    List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), dummySizeBytes),
+    List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), dummySizeBytes),
+    List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), dummySizeBytes),
+    List(Bytes.toBytes("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT"), dummySizeBytes)
+  )
+  val sampleCdxLines = List(
+    // clean line
+    "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    // has existing SHA1
+    "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    // HTTP status code
+    "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+    // not CDX (prefixed with hash)
+    "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
+  )
+
+  JobTest("sandcrawler.CdxBackfillJob")
+    .arg("test", "")
+    .arg("app.conf.path", "app.conf")
+    .arg("output", output)
+    .arg("hbase-table", testTable)
+    .arg("zookeeper-hosts", testHost)
+    .arg("cdx-input-path", testCdxFile)
+    .arg("debug", "true")
+    .source[Tuple](CdxBackfillJob.getHBaseSource(testTable, testHost),
+      sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+    .source[String](TextLine(testCdxFile), sampleCdxLines)
+    .sink[Tuple](CdxBackfillJob.getHBaseSink(testTable, testHost)) {
+      outputBuffer =>
+
+        it("should return a 1-element list (after join).") {
+        // XXX:
+          assert(outputBuffer.size === 1)
+        }
+
+        // Convert List[Tuple] to Map[Long, Long].
+        val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
+        it("should have the appropriate number of each status type") {
+        // XXX:
+          assert(counts(1) == 3)
+        }
+      }
+    .run
+    .finish
+}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
index 11ab1d0..d7689cd 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
@@ -48,18 +48,18 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
     .arg("debug", "true")
     .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
       sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
-      .sink[Tuple](TypedTsv[(Long, Long)](output)) {
-        outputBuffer =>
-        it("should return a 2-element list.") {
-          assert(outputBuffer.size === 2)
-        }
+    .sink[Tuple](TypedTsv[(Long, Long)](output)) {
+      outputBuffer =>
+      it("should return a 2-element list.") {
+        assert(outputBuffer.size === 2)
+      }
 
-        // Convert List[Tuple] to Map[Long, Long].
-        val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
-        it("should have the appropriate number of each status type") {
-          assert(counts(statusType1) == statusType1Count)
-          assert(counts(statusType2) == statusType2Count)
-        }
+      // Convert List[Tuple] to Map[Long, Long].
+      val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
+      it("should have the appropriate number of each status type") {
+        assert(counts(statusType1) == statusType1Count)
+        assert(counts(statusType2) == statusType2Count)
+      }
     }
     .run
     .finish
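A side note on the `TODO: timezones?` in `cdxLineToRow` above: `SimpleDateFormat` parses and formats in the JVM's default timezone, so the round-trip preserves the wall-clock time, but the literal `'Z'` in the output pattern only truly means UTC if both formats are pinned to UTC. An illustrative sketch of the same conversion with the timezone made explicit (not part of this commit):

    import java.text.SimpleDateFormat
    import java.util.TimeZone

    object CdxDatetimeSketch {
      // CDX datetime ("yyyyMMddHHmmss") -> ISO 8601 with second resolution.
      // Both formats are pinned to UTC so the JVM default timezone never leaks in.
      def cdxToIso(dt: String): String = {
        val cdxFormat = new SimpleDateFormat("yyyyMMddHHmmss")
        val isoFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
        cdxFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
        isoFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
        isoFormat.format(cdxFormat.parse(dt))
      }
    }

For example, `CdxDatetimeSketch.cdxToIso("20170828233154")` yields `"2017-08-28T23:31:54Z"`.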
