aboutsummaryrefslogtreecommitdiffstats
path: root/scalding
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2018-06-11 17:49:28 -0700
committerBryan Newbold <bnewbold@archive.org>2018-07-17 18:55:55 -0700
commitf6f2fcafb245101f75c9cda175891ef2391cda97 (patch)
treefdb0b20ad09314a7542d4d8e66f19e3f9947a58d /scalding
parent3e6432307c4e85598cf63ab8578fd678ba9df156 (diff)
downloadsandcrawler-f6f2fcafb245101f75c9cda175891ef2391cda97.tar.gz
sandcrawler-f6f2fcafb245101f75c9cda175891ef2391cda97.zip
early work on scalding CDX backfill
Diffstat (limited to 'scalding')
-rw-r--r--scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala148
-rw-r--r--scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala51
2 files changed, 199 insertions, 0 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
new file mode 100644
index 0000000..4f08665
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -0,0 +1,148 @@
+package sandcrawler
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import cascading.pipe.joiner._
+import com.twitter.scalding._
+import java.util.Properties
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+// Type that represents a raw parsed CDX line
+case class CdxLine(surt: String, datetime: String, url: String, mime: String, http_status: String, sha1: String, c_size: String, offset: String, warc: String)
+
+/**
+ * CDX backfill:
+ * 1. parse CDX (all columns)
+ * 2. filter CDX (pdf, HTTP 200, etc)
+ * 3. source HBase (key column only)
+ * 4. left join CDX to HBase
+ * 5. filter to only those with null HBase key column
+ * 6. convert CDX fields to HBase columns
+ * 7. sink results to HBase
+ *
+ * TODO: I really mixed the Scalding "field-base" and "type-based" APIs here.
+ * Should decide on a best practice.
+ */
+class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ //import CdxLine._
+ // XXX remove all other CdxBackfillJob.whatever
+ import CdxBackfillJob._
+
+ val cdxInputPath = args("cdx-input-path")
+ val hbaseTable = args("hbase-table")
+ val zookeeperHosts = args("zookeeper-hosts")
+
+ val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+
+ val lines : TypedPipe[String] = TypedPipe.from(TextLine(cdxInputPath))
+
+ val cdxLines : TypedPipe[CdxLine] = lines
+ .filter { isCdxLine }
+ .map { lineToCdxLine }
+ .filter { CdxBackfillJob.keepCdx(_) }
+
+ val cdxRows = cdxLines
+ .map { CdxBackfillJob.cdxLineToRow(_) }
+ .toPipe(('key, 'f_c, 'file_cdx, 'file_mime))
+
+ val hbaseKeys = hbaseSource
+ .project('key)
+ .mapTo('key -> 'existingKey) { key : String => key }
+
+ // filters out all the lines that have an existing SHA1 key in HBase
+ val newRows = cdxRows
+ .joinWithLarger('key -> 'existingKey, hbaseKeys, joiner = new LeftJoin)
+ .filter('existingKey) { k : String => k == null } // is String the right type?
+
+ newRows
+ .write(hbaseSource)
+
+}
+
+object CdxBackfillJob {
+
+ def getHBaseSource(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
+ return HBaseBuilder.build(
+ hbase_table,
+ zookeeper_hosts,
+ List("file:size"), // not actually needed
+ SourceMode.SCAN_ALL)
+ }
+
+ def normalizeMime(raw: String) : String = {
+
+ val NORMAL_MIME = List("application/pdf",
+ "application/postscript",
+ "text/html",
+ "text/xml")
+
+ val lower = raw.toLowerCase()
+ NORMAL_MIME.foreach(norm =>
+ if (lower.startsWith(norm)) {
+ return norm
+ }
+ )
+
+ // Common special cases
+ if (lower.startsWith("application/xml")) {
+ return "text/xml"
+ }
+ if (lower.startsWith("application/x-pdf")) {
+ return "application/pdf"
+ }
+ return lower
+
+ }
+
+ def isCdxLine(line: String) : Boolean = {
+ // malformated or non-CDX11 lines
+ !(line.startsWith("#") || line.startsWith(" ") || line.startsWith("filedesc") ||
+ line.split(" ").size != 11)
+ }
+
+ def keepCdx(line: CdxLine) : Boolean = {
+ // TODO: sha1.isalnum() and c_size.isdigit() and offset.isdigit() and dt.isdigit()
+ if (line.http_status != "200" || line.sha1.size != 32) {
+ return false
+ }
+ // TODO: '-' in (line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc)
+ return true
+ }
+
+ // Returns (key, f:c, file:cdx, file:mime), all as strings, which is close to
+ // how they will be inserted into HBase
+ def cdxLineToRow(line: CdxLine) : (String, String, String, String) = {
+
+ val key = "sha1:" + line.sha1
+
+ val warcFile = line.warc.split('/')(1)
+
+ // Read CDX-style datetime and conver to ISO 8601 with second resolution
+ val dtFormat = new java.text.SimpleDateFormat("yyyyMMddHHmmss")
+ val isoFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
+ // TODO: timezones? UTC to UTC, so I don't think so.
+ val dtIso = isoFormat.format(dtFormat.parse(line.datetime))
+
+ // warc_file = warc.split('/')[-1]
+ // dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+ // f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
+
+ // This is the "f:c" field. 'i' intentionally not set
+ val heritrixInfo = ""
+
+ // file:cdx = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
+ // offset=int(offset), warc=warc)
+ val fileCdx = ""
+ (key, heritrixInfo, fileCdx, line.mime)
+ }
+
+ def lineToCdxLine(line: String) : CdxLine = {
+ val raw = line.split("\\s+")
+ // surt, datetime, url, mime, http_status, sha1, SKIP, SKIP, c_size, offset, warc
+ CdxLine(raw(0), raw(1), raw(2), raw(3), raw(4), raw(5), raw(8), raw(9), raw(10))
+ }
+
+}
diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
new file mode 100644
index 0000000..a442d79
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala
@@ -0,0 +1,51 @@
+
+package sandcrawler
+
+import cascading.tuple.Fields
+import org.scalatest._
+
+class CdxBackfillTest extends FlatSpec with Matchers {
+
+ import CdxBackfillJob._
+
+ it should "normalize mimetypes" in {
+ assert(CdxBackfillJob.normalizeMime("asdf") === "asdf")
+ assert(CdxBackfillJob.normalizeMime("application/pdf") === "application/pdf")
+ assert(CdxBackfillJob.normalizeMime("application/pdf+journal") === "application/pdf")
+ assert(CdxBackfillJob.normalizeMime("Application/PDF") === "application/pdf")
+ assert(CdxBackfillJob.normalizeMime("application/p") === "application/p")
+ assert(CdxBackfillJob.normalizeMime("application/xml+stuff") === "text/xml")
+ assert(CdxBackfillJob.normalizeMime("application/x-pdf") === "application/pdf")
+ assert(CdxBackfillJob.normalizeMime("application/x-html") === "application/x-html")
+ }
+
+ it should "filter CDX lines" in {
+ assert(true === keepCdx(lineToCdxLine(
+ "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
+ // redirect
+ assert(false === keepCdx(lineToCdxLine(
+ "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")))
+ }
+
+ it should "know what CDX lines are" in {
+ assert(true === isCdxLine(
+ "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+ assert(false === isCdxLine(""))
+ assert(false === isCdxLine(
+ " edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+ assert(false === isCdxLine(
+ "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+ // missing two fields
+ assert(false === isCdxLine(
+ "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+ // extra field
+ assert(false === isCdxLine(
+ "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz -"))
+ }
+
+ it should "execute lineToRow" in {
+ cdxLineToRow(lineToCdxLine(
+ "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))
+ }
+
+}