From d0ed197859dfcadf89f5321939bb5e83e1bee9ed Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 17 Jul 2018 13:48:28 -0700 Subject: lint/cleanup fixes from review Thanks Ellen! --- .../scala/sandcrawler/HBaseStatusCountJob.scala | 29 +++++++++------------- 1 file changed, 12 insertions(+), 17 deletions(-) (limited to 'scalding') diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index 375d155..dbd444d 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -1,33 +1,28 @@ package sandcrawler -import cascading.property.AppProps -import cascading.tuple.Fields + +import com.twitter.scalding.Args import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ -import java.util.Properties -import parallelai.spyglass.base.JobBase -import org.apache.hadoop.hbase.util.Bytes -import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} -import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import com.twitter.scalding.Args import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBasePipeConversions + class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { - val colSpec = "grobid0:status_code" - val output = args("output") - HBaseBuilder.parseColSpec(colSpec) - val Col: String = colSpec.split(":")(1) + val source = HBaseCountJob.getHBaseSource(args("hbase-table"), + args("zookeeper-hosts"), + "grobid0:status_code") - val source : TypedPipe[Long] = HBaseCountJob.getHBaseSource(args("hbase-table"), - args("zookeeper-hosts"), - colSpec) + val statusPipe : TypedPipe[Long] = source .read .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code) .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) } - source.groupBy { identity } + statusPipe.groupBy { identity } .size .debug - .write(TypedTsv[(Long,Long)](output)) + .write(TypedTsv[(Long,Long)](args("output"))) } -- cgit v1.2.3 From 258abaf0f74c07b0de59c95d77f614a6d28447eb Mon Sep 17 00:00:00 2001 From: Ellen Spertus Date: Tue, 17 Jul 2018 14:37:00 -0700 Subject: Added import order checking. Fixed misordered imports in our code. --- scalding/scalastyle-config.xml | 10 ++++++++++ scalding/src/main/scala/sandcrawler/HBaseCountJob.scala | 2 +- scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala | 2 +- scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala | 2 -- 4 files changed, 12 insertions(+), 4 deletions(-) (limited to 'scalding') diff --git a/scalding/scalastyle-config.xml b/scalding/scalastyle-config.xml index 2f20677..1575217 100644 --- a/scalding/scalastyle-config.xml +++ b/scalding/scalastyle-config.xml @@ -114,4 +114,14 @@ + + + java,scala,others + javax?\..+ + scala\..+ + .+ + 1 + true + + diff --git a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala index 1ebc261..e1f46ef 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala @@ -5,8 +5,8 @@ import cascading.tuple.Fields import com.twitter.scalding._ import java.util.Properties import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} class HBaseCountJob(args: Args, colSpec: String) extends JobBase(args) with HBasePipeConversions { val output = args("output") diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala index ba3b9cd..41de719 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -5,8 +5,8 @@ import cascading.tuple.Fields import com.twitter.scalding._ import java.util.Properties import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index dbd444d..b1dab0e 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -1,6 +1,5 @@ package sandcrawler - import com.twitter.scalding.Args import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ @@ -9,7 +8,6 @@ import org.apache.hadoop.hbase.util.Bytes import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBasePipeConversions - class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { val source = HBaseCountJob.getHBaseSource(args("hbase-table"), -- cgit v1.2.3 From 17f29013f960fdaf0e9d01ac8f128b05f327c95b Mon Sep 17 00:00:00 2001 From: Ellen Spertus Date: Tue, 17 Jul 2018 14:48:27 -0700 Subject: Eliminated example directories --- scalding/build.sbt | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'scalding') diff --git a/scalding/build.sbt b/scalding/build.sbt index f333111..cc21444 100644 --- a/scalding/build.sbt +++ b/scalding/build.sbt @@ -13,6 +13,13 @@ lazy val root = (project in file(".")). test in assembly := {}, )), + (scalastyleSources in Compile) := { + // all .scala files in "src/main/scala" + val scalaSourceFiles = ((scalaSource in Compile).value ** "*.scala").get + val dirNameToExclude = "example" + scalaSourceFiles.filterNot(_.getAbsolutePath.contains(dirNameToExclude)) + }, + name := "sandcrawler", resolvers += "conjars.org" at "http://conjars.org/repo", @@ -47,4 +54,5 @@ lazy val root = (project in file(".")). case s if s.endsWith("xml") => MergeStrategy.last case x => (assemblyMergeStrategy in assembly).value(x) }, + ) -- cgit v1.2.3 From f6f2fcafb245101f75c9cda175891ef2391cda97 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 11 Jun 2018 17:49:28 -0700 Subject: early work on scalding CDX backfill --- .../main/scala/sandcrawler/CdxBackfillJob.scala | 148 +++++++++++++++++++++ .../test/scala/sandcrawler/CdxBackfillJob.scala | 51 +++++++ 2 files changed, 199 insertions(+) create mode 100644 scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala create mode 100644 scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala (limited to 'scalding') diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala new file mode 100644 index 0000000..4f08665 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala @@ -0,0 +1,148 @@ +package sandcrawler + +import cascading.property.AppProps +import cascading.tuple.Fields +import cascading.pipe.joiner._ +import com.twitter.scalding._ +import java.util.Properties +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +// Type that represents a raw parsed CDX line +case class CdxLine(surt: String, datetime: String, url: String, mime: String, http_status: String, sha1: String, c_size: String, offset: String, warc: String) + +/** + * CDX backfill: + * 1. parse CDX (all columns) + * 2. filter CDX (pdf, HTTP 200, etc) + * 3. source HBase (key column only) + * 4. left join CDX to HBase + * 5. filter to only those with null HBase key column + * 6. convert CDX fields to HBase columns + * 7. sink results to HBase + * + * TODO: I really mixed the Scalding "field-base" and "type-based" APIs here. + * Should decide on a best practice. + */ +class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + //import CdxLine._ + // XXX remove all other CdxBackfillJob.whatever + import CdxBackfillJob._ + + val cdxInputPath = args("cdx-input-path") + val hbaseTable = args("hbase-table") + val zookeeperHosts = args("zookeeper-hosts") + + val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) + + val lines : TypedPipe[String] = TypedPipe.from(TextLine(cdxInputPath)) + + val cdxLines : TypedPipe[CdxLine] = lines + .filter { isCdxLine } + .map { lineToCdxLine } + .filter { CdxBackfillJob.keepCdx(_) } + + val cdxRows = cdxLines + .map { CdxBackfillJob.cdxLineToRow(_) } + .toPipe(('key, 'f_c, 'file_cdx, 'file_mime)) + + val hbaseKeys = hbaseSource + .project('key) + .mapTo('key -> 'existingKey) { key : String => key } + + // filters out all the lines that have an existing SHA1 key in HBase + val newRows = cdxRows + .joinWithLarger('key -> 'existingKey, hbaseKeys, joiner = new LeftJoin) + .filter('existingKey) { k : String => k == null } // is String the right type? + + newRows + .write(hbaseSource) + +} + +object CdxBackfillJob { + + def getHBaseSource(hbase_table: String, zookeeper_hosts: String) : HBaseSource = { + return HBaseBuilder.build( + hbase_table, + zookeeper_hosts, + List("file:size"), // not actually needed + SourceMode.SCAN_ALL) + } + + def normalizeMime(raw: String) : String = { + + val NORMAL_MIME = List("application/pdf", + "application/postscript", + "text/html", + "text/xml") + + val lower = raw.toLowerCase() + NORMAL_MIME.foreach(norm => + if (lower.startsWith(norm)) { + return norm + } + ) + + // Common special cases + if (lower.startsWith("application/xml")) { + return "text/xml" + } + if (lower.startsWith("application/x-pdf")) { + return "application/pdf" + } + return lower + + } + + def isCdxLine(line: String) : Boolean = { + // malformated or non-CDX11 lines + !(line.startsWith("#") || line.startsWith(" ") || line.startsWith("filedesc") || + line.split(" ").size != 11) + } + + def keepCdx(line: CdxLine) : Boolean = { + // TODO: sha1.isalnum() and c_size.isdigit() and offset.isdigit() and dt.isdigit() + if (line.http_status != "200" || line.sha1.size != 32) { + return false + } + // TODO: '-' in (line.surt, line.datetime, line.url, line.mime, line.c_size, line.offset, line.warc) + return true + } + + // Returns (key, f:c, file:cdx, file:mime), all as strings, which is close to + // how they will be inserted into HBase + def cdxLineToRow(line: CdxLine) : (String, String, String, String) = { + + val key = "sha1:" + line.sha1 + + val warcFile = line.warc.split('/')(1) + + // Read CDX-style datetime and conver to ISO 8601 with second resolution + val dtFormat = new java.text.SimpleDateFormat("yyyyMMddHHmmss") + val isoFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'") + // TODO: timezones? UTC to UTC, so I don't think so. + val dtIso = isoFormat.format(dtFormat.parse(line.datetime)) + + // warc_file = warc.split('/')[-1] + // dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() + // f:c = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) + + // This is the "f:c" field. 'i' intentionally not set + val heritrixInfo = "" + + // file:cdx = dict(surt=surt, dt=dt, url=url, c_size=int(c_size), + // offset=int(offset), warc=warc) + val fileCdx = "" + (key, heritrixInfo, fileCdx, line.mime) + } + + def lineToCdxLine(line: String) : CdxLine = { + val raw = line.split("\\s+") + // surt, datetime, url, mime, http_status, sha1, SKIP, SKIP, c_size, offset, warc + CdxLine(raw(0), raw(1), raw(2), raw(3), raw(4), raw(5), raw(8), raw(9), raw(10)) + } + +} diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala new file mode 100644 index 0000000..a442d79 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala @@ -0,0 +1,51 @@ + +package sandcrawler + +import cascading.tuple.Fields +import org.scalatest._ + +class CdxBackfillTest extends FlatSpec with Matchers { + + import CdxBackfillJob._ + + it should "normalize mimetypes" in { + assert(CdxBackfillJob.normalizeMime("asdf") === "asdf") + assert(CdxBackfillJob.normalizeMime("application/pdf") === "application/pdf") + assert(CdxBackfillJob.normalizeMime("application/pdf+journal") === "application/pdf") + assert(CdxBackfillJob.normalizeMime("Application/PDF") === "application/pdf") + assert(CdxBackfillJob.normalizeMime("application/p") === "application/p") + assert(CdxBackfillJob.normalizeMime("application/xml+stuff") === "text/xml") + assert(CdxBackfillJob.normalizeMime("application/x-pdf") === "application/pdf") + assert(CdxBackfillJob.normalizeMime("application/x-html") === "application/x-html") + } + + it should "filter CDX lines" in { + assert(true === keepCdx(lineToCdxLine( + "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))) + // redirect + assert(false === keepCdx(lineToCdxLine( + "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"))) + } + + it should "know what CDX lines are" in { + assert(true === isCdxLine( + "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")) + assert(false === isCdxLine("")) + assert(false === isCdxLine( + " edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")) + assert(false === isCdxLine( + "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")) + // missing two fields + assert(false === isCdxLine( + "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")) + // extra field + assert(false === isCdxLine( + "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz -")) + } + + it should "execute lineToRow" in { + cdxLineToRow(lineToCdxLine( + "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")) + } + +} -- cgit v1.2.3 From bc10143c54c02d5e6a806f1d5f7bb326c24793f3 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 17 Jul 2018 18:52:58 -0700 Subject: add buildSink() method for writing to HBase --- scalding/src/main/scala/sandcrawler/HBaseBuilder.scala | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'scalding') diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala index b271def..fd04f2e 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala @@ -1,6 +1,8 @@ package sandcrawler import cascading.tuple.Fields +import parallelai.spyglass.base.JobBase +import cascading.tap.SinkMode import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBaseSource import scala._ @@ -48,4 +50,9 @@ object HBaseBuilder { val (families, fields) = parseColSpecs(colSpecs) new HBaseSource(table, server, new Fields("key"), families, fields, sourceMode = sourceMode, keyList = keyList) } + + def buildSink(table: String, server: String, colSpecs: List[String], sinkMode: SinkMode, keyList: List[String] = List("key")) : HBaseSource = { + val (families, fields) = parseColSpecs(colSpecs) + new HBaseSource(table, server, new Fields("key"), families, fields, sinkMode = sinkMode) + } } -- cgit v1.2.3 From ef1e7b674435c1d8759227018f88aad8033c0ec1 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 17 Jul 2018 18:53:17 -0700 Subject: fix confusing whitespace in HBaseStatusCountTest --- .../scala/sandcrawler/HBaseStatusCountTest.scala | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'scalding') diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala index 11ab1d0..d7689cd 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala @@ -48,18 +48,18 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions { .arg("debug", "true") .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"), sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) - .sink[Tuple](TypedTsv[(Long, Long)](output)) { - outputBuffer => - it("should return a 2-element list.") { - assert(outputBuffer.size === 2) - } + .sink[Tuple](TypedTsv[(Long, Long)](output)) { + outputBuffer => + it("should return a 2-element list.") { + assert(outputBuffer.size === 2) + } - // Convert List[Tuple] to Map[Long, Long]. - val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap - it("should have the appropriate number of each status type") { - assert(counts(statusType1) == statusType1Count) - assert(counts(statusType2) == statusType2Count) - } + // Convert List[Tuple] to Map[Long, Long]. + val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap + it("should have the appropriate number of each status type") { + assert(counts(statusType1) == statusType1Count) + assert(counts(statusType2) == statusType2Count) + } } .run .finish -- cgit v1.2.3 From 71e2219226ba889b0ca90d81f9e278519520ad20 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 17 Jul 2018 18:53:37 -0700 Subject: more complete CdxBackfillJob (untested) --- .../test/scala/sandcrawler/CdxBackfillJob.scala | 70 +++++++++++++++++++++- 1 file changed, 68 insertions(+), 2 deletions(-) (limited to 'scalding') diff --git a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala index a442d79..de94494 100644 --- a/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala +++ b/scalding/src/test/scala/sandcrawler/CdxBackfillJob.scala @@ -1,8 +1,17 @@ package sandcrawler -import cascading.tuple.Fields import org.scalatest._ +import cascading.tuple.{Tuple, Fields} +import com.twitter.scalding.{JobTest, Tsv, TypedTsv, TupleConversions, TextLine} +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.junit.runner.RunWith +import org.scalatest.FunSpec +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory +import parallelai.spyglass.hbase.HBaseSource +import parallelai.spyglass.hbase.HBaseConstants.SourceMode class CdxBackfillTest extends FlatSpec with Matchers { @@ -37,7 +46,7 @@ class CdxBackfillTest extends FlatSpec with Matchers { "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")) // missing two fields assert(false === isCdxLine( - "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")) + "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz")) // extra field assert(false === isCdxLine( "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz -")) @@ -49,3 +58,60 @@ class CdxBackfillTest extends FlatSpec with Matchers { } } + +@RunWith(classOf[JUnitRunner]) +class CdxBackfillJobTest extends FunSpec with TupleConversions { + + val output = "/tmp/testOutput" + val (testTable, testHost, testCdxFile) = ("test-table", "dummy-host:2181", "test_file.cdx") + + val log = LoggerFactory.getLogger(this.getClass.getName) + + val dummySizeBytes = Bytes.toBytes(100) + + val sampleData = List( + List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), dummySizeBytes), + List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), dummySizeBytes), + List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), dummySizeBytes), + List(Bytes.toBytes("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT"), dummySizeBytes) + ) + val sampleCdxLines = List( + // clean line + "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", + // has existing SHA1 + "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", + // HTTP status code + "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 301 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", + // not CDX (prefixed with hash) + "#edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz" + ) + + JobTest("sandcrawler.CdxBackfillJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("cdx-input-path", testCdxFile) + .arg("debug", "true") + .source[Tuple](CdxBackfillJob.getHBaseSource(testTable, testHost), + sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + .source[String](TextLine(testCdxFile), sampleCdxLines) + .sink[Tuple](CdxBackfillJob.getHBaseSink(testTable, testHost)) { + outputBuffer => + + it("should return a 1-element list (after join).") { + // XXX: + assert(outputBuffer.size === 1) + } + + // Convert List[Tuple] to Map[Long, Long]. + val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap + it("should have the appropriate number of each status type") { + // XXX: + assert(counts(1) == 3) + } + } + .run + .finish +} -- cgit v1.2.3 From 866864dc5d52b8c8466b42d28148ace108d0e1f2 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 17 Jul 2018 18:53:57 -0700 Subject: refactor CdxBackfillJob to be all Typed (failed tests) --- .../main/scala/sandcrawler/CdxBackfillJob.scala | 67 +++++++++++++++------- 1 file changed, 46 insertions(+), 21 deletions(-) (limited to 'scalding') diff --git a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala index 4f08665..0251e07 100644 --- a/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala +++ b/scalding/src/main/scala/sandcrawler/CdxBackfillJob.scala @@ -4,13 +4,24 @@ import cascading.property.AppProps import cascading.tuple.Fields import cascading.pipe.joiner._ import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ import java.util.Properties +import cascading.tap.SinkMode import parallelai.spyglass.base.JobBase -import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} // Type that represents a raw parsed CDX line -case class CdxLine(surt: String, datetime: String, url: String, mime: String, http_status: String, sha1: String, c_size: String, offset: String, warc: String) +case class CdxLine(surt: String, + datetime: String, + url: String, + mime: String, + http_status: String, + sha1: String, + c_size: String, + offset: String, + warc: String) + /** * CDX backfill: @@ -27,38 +38,44 @@ case class CdxLine(surt: String, datetime: String, url: String, mime: String, ht */ class CdxBackfillJob(args: Args) extends JobBase(args) with HBasePipeConversions { - //import CdxLine._ - // XXX remove all other CdxBackfillJob.whatever import CdxBackfillJob._ - val cdxInputPath = args("cdx-input-path") - val hbaseTable = args("hbase-table") - val zookeeperHosts = args("zookeeper-hosts") - val hbaseSource = getHBaseSource(args("hbase-table"), args("zookeeper-hosts")) + val hbaseSink = getHBaseSink(args("hbase-table"), args("zookeeper-hosts")) - val lines : TypedPipe[String] = TypedPipe.from(TextLine(cdxInputPath)) - + // Parse CDX lines from text file to typed pipe + val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("cdx-input-path"))) val cdxLines : TypedPipe[CdxLine] = lines .filter { isCdxLine } .map { lineToCdxLine } .filter { CdxBackfillJob.keepCdx(_) } - val cdxRows = cdxLines - .map { CdxBackfillJob.cdxLineToRow(_) } - .toPipe(('key, 'f_c, 'file_cdx, 'file_mime)) + val cdxRows : TypedPipe[(String, String, String, String)] = cdxLines + .map { CdxBackfillJob.cdxLineToRow } - val hbaseKeys = hbaseSource - .project('key) - .mapTo('key -> 'existingKey) { key : String => key } + val existingKeys : TypedPipe[String] = hbaseSource + .read + .toTypedPipe[String]('key) // filters out all the lines that have an existing SHA1 key in HBase - val newRows = cdxRows - .joinWithLarger('key -> 'existingKey, hbaseKeys, joiner = new LeftJoin) - .filter('existingKey) { k : String => k == null } // is String the right type? - + // the groupBy statements are to select key values to join on + val newRows : TypedPipe[(String, String, String, String)] = existingKeys + .groupBy( identity ) + .rightJoin(cdxRows.groupBy(_._1)) + .toTypedPipe + .debug + .collect { case (_, (None, row)) => row } + .debug + + // convert to tuple form and write out into HBase newRows - .write(hbaseSource) + .toPipe('key, 'c, 'cdx, 'mime) + .toBytesWritable( new Fields("key", "c", "cdx", "mime") ) + .write(hbaseSink) + + // XXX: + //.toPipe("all") + //.mapTo('all -> ('key, 'c, 'cdx, 'mime)) { x : (String, String, String, String) => x } } @@ -72,6 +89,14 @@ object CdxBackfillJob { SourceMode.SCAN_ALL) } + def getHBaseSink(hbase_table: String, zookeeper_hosts: String) : HBaseSource = { + return HBaseBuilder.buildSink( + hbase_table, + zookeeper_hosts, + List("f:c", "file:cdx", "file:mime"), + SinkMode.UPDATE) + } + def normalizeMime(raw: String) : String = { val NORMAL_MIME = List("application/pdf", -- cgit v1.2.3 From 3df897225b942e3205283ba30575a2336e318e34 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 17 Jul 2018 18:55:17 -0700 Subject: more debugging notes --- scalding/scalding-debugging.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'scalding') diff --git a/scalding/scalding-debugging.md b/scalding/scalding-debugging.md index 9143698..404fb4d 100644 --- a/scalding/scalding-debugging.md +++ b/scalding/scalding-debugging.md @@ -45,6 +45,20 @@ resolved by ensuring that the `HBaseSource` constructors had exactly identical names and arguments (eg, table names and zookeeper quorums have to be exact matches). +If you get: + + value toTypedPipe is not a member of cascading.pipe.Pipe + +You probably need to: + + import com.twitter.scalding.typed.TDsl._ + +## Running Individual Tests + +You can run a single test matching a string glob pattern like: + + sbt:sandcrawler> testOnly *CdxBackfill* + ## Fields Values of type `List[Fields]` are not printed in the expected way: -- cgit v1.2.3 From e2d5b35a8cb5eefafc12abe0394ec5f60ab87d79 Mon Sep 17 00:00:00 2001 From: Ellen Spertus Date: Thu, 19 Jul 2018 10:30:20 -0700 Subject: Added rule to warn about block imports. Removed block imports. --- scalding/scalastyle-config.xml | 3 ++- scalding/src/main/scala/sandcrawler/HBaseCountJob.scala | 3 ++- scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) (limited to 'scalding') diff --git a/scalding/scalastyle-config.xml b/scalding/scalastyle-config.xml index 1575217..e2f58a0 100644 --- a/scalding/scalastyle-config.xml +++ b/scalding/scalastyle-config.xml @@ -114,7 +114,8 @@ - + + java,scala,others javax?\..+ diff --git a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala index e1f46ef..22e4e86 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala @@ -6,7 +6,8 @@ import com.twitter.scalding._ import java.util.Properties import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource class HBaseCountJob(args: Args, colSpec: String) extends JobBase(args) with HBasePipeConversions { val output = args("output") diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala index 41de719..6def218 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -6,7 +6,8 @@ import com.twitter.scalding._ import java.util.Properties import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode -import parallelai.spyglass.hbase.{HBaseSource, HBasePipeConversions} +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { -- cgit v1.2.3