diff options
Diffstat (limited to 'scalding')
-rw-r--r-- | scalding/build.sbt | 2 | ||||
-rw-r--r-- | scalding/scalastyle-config.xml | 30 | ||||
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseBuilder.scala | 14 | ||||
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseCountJob.scala | 10 | ||||
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala | 9 | ||||
-rw-r--r-- | scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala | 14 | ||||
-rw-r--r-- | scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala | 22 |
7 files changed, 64 insertions, 37 deletions
diff --git a/scalding/build.sbt b/scalding/build.sbt index cc21444..980418c 100644 --- a/scalding/build.sbt +++ b/scalding/build.sbt @@ -16,7 +16,7 @@ lazy val root = (project in file(".")). (scalastyleSources in Compile) := { // all .scala files in "src/main/scala" val scalaSourceFiles = ((scalaSource in Compile).value ** "*.scala").get - val dirNameToExclude = "example" + val dirNameToExclude = "/example/" scalaSourceFiles.filterNot(_.getAbsolutePath.contains(dirNameToExclude)) }, diff --git a/scalding/scalastyle-config.xml b/scalding/scalastyle-config.xml index e2f58a0..86d8fca 100644 --- a/scalding/scalastyle-config.xml +++ b/scalding/scalastyle-config.xml @@ -6,6 +6,13 @@ <parameter name="maxFileLength"><![CDATA[800]]></parameter> </parameters> </check> +<check enabled="true" class="org.scalastyle.file.IndentationChecker" level="warning"> + <parameters> + <parameter name="tabSize">2</parameter> + <parameter name="methodParamIndentSize">2</parameter> + <parameter name="classParamIndentSize">4</parameter> + </parameters> +</check> <check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="false"> <parameters> <parameter name="header"><![CDATA[// Copyright (C) 2011-2012 the original author or authors. @@ -50,17 +57,12 @@ </parameters> </check> <check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check> - <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"> - <parameters> - <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter> - </parameters> - </check> <check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true"> <parameters> <parameter name="maxParameters"><![CDATA[8]]></parameter> </parameters> </check> - <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> + <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="false"> <parameters> <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter> </parameters> @@ -114,15 +116,23 @@ <check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> <check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check> <check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check> - <check class="org.scalastyle.scalariform.BlockImportChecker" level="warning" enabled="true"/> -<check class="org.scalastyle.scalariform.ImportOrderChecker" level="warning" enabled="true"> + <check enabled="true" class="org.scalastyle.scalariform.BlockImportChecker" level="warning"/> + <check enabled="true" class="org.scalastyle.scalariform.ImportOrderChecker" level="warning"> <parameters> <parameter name="groups">java,scala,others</parameter> <parameter name="group.java">javax?\..+</parameter> <parameter name="group.scala">scala\..+</parameter> <parameter name="group.others">.+</parameter> - <parameter name="maxBlankLines">1</parameter> - <parameter name="lexicographic">true</parameter> + </parameters> + </check> +<check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"> + <parameters> + <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter> + </parameters> +</check> +<check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"> + <parameters> + <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter> </parameters> </check> </scalastyle> diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala index b271def..19df99d 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala @@ -1,9 +1,12 @@ package sandcrawler +import scala._ + +import cascading.tap.SinkMode import cascading.tuple.Fields +import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBaseSource -import scala._ object HBaseBuilder { // map from column families to column names @@ -39,8 +42,8 @@ object HBaseBuilder { val groupMap: Map[String, List[String]] = colSpecs.groupBy(c => (c split ":")(0)) val families = groupMap.keys.toList val groupedColNames : List[List[String]] = families map {fam => { - val cols = {groupMap(fam).map(v => v.split(":")(1))} - cols}} + val cols = {groupMap(fam).map(v => v.split(":")(1))} + cols}} (families, groupedColNames.map({fields => new Fields(fields : _*)})) } @@ -48,4 +51,9 @@ object HBaseBuilder { val (families, fields) = parseColSpecs(colSpecs) new HBaseSource(table, server, new Fields("key"), families, fields, sourceMode = sourceMode, keyList = keyList) } + + def buildSink(table: String, server: String, colSpecs: List[String], sinkMode: SinkMode, keyList: List[String] = List("key")) : HBaseSource = { + val (families, fields) = parseColSpecs(colSpecs) + new HBaseSource(table, server, new Fields("key"), families, fields, sinkMode = sinkMode) + } } diff --git a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala index 22e4e86..b12e723 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala @@ -1,9 +1,10 @@ package sandcrawler +import java.util.Properties + import cascading.property.AppProps import cascading.tuple.Fields import com.twitter.scalding._ -import java.util.Properties import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions @@ -14,9 +15,10 @@ class HBaseCountJob(args: Args, colSpec: String) extends JobBase(args) with HBas HBaseBuilder.parseColSpec(colSpec) val Col: String = colSpec.split(":")(1) - HBaseCountJob.getHBaseSource(args("hbase-table"), - args("zookeeper-hosts"), - colSpec) + HBaseCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts"), + colSpec) .read .fromBytesWritable(Symbol(Col)) .debug diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala index 6def218..4c3de33 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala @@ -1,9 +1,10 @@ package sandcrawler +import java.util.Properties + import cascading.property.AppProps import cascading.tuple.Fields import com.twitter.scalding._ -import java.util.Properties import parallelai.spyglass.base.JobBase import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions @@ -13,8 +14,9 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio val output = args("output") - HBaseRowCountJob.getHBaseSource(args("hbase-table"), - args("zookeeper-hosts")) + HBaseRowCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts")) .read .debug .groupAll { _.size('count) } @@ -31,5 +33,4 @@ object HBaseRowCountJob { List("file:size"), SourceMode.SCAN_ALL) } - } diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala index b1dab0e..fd0b4e2 100644 --- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala +++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala @@ -1,18 +1,24 @@ package sandcrawler -import com.twitter.scalding.Args +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields import com.twitter.scalding._ import com.twitter.scalding.typed.TDsl._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions { - val source = HBaseCountJob.getHBaseSource(args("hbase-table"), - args("zookeeper-hosts"), - "grobid0:status_code") + val source = HBaseCountJob.getHBaseSource( + args("hbase-table"), + args("zookeeper-hosts"), + "grobid0:status_code") val statusPipe : TypedPipe[Long] = source .read diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala index 11ab1d0..d7689cd 100644 --- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala +++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala @@ -48,18 +48,18 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions { .arg("debug", "true") .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"), sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) - .sink[Tuple](TypedTsv[(Long, Long)](output)) { - outputBuffer => - it("should return a 2-element list.") { - assert(outputBuffer.size === 2) - } + .sink[Tuple](TypedTsv[(Long, Long)](output)) { + outputBuffer => + it("should return a 2-element list.") { + assert(outputBuffer.size === 2) + } - // Convert List[Tuple] to Map[Long, Long]. - val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap - it("should have the appropriate number of each status type") { - assert(counts(statusType1) == statusType1Count) - assert(counts(statusType2) == statusType2Count) - } + // Convert List[Tuple] to Map[Long, Long]. + val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap + it("should have the appropriate number of each status type") { + assert(counts(statusType1) == statusType1Count) + assert(counts(statusType2) == statusType2Count) + } } .run .finish |