aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src')
-rw-r--r--scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala173
-rw-r--r--scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala117
2 files changed, 0 insertions, 290 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
deleted file mode 100644
index 0251e07..0000000
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-package sandcrawler
-
-import cascading.property.AppProps
-import cascading.tuple.Fields
-import cascading.pipe.joiner._
-import com.twitter.scalding._
-import com.twitter.scalding.typed.TDsl._
-import java.util.Properties
-import cascading.tap.SinkMode
-import parallelai.spyglass.base.JobBase
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
-
-// Type that represents a raw parsed CDX line
-case class CdxLine(surt: String,
- datetime: String,
- url: String,
- mime: String,
- http_status: String,
- sha1: String,
- c_size: String,
- offset: String,
- warc: String)
-
-
-/**
- * CDX backfill:
- * 1. parse CDX (all columns)
- * 2. filter CDX (pdf, HTTP 200, etc)
- * 3. source HBase (key column only)
- * 4. left join CDX to HBase
- * 5. filter to only those with null HBase key column
- * 6. convert CDX fields to HBase columns
- * 7. sink results to HBase
- *
- * TODO: I really mixed the Scalding "field-base" and "type-based" APIs here.
- * Should decide on a best practice.
- */
-class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions {
-
- import CdxBackfillJob._
-
- val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
- val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))
-
- // Parse CDX lines from text file to typed pipe
- val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))
- val cdxLines : TypedPipe[CdxLine] = lines
- .filter { isCdxLine }
- .map { lineToCdxLine }
- .filter { CdxBackfillJob.keepCdx(_) }
-
- val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines
- .map { CdxBackfillJob.cdxLineToRow }
-
- val existingKeys : TypedPipe[String] = hbaseSource
- .read
- .toTypedPipe[String]('key)
-
- // filters out all the lines that have an existing SHA1 key in HBase
- // the groupBy statements are to select key values to join on
- val newRows : TypedPipe[(String, String, String, String)] = existingKeys
- .groupBy( identity )
- .rightJoin(cdxRows.groupBy(_._1))
- .toTypedPipe
- .debug
- .collect { case (_, (None, row)) => row }
- .debug
-
- // convert to tuple form and write out into HBase
- newRows
- .toPipe('key, 'c, 'cdx, 'mime)
- .toBytesWritable( new Fields("key", "c", "cdx", "mime") )
- .write(hbaseSink)
-
- // XXX:
- //.toPipe("all")
- //.mapTo('all -> ('key, 'c, 'cdx, 'mime)) { x : (String, String, String, String) => x }
-
-}
-
-object CdxBackfillJob {
-
- def getHBaseSource(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
- return HBaseBuilder.build(
- hbase_table,
- zookeeper_hosts,
- List("file:size"), // not actually needed
- SourceMode.SCAN_ALL)
- }
-
- def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
- return HBaseBuilder.buildSink(
- hbase_table,
- zookeeper_hosts,
- List("f:c", "file:cdx", "file:mime"),
- SinkMode.UPDATE)
- }
-
- def normalizeMime(raw: String) : String = {
-
- val NORMAL_MIME = List("application/pdf",
- "application/postscript",
- "text/html",
- "text/xml")
-
- val lower = raw.toLowerCase()
- NORMAL_MIME.foreach(norm =>
- if (lower.startsWith(norm)) {
- return norm
- }
- )
-
- // Common special cases
- if (lower.startsWith("application/xml")) {
- return "text/xml"
- }
- if (lower.startsWith("application/x-pdf")) {
- return "application/pdf"
- }
- return lower
-
- }
-
- def isCdxLine(line: String) : Boolean = {
- // malformated or non-CDX11 lines
- !(line.startsWith("#") || line.startsWith(" ") || line.startsWith("filedesc") ||
- line.split(" ").size != 11)
- }
-
- def keepCdx(line: CdxLine) : Boolean = {
- // TODO: sha1.isalnum() and c_size.isdigit() and offset.isdigit() and dt.isdigit()
- if (line.http_status != "200" || line.sha1.size != 32) {
- return false
- }
- // TODO: '-' in (line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc)
- return true
- }
-
- // Returns (key, f:c, file:cdx, file:mime), all as strings, which is close to
- // how they will be inserted into HBase
- def cdxLineToRow(line: CdxLine) : (String, String, String, String) = {
-
- val key = "sha1:" + line.sha1
-
- val warcFile = line.warc.split('/')(1)
-
- // Read CDX-style datetime and conver to ISO 8601 with second resolution
- val dtFormat = new java.text.SimpleDateFormat("yyyyMMddHHmmss")
- val isoFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
- // TODO: timezones? UTC to UTC, so I don't think so.
- val dtIso = isoFormat.format(dtFormat.parse(line.datetime))
-
- // warc_file = warc.split('/')[-1]
- // dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
- // f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
-
- // This is the "f:c" field. 'i' intentionally not set
- val heritrixInfo = ""
-
- // file:cdx = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
- // offset=int(offset), warc=warc)
- val fileCdx = ""
- (key, heritrixInfo, fileCdx, line.mime)
- }
-
- def lineToCdxLine(line: String) : CdxLine = {
- val raw = line.split("\\s+")
- // surt, datetime, url, mime, http_status, sha1, SKIP, SKIP, c_size, offset, warc
- CdxLine(raw(0), raw(1), raw(2), raw(3), raw(4), raw(5), raw(8), raw(9), raw(10))
- }
-
-}
diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
deleted file mode 100644
index de94494..0000000
--- a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-
-package sandcrawler
-
-import org.scalatest._
-import cascading.tuple.{Tuple, Fields}
-import com.twitter.scalding.{JobTest, Tsv, TypedTsv, TupleConversions, TextLine}
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.util.Bytes
-import org.junit.runner.RunWith
-import org.scalatest.FunSpec
-import org.scalatest.junit.JUnitRunner
-import org.slf4j.LoggerFactory
-import parallelai.spyglass.hbase.HBaseSource
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-
-class CdxBackfillTest extends FlatSpec with Matchers {
-
- import CdxBackfillJob._
-
- it should "normalize mimetypes" in {
- assert(CdxBackfillJob.normalizeMime("asdf") === "asdf")
- assert(CdxBackfillJob.normalizeMime("application/pdf") === "application/pdf")
- assert(CdxBackfillJob.normalizeMime("application/pdf+journal") === "application/pdf")
- assert(CdxBackfillJob.normalizeMime("Application/PDF") === "application/pdf")
- assert(CdxBackfillJob.normalizeMime("application/p") === "application/p")
- assert(CdxBackfillJob.normalizeMime("application/xml+stuff") === "text/xml")
- assert(CdxBackfillJob.normalizeMime("application/x-pdf") === "application/pdf")
- assert(CdxBackfillJob.normalizeMime("application/x-html") === "application/x-html")
- }
-
- it should "filter CDX lines" in {
- assert(true === keepCdx(lineToCdxLine(
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
- // redirect
- assert(false === keepCdx(lineToCdxLine(
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
- }
-
- it should "know what CDX lines are" in {
- assert(true === isCdxLine(
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
- assert(false === isCdxLine(""))
- assert(false === isCdxLine(
- " edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
- assert(false === isCdxLine(
- "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
- // missing two fields
- assert(false === isCdxLine(
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
- // extra field
- assert(false === isCdxLine(
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz -"))
- }
-
- it should "execute lineToRow" in {
- cdxLineToRow(lineToCdxLine(
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
- }
-
-}
-
-@RunWith(classOf[JUnitRunner])
-class CdxBackfillJobTest extends FunSpec with TupleConversions {
-
- val output = "/tmp/testOutput"
- val (testTable, testHost, testCdxFile) = ("test-table", "dummy-host:2181", "test_file.cdx")
-
- val log = LoggerFactory.getLogger(this.getClass.getName)
-
- val dummySizeBytes = Bytes.toBytes(100)
-
- val sampleData = List(
- List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), dummySizeBytes),
- List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), dummySizeBytes),
- List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), dummySizeBytes),
- List(Bytes.toBytes("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT"), dummySizeBytes)
- )
- val sampleCdxLines = List(
- // clean line
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
- // has existing SHA1
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
- // HTTP status code
- "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
- // not CDX (prefixed with hash)
- "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
- )
-
- JobTest("sandcrawler.CdxBackfillJob")
- .arg("test", "")
- .arg("app.conf.path", "app.conf")
- .arg("output", output)
- .arg("hbase-table", testTable)
- .arg("zookeeper-hosts", testHost)
- .arg("cdx-input-path", testCdxFile)
- .arg("debug", "true")
- .source[Tuple](CdxBackfillJob.getHBaseSource(testTable, testHost),
- sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
- .source[String](TextLine(testCdxFile), sampleCdxLines)
- .sink[Tuple](CdxBackfillJob.getHBaseSink(testTable, testHost)) {
- outputBuffer =>
-
- it("should return a 1-element list (after join).") {
- // XXX:
- assert(outputBuffer.size === 1)
- }
-
- // Convert List[Tuple] to Map[Long, Long].
- val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
- it("should have the appropriate number of each status type") {
- // XXX:
- assert(counts(1) == 3)
- }
- }
- .run
- .finish
-}