-rw-r--r--   scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala   41   +++++++++++++++++++++--------------------
1 file changed, 21 insertions(+), 20 deletions(-)
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index 36e017e..84c19b8 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -1,5 +1,7 @@
package sandcrawler
+// TODO: fix import order to satisfy scala style
+
import java.util.Properties
import scala.util.parsing.json.JSONObject
import scala.util.Try
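The import-order TODO above likely refers to scalastyle's import grouping rules; one plausible arrangement (an assumption, since the project's scalastyle config is not shown here) puts java.* imports before scala.* imports, separated by blank lines:

    import java.util.Properties

    import scala.util.Try
    import scala.util.parsing.json.JSONObject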
@@ -20,7 +22,7 @@ case class CdxLine(surt: String,
datetime: String,
url: String,
mime: String,
- http_status: String,
+ httpStatus: String,
sha1: String,
c_size: String,
offset: String,
@@ -55,6 +57,7 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
.map { lineToCdxLine }
.filter { CdxBackfillJob.keepCdx(_) }
+ // (key, f:c, file:cdx, file:mime)
val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines
.map { CdxBackfillJob.cdxLineToRow }
.debug
@@ -66,7 +69,8 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
//.debug
// filters out all the lines that have an existing SHA1 key in HBase
- // the groupBy statements are to select key values to join on
+ // the groupBy statements are to select key values to join on.
+ // (key, f:c, file:cdx, file:mime)
val newRows : TypedPipe[(String, String, String, String)] = existingKeys
.groupBy( identity )
.rightJoin(cdxRows.groupBy(_._1))
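For context, the rightJoin above is the first half of an anti-join. A rough sketch of how such a join result is typically reduced to only-new rows in Scalding's typed API (the names mirror this job, but the continuation shown here is an assumption, not necessarily the code that actually follows this hunk):

    val newRows : TypedPipe[(String, String, String, String)] = existingKeys
      .groupBy( identity )
      .rightJoin(cdxRows.groupBy(_._1))
      // toTypedPipe yields (key, (Option[existingKey], cdxRow)) tuples
      .toTypedPipe
      // keep only CDX rows whose key had no match among the existing HBase keys
      .collect { case (_, (None, row)) => row }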
@@ -102,41 +106,38 @@ object CdxBackfillJob {
def normalizeMime(raw: String) : String = {
- val NORMAL_MIME = List("application/pdf",
- "application/postscript",
- "text/html",
- "text/xml")
+ val normalMime = Map(
+ "application/pdf" -> "application/pdf",
+ "application/x-pdf" -> "application/pdf",
+ "application/postscript" -> "application/postscript",
+ "text/html" -> "text/html",
+ "text/xml" -> "text/xml",
+ "application/xml" -> "text/xml"
+ )
+ // TODO: improvement of control flow
val lower = raw.toLowerCase()
- NORMAL_MIME.foreach(norm =>
- if (lower.startsWith(norm)) {
- return norm
+ normalMime.foreach { case (key, value) =>
+ if (lower.startsWith(key)) {
+ return value
}
- )
-
- // Common special cases
- if (lower.startsWith("application/xml")) {
- return "text/xml"
- }
- if (lower.startsWith("application/x-pdf")) {
- return "application/pdf"
}
lower
}
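The control-flow TODO above could be addressed by replacing the foreach/early-return with collectFirst, which stops at the first matching prefix. A minimal sketch of that option (one idiomatic alternative, not the project's actual follow-up change):

    def normalizeMime(raw: String) : String = {
      val normalMime = Map(
        "application/pdf" -> "application/pdf",
        "application/x-pdf" -> "application/pdf",
        "application/postscript" -> "application/postscript",
        "text/html" -> "text/html",
        "text/xml" -> "text/xml",
        "application/xml" -> "text/xml")
      val lower = raw.toLowerCase()
      // first map entry whose key is a prefix of the mimetype, else the
      // lower-cased raw value unchanged
      normalMime
        .collectFirst { case (prefix, norm) if lower.startsWith(prefix) => norm }
        .getOrElse(lower)
    }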
def isCdxLine(line: String) : Boolean = {
- // malformated or non-CDX11 lines
+ // malformatted or non-CDX11 lines
!(line.startsWith("#") || line.startsWith(" ") || line.startsWith("filedesc") ||
line.split(" ").size != 11)
}
def keepCdx(line: CdxLine) : Boolean = {
if (List(line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc).contains("-")) {
- println("DASHLINE")
+ // TODO: hadoop counter (was: "DASHLINE")
return false
}
// TODO: sha1.isalnum()
- if (line.http_status != "200" || line.sha1.size != 32) {
+ if (line.httpStatus != "200" || line.sha1.size != 32) {
return false
}
if (List(line.c_size, line.offset, line.datetime).map(s => Try(s.toLong).toOption).contains(None)) {
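The "hadoop counter" TODO above could be served by Scalding's Stat counters. A rough sketch, assuming a Scalding version that exposes com.twitter.scalding.Stat with incBy, and noting that the counter must be created where the Job's implicit UniqueID is in scope (i.e. inside CdxBackfillJob rather than the companion object); the counter name and group here are placeholders:

    import com.twitter.scalding.Stat

    // inside CdxBackfillJob, replacing the removed println("DASHLINE")
    val dashLineCounter = Stat("cdx-dash-lines", "cdx-backfill")

    val keptLines : TypedPipe[CdxLine] = cdxLines.filter { line =>
      // count lines rejected because a required field is "-"; keepCdx still
      // performs the same check, the counter is just bumped alongside it
      val hasDash = List(line.surt, line.datetime, line.url, line.mime,
                         line.c_size, line.offset, line.warc).contains("-")
      if (hasDash) { dashLineCounter.incBy(1) }
      CdxBackfillJob.keepCdx(line)
    }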