diff options
| author | Bryan Newbold <bnewbold@archive.org> | 2018-07-19 13:10:02 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@archive.org> | 2018-07-24 11:27:45 -0700 | 
| commit | 8ae41461f901a8142af06610a4b9f500ce4fe47f (patch) | |
| tree | 17a7c99e69da8f4c105f8e3c33d0712345579e5c /scalding/src/main | |
| parent | c4db53036eac90841eb4f970b77db8c1677ef75b (diff) | |
| download | sandcrawler-8ae41461f901a8142af06610a4b9f500ce4fe47f.tar.gz sandcrawler-8ae41461f901a8142af06610a4b9f500ce4fe47f.zip | |
variant of CdxBackfillJob that writes to TSV
Has the same test failure ("java.lang.IndexOutOfBoundsException: Index:
1, Size: 1")
Diffstat (limited to 'scalding/src/main')
| -rw-r--r-- | scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 173 | 
1 files changed, 173 insertions, 0 deletions
| diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala new file mode 100644 index 0000000..a0d3e37 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala @@ -0,0 +1,173 @@ +package sandcrawler + +import cascading.property.AppProps +import cascading.tuple.Fields +import cascading.pipe.joiner._ +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import java.util.Properties +import cascading.tap.SinkMode +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} + +// Type that represents a raw parsed CDX line +case class CdxLine(surt: String, +                   datetime: String, +                   url: String, +                   mime: String, +                   http_status: String, +                   sha1: String, +                   c_size: String, +                   offset: String, +                   warc: String) + + +/** + *  CDX backfill: + *  1. parse CDX (all columns) + *  2. filter CDX (pdf, HTTP 200, etc) + *  3. source HBase (key column only) + *  4. left join CDX to HBase + *  5. filter to only those with null HBase key column + *  6. convert CDX fields to HBase columns + *  7. sink results to HBase + * + * TODO: I really mixed the Scalding "field-base" and "type-based" APIs here. + * Should decide on a best practice. + */ +class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions { + +  import CdxBackfillJob._ + +  val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) +  // XXX: val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts")) + +  // Parse CDX lines from text file to typed pipe +  val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path"))) +  val cdxLines : TypedPipe[CdxLine] = lines +    .filter { isCdxLine } +    .map { lineToCdxLine } +    .filter { CdxBackfillJob.keepCdx(_) } + +  val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines +    .map { CdxBackfillJob.cdxLineToRow } + +  val existingKeys : TypedPipe[String] = hbaseSource +    .read +    .toTypedPipe[String]('key) + +  // filters out all the lines that have an existing SHA1 key in HBase +  // the groupBy statements are to select key values to join on +  val newRows : TypedPipe[(String, String, String, String)] = existingKeys +    .groupBy( identity ) +    .rightJoin(cdxRows.groupBy(_._1)) +    .toTypedPipe +    .debug +    .collect { case (_, (None, row)) => row } +    .debug + +  // convert to tuple form and write out into HBase +  /* +  newRows +    .toPipe('key, 'c, 'cdx, 'mime) +    .toBytesWritable( new Fields("key", "c", "cdx", "mime") ) +    .write(hbaseSink) +  */ +  newRows +    .write(TypedTsv[(String, String, String, String)]("dummy-output")) + +} + +object CdxBackfillJob { + +  def getHBaseSource(hbase_table: String, zookeeper_hosts: String) : HBaseSource = { +    return HBaseBuilder.build( +      hbase_table, +      zookeeper_hosts, +      List("file:size"), // not actually needed +      SourceMode.SCAN_ALL) +  } + +  def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = { +    return HBaseBuilder.buildSink( +      hbase_table, +      zookeeper_hosts, +      List("f:c", "file:cdx", "file:mime"), +      SinkMode.UPDATE) +  } + +  def normalizeMime(raw: String) : String = { + +    val NORMAL_MIME = List("application/pdf", +                           "application/postscript", +                           "text/html", +                           "text/xml") + +    val lower = raw.toLowerCase() +    NORMAL_MIME.foreach(norm => +      if (lower.startsWith(norm)) { +        return norm +      } +    ) + +    // Common special cases +    if (lower.startsWith("application/xml")) { +      return "text/xml" +    } +    if (lower.startsWith("application/x-pdf")) { +      return "application/pdf" +    } +    return lower + +  } + +  def isCdxLine(line: String) : Boolean = { +    // malformated or non-CDX11 lines +    !(line.startsWith("#") || line.startsWith(" ") || line.startsWith("filedesc") || +      line.split(" ").size != 11) +  } + +  def keepCdx(line: CdxLine) : Boolean = { +    // TODO: sha1.isalnum() and c_size.isdigit() and offset.isdigit() and dt.isdigit() +    if (line.http_status != "200" || line.sha1.size != 32) { +      return false +    } +    // TODO: '-' in (line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc) +    return true +  } + +  // Returns (key, f:c, file:cdx, file:mime), all as strings, which is close to +  // how they will be inserted into HBase +  def cdxLineToRow(line: CdxLine) : (String, String, String, String) = { + +    val key = "sha1:" + line.sha1 + +    val warcFile = line.warc.split('/')(1) + +    // Read CDX-style datetime and conver to ISO 8601 with second resolution +    val dtFormat = new java.text.SimpleDateFormat("yyyyMMddHHmmss") +    val isoFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'") +    // TODO: timezones? UTC to UTC, so I don't think so. +    val dtIso = isoFormat.format(dtFormat.parse(line.datetime)) + +    // warc_file = warc.split('/')[-1] +    // dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() +    // f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) + +    // This is the "f:c" field. 'i' intentionally not set +    val heritrixInfo = "" + +    // file:cdx = dict(surt=surt, dt=dt, url=url, c_size=int(c_size), +    //                 offset=int(offset), warc=warc) +    val fileCdx = "" +    (key, heritrixInfo, fileCdx, line.mime) +  } + +  def lineToCdxLine(line: String) : CdxLine = { +    val raw = line.split("\\s+") +    // surt, datetime, url, mime, http_status, sha1, SKIP, SKIP, c_size, offset, warc +    CdxLine(raw(0), raw(1), raw(2), raw(3), raw(4), raw(5), raw(8), raw(9), raw(10)) +  } + +} | 
