diff options
Diffstat (limited to 'scalding/src/main/scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 41 |
1 files changed, 21 insertions, 20 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala index 36e017e..84c19b8 100644 --- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala +++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala @@ -1,5 +1,7 @@ package sandcrawler +// TODO: fix import order to satisfy scala style + import java.util.Properties import scala.util.parsing.json.JSONObject import scala.util.Try @@ -20,7 +22,7 @@ case class CdxLine(surt: String, datetime: String, url: String, mime: String, - http_status: String, + httpStatus: String, sha1: String, c_size: String, offset: String, @@ -55,6 +57,7 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions .map { lineToCdxLine } .filter { CdxBackfillJob.keepCdx(_) } + // (key, f:c, file:cdx, file:mime) val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines .map { CdxBackfillJob.cdxLineToRow } .debug @@ -66,7 +69,8 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions //.debug // filters out all the lines that have an existing SHA1 key in HBase - // the groupBy statements are to select key values to join on + // the groupBy statements are to select key values to join on. + // (key, f:c, file:cdx, file:mime) val newRows : TypedPipe[(String, String, String, String)] = existingKeys .groupBy( identity ) .rightJoin(cdxRows.groupBy(_._1)) @@ -102,41 +106,38 @@ object CdxBackfillJob { def normalizeMime(raw: String) : String = { - val NORMAL_MIME = List("application/pdf", - "application/postscript", - "text/html", - "text/xml") + val normalMime = Map( + "application/pdf" -> "application/pdf", + "application/x-pdf" -> "application/pdf", + "application/postscript" -> "application/postscript", + "text/html" -> "text/html", + "text/xml" -> "text/xml", + "application/xml" -> "text/xml" + ) + // TODO: improvement of control flow val lower = raw.toLowerCase() - NORMAL_MIME.foreach(norm => - if (lower.startsWith(norm)) { - return norm + normalMime.foreach { case (key, value) => + if (lower.startsWith(key)) { + return value } - ) - - // Common special cases - if (lower.startsWith("application/xml")) { - return "text/xml" - } - if (lower.startsWith("application/x-pdf")) { - return "application/pdf" } lower } def isCdxLine(line: String) : Boolean = { - // malformated or non-CDX11 lines + // malformatted or non-CDX11 lines !(line.startsWith("#") || line.startsWith(" ") || line.startsWith("filedesc") || line.split(" ").size != 11) } def keepCdx(line: CdxLine) : Boolean = { if (List(line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc).contains("-")) { - println("DASHLINE") + // TODO: hadoop counter (was: "DASHLINE") return false } // TODO: sha1.isalnum() - if (line.http_status != "200" || line.sha1.size != 32) { + if (line.httpStatus != "200" || line.sha1.size != 32) { return false } if (List(line.c_size, line.offset, line.datetime).map(s => Try(s.toLong).toOption).contains(None)) { |