aboutsummaryrefslogtreecommitdiffstats
path: root/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
blob: 0af3c9c870ffa3deaa1d1d26947a8cc80f4645fd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package sandcrawler

import cascading.property.AppProps
import cascading.tuple.Fields
import cascading.pipe.joiner._
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import java.util.Properties
import cascading.tap.SinkMode
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
import scala.util.parsing.json.JSONObject

// Type that represents a raw parsed CDX line
case class CdxLine(surt: String,
                   datetime: String,
                   url: String,
                   mime: String,
                   http_status: String,
                   sha1: String,
                   c_size: String,
                   offset: String,
                   warc: String)


/**
 *  CDX backfill:
 *  1. parse CDX (all columns)
 *  2. filter CDX (pdf, HTTP 200, etc)
 *  3. source HBase (key column only)
 *  4. left join CDX to HBase
 *  5. filter to only those with null HBase key column
 *  6. convert CDX fields to HBase columns
 *  7. sink results to HBase
 *
 * TODO: I really mixed the Scalding "field-base" and "type-based" APIs here.
 * Should decide on a best practice.
 */
class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions {

  import CdxBackfillJob._

  val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
  val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts"))

  // Parse CDX lines from text file to typed pipe
  val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path")))

  val cdxLines : TypedPipe[CdxLine] = lines
    .filter { isCdxLine }
    .map { lineToCdxLine }
    .filter { CdxBackfillJob.keepCdx(_) }

  val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines
    .map { CdxBackfillJob.cdxLineToRow }
    .debug

  val existingKeys : TypedPipe[String] = hbaseSource
    .read
    .fromBytesWritable( new Fields("key") )
    .toTypedPipe[String]('key)
    //.debug

  // filters out all the lines that have an existing SHA1 key in HBase
  // the groupBy statements are to select key values to join on
  val newRows : TypedPipe[(String, String, String, String)] = existingKeys
    .groupBy( identity )
    .rightJoin(cdxRows.groupBy(_._1))
    .toTypedPipe
    .collect { case (_, (None, row)) => row }
    .debug

  // convert to tuple form and write out into HBase
  newRows
    .toPipe('key, 'c, 'cdx, 'mime)
    .toBytesWritable( new Fields("key", "c", "cdx", "mime") )
    .write(hbaseSink)

}

object CdxBackfillJob {

  def getHBaseSource(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
    return HBaseBuilder.build(
      hbase_table,
      zookeeper_hosts,
      List("file:size"), // not actually needed
      SourceMode.SCAN_ALL)
  }

  def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = {
    return HBaseBuilder.buildSink(
      hbase_table,
      zookeeper_hosts,
      List("f:c", "file:cdx", "file:mime"),
      SinkMode.UPDATE)
  }

  def normalizeMime(raw: String) : String = {

    val NORMAL_MIME = List("application/pdf",
                           "application/postscript",
                           "text/html",
                           "text/xml")

    val lower = raw.toLowerCase()
    NORMAL_MIME.foreach(norm =>
      if (lower.startsWith(norm)) {
        return norm
      }
    )

    // Common special cases
    if (lower.startsWith("application/xml")) {
      return "text/xml"
    }
    if (lower.startsWith("application/x-pdf")) {
      return "application/pdf"
    }
    return lower

  }

  def isCdxLine(line: String) : Boolean = {
    // malformated or non-CDX11 lines
    !(line.startsWith("#") || line.startsWith(" ") || line.startsWith("filedesc") ||
      line.split(" ").size != 11)
  }

  def keepCdx(line: CdxLine) : Boolean = {
    // TODO: sha1.isalnum() and c_size.isdigit() and offset.isdigit() and dt.isdigit()
    if (line.http_status != "200" || line.sha1.size != 32) {
      return false
    }
    // TODO: '-' in (line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc)
    return true
  }

  // Returns (key, f:c, file:cdx, file:mime), all as strings, which is close to
  // how they will be inserted into HBase
  def cdxLineToRow(line: CdxLine) : (String, String, String, String) = {

    val key = "sha1:" + line.sha1

    val warcFile = line.warc.split('/')(1)

    // Read CDX-style datetime and conver to ISO 8601 with second resolution
    val dtFormat = new java.text.SimpleDateFormat("yyyyMMddHHmmss")
    val isoFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
    // TODO: timezones? UTC to UTC, so I don't think so.
    val dtIso = isoFormat.format(dtFormat.parse(line.datetime))

    // This is the "f:c" field. 'i' intentionally not set
    // python: f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
    // python: warc_file = warc.split('/')[-1]
    // python: dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
    val heritrixInfo = JSONObject(Map(
      "u" -> line.url,
      "d" -> dtIso,
      "f" -> warcFile,
      "o" -> line.offset.toInt,
      "c" -> line.c_size.toInt
    ))

    // python: dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
    //                 offset=int(offset), warc=warc)
    val fileCdx = JSONObject(Map(
      "surt" -> line.surt,
      "dt" -> line.datetime,
      "url" -> line.url,
      "c_size" -> line.c_size.toInt,
      "offset" -> line.offset.toInt,
      "warc" -> line.warc
    ))
    (key, heritrixInfo.toString(), fileCdx.toString(), line.mime)
  }

  def lineToCdxLine(line: String) : CdxLine = {
    val raw = line.split("\\s+")
    // surt, datetime, url, mime, http_status, sha1, SKIP, SKIP, c_size, offset, warc
    CdxLine(raw(0), raw(1), raw(2), raw(3), raw(4), raw(5), raw(8), raw(9), raw(10))
  }

}