aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src')
-rw-r--r--scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala187
-rw-r--r--scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala175
2 files changed, 362 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
new file mode 100644
index 0000000..963fb10
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -0,0 +1,187 @@
+package sandcrawler
+
+import java.util.Properties
+
+import scala.util.Try
+import scala.util.matching.Regex
+import scala.util.parsing.json.JSONObject
+
+import cascading.pipe.joiner._
+import cascading.property.AppProps
+import cascading.tap.SinkMode
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Type that represents a raw parsed CDX line
+case class CdxLine(surt: String, datetime: String, url: String, mime: String, httpStatus: String, sha1: String, c_size: String, offset: String, warc: String)
+
+/**
+ * CDX backfill:
+ * 1. parse CDX (all columns)
+ * 2. filter CDX (pdf, HTTP 200, etc)
+ * 3. source HBase (key column only)
+ * 4. left join CDX to HBase
+ * 5. filter to only those with null HBase key column
+ * 6. convert CDX fields to HBase columns
+ * 7. sink results to HBase
+ */
+class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ import CdxBackfillJob._
+
+ val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+ val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
+
+ // Parse CDX lines from text file to typed pipe
+ val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))
+
+ val cdxLines : TypedPipe[CdxLine] = lines
+ .filter { isCdxLine }
+ .map { lineToCdxLine }
+ .filter { CdxBackfillJob.keepCdx(_) }
+
+ // (key, f:c, file:cdx, file:mime)
+ val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines
+ .map { CdxBackfillJob.cdxLineToRow }
+ .debug
+
+ val existingKeys : TypedPipe[String] = hbaseSource
+ .read
+ .fromBytesWritable( new Fields("key") )
+ .toTypedPipe[String]('key)
+ //.debug
+
+ // filters out all the lines that have an existing SHA1 key in HBase
+ // the groupBy statements are to select key values to join on.
+ // (key, f:c, file:cdx, file:mime)
+ val newRows : TypedPipe[(String, String, String, String)] = existingKeys
+ .groupBy( identity )
+ .rightJoin(cdxRows.groupBy(_._1))
+ .toTypedPipe
+ .collect { case (_, (None, row)) => row }
+ .debug
+
+ // convert to tuple form and write out into HBase
+ newRows
+ .toPipe('key, 'c, 'cdx, 'mime)
+ .toBytesWritable( new Fields("key", "c", "cdx", "mime") )
+ .write(hbaseSink)
+
+}
+
+object CdxBackfillJob {
+
+ def getHBaseSource(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbase_table,
+ zookeeper_hosts,
+ List("file:size"), // not actually needed
+ SourceMode.SCAN_ALL)
+ }
+
+ def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
+ HBaseBuilder.buildSink(
+ hbase_table,
+ zookeeper_hosts,
+ List("f:c", "file:cdx", "file:mime"),
+ SinkMode.UPDATE)
+ }
+
+ def normalizeMime(raw: String) : String = {
+
+ val normalMime = Map(
+ "application/pdf" -> "application/pdf",
+ "application/x-pdf" -> "application/pdf",
+ "('application/pdf'" -> "application/pdf",
+ "image/pdf" -> "application/pdf",
+ "text/pdf" -> "application/pdf",
+ "\"application/pdf\"" -> "application/pdf",
+ "application/postscript" -> "application/postscript",
+ "text/html" -> "text/html",
+ "text/xml" -> "text/xml",
+ "application/xml" -> "text/xml"
+ )
+
+ val lower = raw.toLowerCase()
+ normalMime.find { case (key, _) =>
+ lower.startsWith(key)
+ } match {
+ case Some((_, value)) => value
+ case None => lower
+ }
+ }
+
+ def isCdxLine(line: String) : Boolean = {
+ // malformatted or non-CDX11 lines
+ !(line.startsWith("#") || line.startsWith(" ") || line.startsWith("filedesc") ||
+ line.split(" ").size != 11)
+ }
+
+ def keepCdx(line: CdxLine) : Boolean = {
+ val sha1Pattern = """[A-Z2-7]{32}""".r
+ if (List(line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc).contains("-")) {
+ false
+ } else if (line.httpStatus != "200") {
+ false
+ } else if (line.mime != "application/pdf") {
+ false
+ } else if (sha1Pattern.unapplySeq(line.sha1).isEmpty) {
+ false
+ } else if (List(line.c_size, line.offset, line.datetime).map(s => Try(s.toLong).toOption).contains(None)) {
+ false
+ } else {
+ true
+ }
+ }
+
+ // Returns (key, f:c, file:cdx, file:mime), all as strings, which is close to
+ // how they will be inserted into HBase
+ def cdxLineToRow(line: CdxLine) : (String, String, String, String) = {
+
+ val key = "sha1:" + line.sha1
+
+ val warcFile = line.warc.split('/')(1)
+
+ // Read CDX-style datetime and conver to ISO 8601 with second resolution
+ val dtFormat = new java.text.SimpleDateFormat("yyyyMMddHHmmss")
+ val isoFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
+ // TODO: timezones? UTC to UTC, so I don't think so.
+ val dtIso = isoFormat.format(dtFormat.parse(line.datetime))
+
+ // This is the "f:c" field. 'i' intentionally not set
+ // python: f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
+ // python: warc_file = warc.split('/')[-1]
+ // python: dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+ val heritrixInfo = JSONObject(Map(
+ "u" -> line.url,
+ "d" -> dtIso,
+ "f" -> warcFile,
+ "o" -> line.offset.toInt,
+ "c" -> line.c_size.toInt
+ ))
+
+ // python: dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
+ // offset=int(offset), warc=warc)
+ val fileCdx = JSONObject(Map(
+ "surt" -> line.surt,
+ "dt" -> line.datetime,
+ "url" -> line.url,
+ "c_size" -> line.c_size.toInt,
+ "offset" -> line.offset.toInt,
+ "warc" -> line.warc
+ ))
+ (key, heritrixInfo.toString(), fileCdx.toString(), normalizeMime(line.mime))
+ }
+
+ def lineToCdxLine(line: String) : CdxLine = {
+ val raw = line.split("\\s+")
+ // surt, datetime, url, mime, http_status, sha1, SKIP, SKIP, c_size, offset, warc
+ CdxLine(raw(0), raw(1), raw(2), raw(3), raw(4), raw(5), raw(8), raw(9), raw(10))
+ }
+
+}
diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
new file mode 100644
index 0000000..c092f7f
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
@@ -0,0 +1,175 @@
+
+package sandcrawler
+
+import org.scalatest._
+import cascading.tuple.{Tuple, Fields}
+import com.twitter.scalding.{JobTest, Tsv, TypedTsv, TupleConversions, TextLine}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.junit.runner.RunWith
+import org.scalatest.FunSpec
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+import parallelai.spyglass.hbase.HBaseSource
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import scala.util.parsing.json.JSON
+
+class CdxBackfillTest extends FlatSpec with Matchers {
+
+ import CdxBackfillJob._
+
+ it should "normalize mimetypes" in {
+ assert(CdxBackfillJob.normalizeMime("asdf") === "asdf")
+ assert(CdxBackfillJob.normalizeMime("application/pdf") === "application/pdf")
+ assert(CdxBackfillJob.normalizeMime("application/pdf+journal") === "application/pdf")
+ assert(CdxBackfillJob.normalizeMime("Application/PDF") === "application/pdf")
+ assert(CdxBackfillJob.normalizeMime("application/p") === "application/p")
+ assert(CdxBackfillJob.normalizeMime("application/xml+stuff") === "text/xml")
+ assert(CdxBackfillJob.normalizeMime("application/x-pdf") === "application/pdf")
+ assert(CdxBackfillJob.normalizeMime("application/x-html") === "application/x-html")
+ }
+
+ it should "filter CDX lines" in {
+ assert(true === keepCdx(lineToCdxLine(
+ """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""")))
+ // redirect
+ assert(false === keepCdx(lineToCdxLine(
+ "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
+ // not PDF
+ assert(false === keepCdx(lineToCdxLine(
+ """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf text/plain 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""")))
+ // invalid base32 SHA1
+ assert(false === keepCdx(lineToCdxLine(
+ """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FE010101010101010101VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""")))
+ assert(false === keepCdx(lineToCdxLine(
+ """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL33FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""")))
+ // dashed field
+ assert(false === keepCdx(lineToCdxLine(
+ """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 - application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""")))
+ }
+
+ it should "know what CDX lines are" in {
+ assert(true === isCdxLine(
+ "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+ assert(false === isCdxLine(""))
+ assert(false === isCdxLine(
+ " edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+ assert(false === isCdxLine(
+ "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+ // missing two fields
+ assert(false === isCdxLine(
+ "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+ // extra field
+ assert(false === isCdxLine(
+ "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz -"))
+ }
+
+ it should "execute lineToRow" in {
+ // this particular test copied from python test_backfill_hbase_from_cdx.py
+ val row = cdxLineToRow(lineToCdxLine(
+ "eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+
+ assert(row._1 == "sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J")
+ JSON.parseFull(row._2) match {
+ case Some(obj: Map[String, Any]) => {
+ assert(obj("u") == "http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1")
+ assert(obj("f") == "CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ assert(obj("c") == 854156)
+ assert(obj("o") == 328850624)
+ assert(obj("d") == "2017-07-05T06:22:02Z")
+ }
+ case other => assert(false)
+ }
+ JSON.parseFull(row._3) match {
+ case Some(obj: Map[String, Any]) => {
+ assert(obj("surt") == "eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1")
+ assert(obj("dt") == "20170705062202")
+ assert(obj("url") == "http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1")
+ assert(obj("c_size") == 854156)
+ assert(obj("offset") == 328850624)
+ assert(obj("warc") == "CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ }
+ case other => assert(false)
+ }
+ assert(row._4 == "application/pdf")
+ }
+
+}
+
+@RunWith(classOf[JUnitRunner])
+class CdxBackfillJobTest extends FunSpec with TupleConversions {
+
+ val (testTable, testHost, testCdxFile) = ("test-table", "dummy-host:2181", "test_file.cdx")
+
+ val log = LoggerFactory.getLogger(this.getClass.getName)
+
+ val dummySizeBytes = Bytes.toBytes(100)
+
+ val sampleData = List(
+ List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), dummySizeBytes),
+ List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), dummySizeBytes),
+ List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), dummySizeBytes),
+ List(Bytes.toBytes("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT"), dummySizeBytes)
+ )
+ val sampleCdxLines = List(
+ // clean line
+ "0" -> """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""",
+ // has existing SHA1
+ "1" -> """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""",
+ // HTTP status code
+ "2" -> """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""",
+ // not CDX (prefixed with hash)
+ "3" -> """#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz""",
+ // not PDF
+ "4" -> """edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/film 200 AAAAAEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"""
+ )
+
+ JobTest("sandcrawler.CdxBackfillJob")
+ .arg("test", "")
+ .arg("app.conf.path", "app.conf")
+ .arg("hbase-table", testTable)
+ .arg("zookeeper-hosts", testHost)
+ .arg("cdx-input-path", testCdxFile)
+ .arg("debug", "true")
+ .source[Tuple](CdxBackfillJob.getHBaseSource(testTable, testHost),
+ sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+ .source(TextLine(testCdxFile), sampleCdxLines)
+ .sink[(ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable)](CdxBackfillJob.getHBaseSink(testTable, testHost)) {
+ outputBuffer =>
+
+ val buf0 = outputBuffer(0)
+ val row0 = List(buf0._1, buf0._2, buf0._3, buf0._4).map(b => Bytes.toString(b.copyBytes()))
+
+ it("should return a 1-element list (after join).") {
+ assert(outputBuffer.size === 1)
+ }
+
+ it("should insert the valid, new CDX line") {
+ assert(row0(0) == "sha1:WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G")
+ JSON.parseFull(row0(1)) match {
+ case Some(obj: Map[String, Any]) => {
+ assert(obj("u") == "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf")
+ assert(obj("f") == "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ assert(obj("c") == 210251)
+ assert(obj("o") == 931661233)
+ assert(obj("d") == "2017-08-28T23:31:54Z")
+ }
+ case other => assert(false)
+ }
+ JSON.parseFull(row0(2)) match {
+ case Some(obj: Map[String, Any]) => {
+ assert(obj("surt") == "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf")
+ assert(obj("dt") == "20170828233154")
+ assert(obj("url") == "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf")
+ assert(obj("c_size") == 210251)
+ assert(obj("offset") == 931661233)
+ assert(obj("warc") == "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ }
+ case other => assert(false)
+ }
+ assert(row0(3) == "application/pdf")
+ }
+ }
+ .run
+ .finish
+}