diff options
author | Bryan Newbold <bnewbold@archive.org> | 2018-07-19 16:26:00 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2018-07-24 11:27:45 -0700 |
commit | c36e59bf03e692d22d6d72aa5ae37977e3a13524 (patch) | |
tree | 94a2e2f025e930a2d3d5c683a3b99f54c1cec5a6 /scalding/src/main/scala | |
parent | 5883c665b9c6d1d27fe4b0cfd98098f671a5f5dd (diff) | |
download | sandcrawler-c36e59bf03e692d22d6d72aa5ae37977e3a13524.tar.gz sandcrawler-c36e59bf03e692d22d6d72aa5ae37977e3a13524.zip |
CdxBackfillJob: implement other fields
Diffstat (limited to 'scalding/src/main/scala')
-rw-r--r-- | scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala index 5ff667d..0af3c9c 100644 --- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala +++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala @@ -10,6 +10,7 @@ import cascading.tap.SinkMode import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} +import scala.util.parsing.json.JSONObject // Type that represents a raw parsed CDX line case class CdxLine(surt: String, @@ -53,6 +54,7 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines .map { CdxBackfillJob.cdxLineToRow } + .debug val existingKeys : TypedPipe[String] = hbaseSource .read @@ -67,7 +69,7 @@ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions .rightJoin(cdxRows.groupBy(_._1)) .toTypedPipe .collect { case (_, (None, row)) => row } - //.debug + .debug // convert to tuple form and write out into HBase newRows @@ -149,17 +151,29 @@ object CdxBackfillJob { // TODO: timezones? UTC to UTC, so I don't think so. val dtIso = isoFormat.format(dtFormat.parse(line.datetime)) - // warc_file = warc.split('/')[-1] - // dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() - // f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) - // This is the "f:c" field. 'i' intentionally not set - val heritrixInfo = "" - - // file:cdx = dict(surt=surt, dt=dt, url=url, c_size=int(c_size), + // python: f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) + // python: warc_file = warc.split('/')[-1] + // python: dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() + val heritrixInfo = JSONObject(Map( + "u" -> line.url, + "d" -> dtIso, + "f" -> warcFile, + "o" -> line.offset.toInt, + "c" -> line.c_size.toInt + )) + + // python: dict(surt=surt, dt=dt, url=url, c_size=int(c_size), // offset=int(offset), warc=warc) - val fileCdx = "" - (key, heritrixInfo, fileCdx, line.mime) + val fileCdx = JSONObject(Map( + "surt" -> line.surt, + "dt" -> line.datetime, + "url" -> line.url, + "c_size" -> line.c_size.toInt, + "offset" -> line.offset.toInt, + "warc" -> line.warc + )) + (key, heritrixInfo.toString(), fileCdx.toString(), line.mime) } def lineToCdxLine(line: String) : CdxLine = { |