From c36e59bf03e692d22d6d72aa5ae37977e3a13524 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 19 Jul 2018 16:26:00 -0700 Subject: CdxBackfillJob: implement other fields --- .../main/scala/sandcrawler/CdxBackfillJob.scala | 34 +++++++++++++++------- 1 file changed, 24 insertions(+), 10 deletions(-) (limited to 'scalding/src/main') diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala index 5ff667d..0af3c9c 100644 --- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala +++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala @@ -10,6 +10,7 @@ import cascading.tap.SinkMode import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} +import scala.util.parsing.json.JSONObject // Type that represents a raw parsed CDX line case class CdxLine(surt: String, @@ -53,6 +54,7 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines .map { CdxBackfillJob.cdxLineToRow } + .debug val existingKeys : TypedPipe[String] = hbaseSource .read @@ -67,7 +69,7 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions .rightJoin(cdxRows.groupBy(_._1)) .toTypedPipe .collect { case (_, (None, row)) => row } - //.debug + .debug // convert to tuple form and write out into HBase newRows @@ -149,17 +151,29 @@ object CdxBackfillJob { // TODO: timezones? UTC to UTC, so I don't think so. val dtIso = isoFormat.format(dtFormat.parse(line.datetime)) - // warc_file = warc.split('/')[-1] - // dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() - // f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) - // This is the "f:c" field. 'i' intentionally not set - val heritrixInfo = "" - - // file:cdx = dict(surt=surt, dt=dt, url=url, c_size=int(c_size), + // python: f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) + // python: warc_file = warc.split('/')[-1] + // python: dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() + val heritrixInfo = JSONObject(Map( + "u" -> line.url, + "d" -> dtIso, + "f" -> warcFile, + "o" -> line.offset.toInt, + "c" -> line.c_size.toInt + )) + + // python: dict(surt=surt, dt=dt, url=url, c_size=int(c_size), // offset=int(offset), warc=warc) - val fileCdx = "" - (key, heritrixInfo, fileCdx, line.mime) + val fileCdx = JSONObject(Map( + "surt" -> line.surt, + "dt" -> line.datetime, + "url" -> line.url, + "c_size" -> line.c_size.toInt, + "offset" -> line.offset.toInt, + "warc" -> line.warc + )) + (key, heritrixInfo.toString(), fileCdx.toString(), line.mime) } def lineToCdxLine(line: String) : CdxLine = { -- cgit v1.2.3