about summary refs log tree commit diff stats
path: root/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
diff options
context:
space:
mode:
Diffstat (limited to 'scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala')
-rw-r--r-- scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala 34
1 files changed, 24 insertions, 10 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
index 5ff667d..0af3c9c 100644
--- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
+++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala
@@ -10,6 +10,7 @@ import cascading.tap.SinkMode
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions}
+import scala.util.parsing.json.JSONObject
// Type that represents a raw parsed CDX line
case class CdxLine(surt: String,
@@ -53,6 +54,7 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines
.map { CdxBackfillJob.cdxLineToRow }
+ .debug
val existingKeys : TypedPipe[String] = hbaseSource
.read
@@ -67,7 +69,7 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions
.rightJoin(cdxRows.groupBy(_._1))
.toTypedPipe
.collect { case (_, (None, row)) => row }
- //.debug
+ .debug
// convert to tuple form and write out into HBase
newRows
@@ -149,17 +151,29 @@ object CdxBackfillJob {
// TODO: timezones? UTC to UTC, so I don't think so.
val dtIso = isoFormat.format(dtFormat.parse(line.datetime))
- // warc_file = warc.split('/')[-1]
- // dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
- // f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
-
// This is the "f:c" field. 'i' intentionally not set
- val heritrixInfo = ""
-
- // file:cdx = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
+ // python: f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
+ // python: warc_file = warc.split('/')[-1]
+ // python: dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+ val heritrixInfo = JSONObject(Map(
+ "u" -> line.url,
+ "d" -> dtIso,
+ "f" -> warcFile,
+ "o" -> line.offset.toInt,
+ "c" -> line.c_size.toInt
+ ))
+
+ // python: dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
// offset=int(offset), warc=warc)
- val fileCdx = ""
- (key, heritrixInfo, fileCdx, line.mime)
+ val fileCdx = JSONObject(Map(
+ "surt" -> line.surt,
+ "dt" -> line.datetime,
+ "url" -> line.url,
+ "c_size" -> line.c_size.toInt,
+ "offset" -> line.offset.toInt,
+ "warc" -> line.warc
+ ))
+ (key, heritrixInfo.toString(), fileCdx.toString(), line.mime)
}
def lineToCdxLine(line: String) : CdxLine = {