diff options
| author | bnewbold <bnewbold@archive.org> | 2021-10-04 20:05:21 +0000 | 
|---|---|---|
| committer | bnewbold <bnewbold@archive.org> | 2021-10-04 20:05:21 +0000 | 
| commit | 57f879c00b00c6cd4051f54662fea3f96f80ad35 (patch) | |
| tree | 8dc306d29ea8778fc4553d0ea8ff6e0d6b1b6fbb /scalding | |
| parent | 96033132be8976f0c9483a18dfe4a58bf94b0011 (diff) | |
| parent | d71cc4e6cd7381f5f0596af1ce33c1bc744c8644 (diff) | |
| download | sandcrawler-57f879c00b00c6cd4051f54662fea3f96f80ad35.tar.gz sandcrawler-57f879c00b00c6cd4051f54662fea3f96f80ad35.zip  | |
Merge branch 'bnewbold-backfill' into 'master'
CDX Backfill (scalding version)
See merge request webgroup/sandcrawler!12
Diffstat (limited to 'scalding')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 187 | ||||
| -rw-r--r-- | scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala | 175 | 
2 files changed, 362 insertions, 0 deletions
// ---------------------------------------------------------------------------
// diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
//            b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
// (new file, mode 100644)
// ---------------------------------------------------------------------------
package sandcrawler

import java.util.Properties

import scala.util.Try
import scala.util.matching.Regex
import scala.util.parsing.json.JSONObject

import cascading.pipe.joiner._
import cascading.property.AppProps
import cascading.tap.SinkMode
import cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
import parallelai.spyglass.hbase.HBaseSource

/**
 * A raw parsed CDX line. Every column is kept as the string found in the
 * input; validation and numeric conversion happen later (see
 * `CdxBackfillJob.keepCdx` and `CdxBackfillJob.cdxLineToRow`).
 */
case class CdxLine(surt: String, datetime: String, url: String, mime: String, httpStatus: String, sha1: String, c_size: String, offset: String, warc: String)

/**
 *  CDX backfill:
 *  1. parse CDX (all columns)
 *  2. filter CDX (pdf, HTTP 200, etc)
 *  3. source HBase (key column only)
 *  4. join CDX to HBase, keeping every CDX row (existing keys on the left,
 *     CDX rows on the right, hence `rightJoin`)
 *  5. filter to only those with null HBase key column
 *  6. convert CDX fields to HBase columns
 *  7. sink results to HBase
 *
 *  Required arguments: `hbase-table`, `zookeeper-hosts`, `cdx-input-path`.
 */
class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions {

  import CdxBackfillJob._

  val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
  val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))

  // Parse CDX lines from text file to typed pipe
  val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))

  // isCdxLine must run before lineToCdxLine: it guarantees the 11 fields that
  // lineToCdxLine indexes into.
  val cdxLines : TypedPipe[CdxLine] = lines
    .filter { isCdxLine }
    .map { lineToCdxLine }
    .filter { CdxBackfillJob.keepCdx(_) }

  // (key, f:c, file:cdx, file:mime)
  // NOTE(review): `.debug` prints every tuple; presumably left in from
  // development -- confirm whether it is wanted on full-size backfills.
  val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines
    .map { CdxBackfillJob.cdxLineToRow }
    .debug

  val existingKeys : TypedPipe[String] = hbaseSource
    .read
    .fromBytesWritable( new Fields("key") )
    .toTypedPipe[String]('key)
    //.debug

  // filters out all the lines that have an existing SHA1 key in HBase
  // the groupBy statements are to select key values to join on.
  // (key, f:c, file:cdx, file:mime)
  val newRows : TypedPipe[(String, String, String, String)] = existingKeys
    .groupBy( identity )
    .rightJoin(cdxRows.groupBy(_._1))
    .toTypedPipe
    .collect { case (_, (None, row)) => row }
    .debug

  // convert to tuple form and write out into HBase
  newRows
    .toPipe('key, 'c, 'cdx, 'mime)
    .toBytesWritable( new Fields("key", "c", "cdx", "mime") )
    .write(hbaseSink)

}

object CdxBackfillJob {

  // Number of whitespace-separated columns in a CDX11 line.
  private val CdxFieldCount = 11

  // Length of a CDX timestamp in "yyyyMMddHHmmss" form.
  private val CdxDatetimeLength = 14

  // Compiled once; keepCdx runs for every input line.
  private val Sha1Pattern = """[A-Z2-7]{32}""".r

  // (prefix, normalized mimetype) pairs used by normalizeMime. No prefix is a
  // prefix of another entry with a different target, so match order does not
  // affect the result.
  private val NormalMimePrefixes : Seq[(String, String)] = Seq(
    "application/pdf" -> "application/pdf",
    "application/x-pdf" -> "application/pdf",
    "('application/pdf'" -> "application/pdf",
    "image/pdf" -> "application/pdf",
    "text/pdf" -> "application/pdf",
    "\"application/pdf\"" -> "application/pdf",
    "application/postscript" -> "application/postscript",
    "text/html" -> "text/html",
    "text/xml" -> "text/xml",
    "application/xml" -> "text/xml"
  )

  // Single definition of "field" shared by isCdxLine and lineToCdxLine, so a
  // line accepted by the former can always be indexed by the latter.
  private def splitFields(line: String) : Array[String] = line.split("\\s+")

  /** Builds the HBase source used to scan existing row keys. */
  def getHBaseSource(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
    HBaseBuilder.build(
      hbase_table,
      zookeeper_hosts,
      List("file:size"), // not actually needed
      SourceMode.SCAN_ALL)
  }

  /** Builds the HBase sink for the `f:c`, `file:cdx` and `file:mime` columns. */
  def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
    HBaseBuilder.buildSink(
      hbase_table,
      zookeeper_hosts,
      List("f:c", "file:cdx", "file:mime"),
      SinkMode.UPDATE)
  }

  /**
   * Lower-cases `raw` and maps it onto a canonical mimetype when it starts
   * with one of the known prefixes; otherwise returns the lower-cased input.
   */
  def normalizeMime(raw: String) : String = {
    val lower = raw.toLowerCase()
    NormalMimePrefixes
      .collectFirst { case (prefix, normalized) if lower.startsWith(prefix) => normalized }
      .getOrElse(lower)
  }

  /**
   * True when `line` looks like a CDX11 record: not a comment or `filedesc`
   * header, no leading whitespace, and exactly 11 whitespace-separated fields.
   *
   * Fields are counted with the same splitter lineToCdxLine uses. Counting
   * with `split(" ")` instead let a line with ten real fields and one doubled
   * space through, which then failed with an index error during parsing.
   */
  def isCdxLine(line: String) : Boolean = {
    // malformatted or non-CDX11 lines
    val startsWithWhitespace = line.headOption.exists(_.isWhitespace)
    val isHeaderOrComment = line.startsWith("#") || line.startsWith("filedesc")
    !startsWithWhitespace && !isHeaderOrComment && splitFields(line).length == CdxFieldCount
  }

  /**
   * Decides whether a parsed CDX line should be backfilled: no "-" placeholder
   * in the fields we use, HTTP 200, mimetype exactly "application/pdf", a
   * well-formed base32 SHA-1, numeric size/offset/datetime, and a 14-digit
   * datetime (anything else would make cdxLineToRow throw while parsing it).
   */
  def keepCdx(line: CdxLine) : Boolean = {
    val usedFields =
      List(line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc)
    val numericFields = List(line.c_size, line.offset, line.datetime)

    val datetimeWellFormed =
      line.datetime.length == CdxDatetimeLength &&
        line.datetime.forall(c => c >= '0' && c <= '9')

    // NOTE(review): the mimetype is compared before normalization, so e.g.
    // "application/PDF" is dropped even though normalizeMime would map it to
    // "application/pdf" -- confirm this strictness is intended.
    !usedFields.contains("-") &&
      line.httpStatus == "200" &&
      line.mime == "application/pdf" &&
      Sha1Pattern.pattern.matcher(line.sha1).matches() &&
      numericFields.forall(s => Try(s.toLong).isSuccess) &&
      datetimeWellFormed
  }

  /**
   * Returns (key, f:c, file:cdx, file:mime), all as strings, which is close to
   * how they will be inserted into HBase.
   *
   * Expects a line that passed keepCdx; `datetime`, `offset` and `c_size` are
   * parsed here and will throw on malformed values.
   */
  def cdxLineToRow(line: CdxLine) : (String, String, String, String) = {

    val key = "sha1:" + line.sha1

    // Last path component, as in the python original (warc.split('/')[-1]).
    // lastOption guards against split returning an empty array (e.g. "/").
    val warcFile = line.warc.split('/').lastOption.getOrElse(line.warc)

    // Read CDX-style datetime and convert to ISO 8601 with second resolution.
    // Both formats are pinned to UTC: with the JVM default zone, a timestamp
    // falling in a local DST gap would be shifted by the lenient parser.
    val utc = java.util.TimeZone.getTimeZone("UTC")
    val dtFormat = new java.text.SimpleDateFormat("yyyyMMddHHmmss")
    dtFormat.setTimeZone(utc)
    val isoFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
    isoFormat.setTimeZone(utc)
    val dtIso = isoFormat.format(dtFormat.parse(line.datetime))

    // Long, matching the toLong validation in keepCdx: WARC offsets can exceed
    // Int.MaxValue. The JSON text is identical for values that fit in an Int.
    val offset = line.offset.toLong
    val compressedSize = line.c_size.toLong

    // This is the "f:c" field. 'i' intentionally not set
    // python: f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
    // python: warc_file = warc.split('/')[-1]
    // python: dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
    val heritrixInfo = JSONObject(Map(
      "u" -> line.url,
      "d" -> dtIso,
      "f" -> warcFile,
      "o" -> offset,
      "c" -> compressedSize
    ))

    // python: dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
    //                 offset=int(offset), warc=warc)
    val fileCdx = JSONObject(Map(
      "surt" -> line.surt,
      "dt" -> line.datetime,
      "url" -> line.url,
      "c_size" -> compressedSize,
      "offset" -> offset,
      "warc" -> line.warc
    ))
    (key, heritrixInfo.toString(), fileCdx.toString(), normalizeMime(line.mime))
  }

  /**
   * Splits a CDX11 line into a CdxLine. Callers must check isCdxLine first;
   * a line with fewer than 11 fields throws ArrayIndexOutOfBoundsException.
   */
  def lineToCdxLine(line: String) : CdxLine = {
    val raw = splitFields(line)
    // surt, datetime, url, mime, http_status, sha1, SKIP, SKIP, c_size, offset, warc
    CdxLine(raw(0), raw(1), raw(2), raw(3), raw(4), raw(5), raw(8), raw(9), raw(10))
  }

}

// ---------------------------------------------------------------------------
// diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
//            b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
// (new file, mode 100644)
// ---------------------------------------------------------------------------

package sandcrawler

import org.scalatest._
import cascading.tuple.{Tuple, Fields}
import com.twitter.scalding.{JobTest, Tsv, TypedTsv, TupleConversions, TextLine}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.junit.runner.RunWith
import org.scalatest.FunSpec
import org.scalatest.junit.JUnitRunner
import org.slf4j.LoggerFactory
import parallelai.spyglass.hbase.HBaseSource
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import scala.util.parsing.json.JSON

// Shared sample CDX record. `line()` with no arguments is the clean, valid
// line; each test overrides only the column it is exercising.
object CdxTestFixtures {

  val Surt = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf"
  val Url = "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf"
  val Datetime = "20170828233154"
  val Sha1 = "WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G"
  val WarcFile = "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
  val Warc = "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/" + WarcFile

  def line(url: String = Url,
           mime: String = "application/pdf",
           status: String = "200",
           sha1: String = Sha1) : String =
    s"$Surt $Datetime $url $mime $status $sha1 - - 210251 931661233 $Warc"
}

class CdxBackfillTest extends FlatSpec with Matchers {

  import CdxBackfillJob._
  import CdxTestFixtures._

  it should "normalize mimetypes" in {
    assert(CdxBackfillJob.normalizeMime("asdf") === "asdf")
    assert(CdxBackfillJob.normalizeMime("application/pdf") === "application/pdf")
    assert(CdxBackfillJob.normalizeMime("application/pdf+journal") === "application/pdf")
    assert(CdxBackfillJob.normalizeMime("Application/PDF") === "application/pdf")
    assert(CdxBackfillJob.normalizeMime("application/p") === "application/p")
    assert(CdxBackfillJob.normalizeMime("application/xml+stuff") === "text/xml")
    assert(CdxBackfillJob.normalizeMime("application/x-pdf") === "application/pdf")
    assert(CdxBackfillJob.normalizeMime("application/x-html") === "application/x-html")
  }

  it should "filter CDX lines" in {
    assert(true === keepCdx(lineToCdxLine(line())))
    // redirect
    assert(false === keepCdx(lineToCdxLine(line(status = "301"))))
    // not PDF
    assert(false === keepCdx(lineToCdxLine(line(mime = "text/plain"))))
    // invalid base32 SHA1
    assert(false === keepCdx(lineToCdxLine(line(sha1 = "WL3FE010101010101010101VET4QJW7G"))))
    assert(false === keepCdx(lineToCdxLine(line(sha1 = "WL33FEA62TEU4F52Y5DOVQ62VET4QJW7G"))))
    // dashed field
    assert(false === keepCdx(lineToCdxLine(line(url = "-"))))
  }

  it should "know what CDX lines are" in {
    assert(true === isCdxLine(line()))
    assert(false === isCdxLine(""))
    assert(false === isCdxLine(" " + line()))
    assert(false === isCdxLine("#" + line()))
    // missing two fields
    assert(false === isCdxLine(line().replace(" - - ", " ")))
    // extra field
    assert(false === isCdxLine(line() + " -"))
  }

  it should "agree with lineToCdxLine about field boundaries" in {
    // ten real fields plus a doubled space: eleven tokens if split on single
    // spaces, but lineToCdxLine would only find ten
    assert(false === isCdxLine(line().replace(" - - ", "  - ")))
    // any run of whitespace separates fields
    val tabbed = line().replace(" ", "\t")
    assert(true === isCdxLine(tabbed))
    assert(true === keepCdx(lineToCdxLine(tabbed)))
  }

  it should "reject datetimes that cannot be converted" in {
    assert(false === keepCdx(lineToCdxLine(line().replace(Datetime, "2017"))))
  }

  it should "execute lineToRow" in {
    // this particular test copied from python test_backfill_hbase_from_cdx.py
    val row = cdxLineToRow(lineToCdxLine(
      "eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"))

    assert(row._1 == "sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J")
    JSON.parseFull(row._2) match {
      case Some(obj: Map[String, Any]) => {
        assert(obj("u") == "http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1")
        assert(obj("f") == "CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
        assert(obj("c") == 854156)
        assert(obj("o") == 328850624)
        assert(obj("d") == "2017-07-05T06:22:02Z")
      }
      case other => assert(false)
    }
    JSON.parseFull(row._3) match {
      case Some(obj: Map[String, Any]) => {
        assert(obj("surt") == "eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1")
        assert(obj("dt") == "20170705062202")
        assert(obj("url") == "http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1")
        assert(obj("c_size") == 854156)
        assert(obj("offset") == 328850624)
        assert(obj("warc") == "CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
      }
      case other => assert(false)
    }
    assert(row._4 == "application/pdf")
  }

  it should "handle large offsets and WARC paths without a directory" in {
    val row = cdxLineToRow(CdxLine(
      Surt, Datetime, Url, "application/pdf", "200", Sha1, "210251", "3000000000", "plain.warc.gz"))

    JSON.parseFull(row._2) match {
      case Some(obj: Map[String, Any]) => {
        assert(obj("f") == "plain.warc.gz")
        assert(obj("o") == 3000000000L)
        assert(obj("c") == 210251)
      }
      case other => assert(false)
    }
    JSON.parseFull(row._3) match {
      case Some(obj: Map[String, Any]) => {
        assert(obj("offset") == 3000000000L)
        assert(obj("warc") == "plain.warc.gz")
      }
      case other => assert(false)
    }
  }

}

@RunWith(classOf[JUnitRunner])
class CdxBackfillJobTest extends FunSpec with TupleConversions {

  val (testTable, testHost, testCdxFile) = ("test-table", "dummy-host:2181", "test_file.cdx")

  val log = LoggerFactory.getLogger(this.getClass.getName)

  val dummySizeBytes = Bytes.toBytes(100)

  // Row keys already present in the (mock) HBase table
  val sampleData = List(
    List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), dummySizeBytes),
    List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), dummySizeBytes),
    List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), dummySizeBytes),
    List(Bytes.toBytes("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT"), dummySizeBytes)
  )
  val sampleCdxLines = List(
    // clean line
    "0" -> CdxTestFixtures.line(),
    // has existing SHA1
    "1" -> CdxTestFixtures.line(sha1 = "C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"),
    // HTTP status code
    "2" -> CdxTestFixtures.line(status = "301"),
    // not CDX (prefixed with hash)
    "3" -> ("#" + CdxTestFixtures.line()),
    // not PDF
    "4" -> CdxTestFixtures.line(mime = "application/film", sha1 = "AAAAAEA62TEU4F52Y5DOVQ62VET4QJW7G")
  )

  JobTest("sandcrawler.CdxBackfillJob")
    .arg("test", "")
    .arg("app.conf.path", "app.conf")
    .arg("hbase-table", testTable)
    .arg("zookeeper-hosts", testHost)
    .arg("cdx-input-path", testCdxFile)
    .arg("debug", "true")
    .source[Tuple](CdxBackfillJob.getHBaseSource(testTable, testHost),
      sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
    .source(TextLine(testCdxFile), sampleCdxLines)
    .sink[(ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable)](CdxBackfillJob.getHBaseSink(testTable, testHost)) {
      outputBuffer =>

        val buf0 = outputBuffer(0)
        val row0 = List(buf0._1, buf0._2, buf0._3, buf0._4).map(b => Bytes.toString(b.copyBytes()))

        it("should return a 1-element list (after join).") {
          assert(outputBuffer.size === 1)
        }

        it("should insert the valid, new CDX line") {
          assert(row0(0) == "sha1:WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G")
          JSON.parseFull(row0(1)) match {
            case Some(obj: Map[String, Any]) => {
              assert(obj("u") == "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf")
              assert(obj("f") == "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")
              assert(obj("c") == 210251)
              assert(obj("o") == 931661233)
              assert(obj("d") == "2017-08-28T23:31:54Z")
            }
            case other => assert(false)
          }
          JSON.parseFull(row0(2)) match {
            case Some(obj: Map[String, Any]) => {
              assert(obj("surt") == "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf")
              assert(obj("dt") == "20170828233154")
              assert(obj("url") == "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf")
              assert(obj("c_size") == 210251)
              assert(obj("offset") == 931661233)
              assert(obj("warc") == "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")
            }
            case other => assert(false)
          }
          assert(row0(3) == "application/pdf")
        }
      }
    .run
    .finish
}
