about | summary | refs | log | tree | commit | diff | stats
path: root/scalding
diff options
context:
space:
mode:
Diffstat (limited to 'scalding')
-rw-r--r-- scalding/build.sbt 2
-rw-r--r-- scalding/scalastyle-config.xml 30
-rw-r--r-- scalding/src/main/scala/sandcrawler/HBaseBuilder.scala 14
-rw-r--r-- scalding/src/main/scala/sandcrawler/HBaseCountJob.scala 10
-rw-r--r-- scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala 9
-rw-r--r-- scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala 14
-rw-r--r-- scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala 22
7 files changed, 64 insertions, 37 deletions
diff --git a/scalding/build.sbt b/scalding/build.sbt
index cc21444..980418c 100644
--- a/scalding/build.sbt
+++ b/scalding/build.sbt
@@ -16,7 +16,7 @@ lazy val root = (project in file(".")).
(scalastyleSources in Compile) := {
// all .scala files in "src/main/scala"
val scalaSourceFiles = ((scalaSource in Compile).value ** "*.scala").get
- val dirNameToExclude = "example"
+ val dirNameToExclude = "/example/"
scalaSourceFiles.filterNot(_.getAbsolutePath.contains(dirNameToExclude))
},
diff --git a/scalding/scalastyle-config.xml b/scalding/scalastyle-config.xml
index e2f58a0..86d8fca 100644
--- a/scalding/scalastyle-config.xml
+++ b/scalding/scalastyle-config.xml
@@ -6,6 +6,13 @@
<parameter name="maxFileLength"><![CDATA[800]]></parameter>
</parameters>
</check>
+<check enabled="true" class="org.scalastyle.file.IndentationChecker" level="warning">
+ <parameters>
+ <parameter name="tabSize">2</parameter>
+ <parameter name="methodParamIndentSize">2</parameter>
+ <parameter name="classParamIndentSize">4</parameter>
+ </parameters>
+</check>
<check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="false">
<parameters>
<parameter name="header"><![CDATA[// Copyright (C) 2011-2012 the original author or authors.
@@ -50,17 +57,12 @@
</parameters>
</check>
<check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
- <parameters>
- <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
- </parameters>
- </check>
<check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
<parameters>
<parameter name="maxParameters"><![CDATA[8]]></parameter>
</parameters>
</check>
- <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true">
+ <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="false">
<parameters>
<parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
</parameters>
@@ -114,15 +116,23 @@
<check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
- <check class="org.scalastyle.scalariform.BlockImportChecker" level="warning" enabled="true"/>
-<check class="org.scalastyle.scalariform.ImportOrderChecker" level="warning" enabled="true">
+ <check enabled="true" class="org.scalastyle.scalariform.BlockImportChecker" level="warning"/>
+ <check enabled="true" class="org.scalastyle.scalariform.ImportOrderChecker" level="warning">
<parameters>
<parameter name="groups">java,scala,others</parameter>
<parameter name="group.java">javax?\..+</parameter>
<parameter name="group.scala">scala\..+</parameter>
<parameter name="group.others">.+</parameter>
- <parameter name="maxBlankLines">1</parameter>
- <parameter name="lexicographic">true</parameter>
+ </parameters>
+ </check>
+<check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
+ <parameters>
+ <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
+ </parameters>
+</check>
+<check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
+ <parameters>
+ <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
</parameters>
</check>
</scalastyle>
diff --git a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
index b271def..19df99d 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseBuilder.scala
@@ -1,9 +1,12 @@
package sandcrawler
+import scala._
+
+import cascading.tap.SinkMode
import cascading.tuple.Fields
+import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBaseSource
-import scala._
object HBaseBuilder {
// map from column families to column names
@@ -39,8 +42,8 @@ object HBaseBuilder {
val groupMap: Map[String, List[String]] = colSpecs.groupBy(c => (c split ":")(0))
val families = groupMap.keys.toList
val groupedColNames : List[List[String]] = families map {fam => {
- val cols = {groupMap(fam).map(v => v.split(":")(1))}
- cols}}
+ val cols = {groupMap(fam).map(v => v.split(":")(1))}
+ cols}}
(families, groupedColNames.map({fields => new Fields(fields : _*)}))
}
@@ -48,4 +51,9 @@ object HBaseBuilder {
val (families, fields) = parseColSpecs(colSpecs)
new HBaseSource(table, server, new Fields("key"), families, fields, sourceMode = sourceMode, keyList = keyList)
}
+
+ def buildSink(table: String, server: String, colSpecs: List[String], sinkMode: SinkMode, keyList: List[String] = List("key")) : HBaseSource = {
+ val (families, fields) = parseColSpecs(colSpecs)
+ new HBaseSource(table, server, new Fields("key"), families, fields, sinkMode = sinkMode)
+ }
}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala
index 22e4e86..b12e723 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseCountJob.scala
@@ -1,9 +1,10 @@
package sandcrawler
+import java.util.Properties
+
import cascading.property.AppProps
import cascading.tuple.Fields
import com.twitter.scalding._
-import java.util.Properties
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
@@ -14,9 +15,10 @@ class HBaseCountJob(args: Args, colSpec: String) extends JobBase(args) with HBas
HBaseBuilder.parseColSpec(colSpec)
val Col: String = colSpec.split(":")(1)
- HBaseCountJob.getHBaseSource(args("hbase-table"),
- args("zookeeper-hosts"),
- colSpec)
+ HBaseCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ colSpec)
.read
.fromBytesWritable(Symbol(Col))
.debug
diff --git a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
index 6def218..4c3de33 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseRowCountJob.scala
@@ -1,9 +1,10 @@
package sandcrawler
+import java.util.Properties
+
import cascading.property.AppProps
import cascading.tuple.Fields
import com.twitter.scalding._
-import java.util.Properties
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
@@ -13,8 +14,9 @@ class HBaseRowCountJob(args: Args) extends JobBase(args) with HBasePipeConversio
val output = args("output")
- HBaseRowCountJob.getHBaseSource(args("hbase-table"),
- args("zookeeper-hosts"))
+ HBaseRowCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
.read
.debug
.groupAll { _.size('count) }
@@ -31,5 +33,4 @@ object HBaseRowCountJob {
List("file:size"),
SourceMode.SCAN_ALL)
}
-
}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index b1dab0e..fd0b4e2 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -1,18 +1,24 @@
package sandcrawler
-import com.twitter.scalding.Args
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.typed.TDsl._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
- val source = HBaseCountJob.getHBaseSource(args("hbase-table"),
- args("zookeeper-hosts"),
- "grobid0:status_code")
+ val source = HBaseCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ "grobid0:status_code")
val statusPipe : TypedPipe[Long] = source
.read
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
index 11ab1d0..d7689cd 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
@@ -48,18 +48,18 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
.arg("debug", "true")
.source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
- .sink[Tuple](TypedTsv[(Long, Long)](output)) {
- outputBuffer =>
- it("should return a 2-element list.") {
- assert(outputBuffer.size === 2)
- }
+ .sink[Tuple](TypedTsv[(Long, Long)](output)) {
+ outputBuffer =>
+ it("should return a 2-element list.") {
+ assert(outputBuffer.size === 2)
+ }
- // Convert List[Tuple] to Map[Long, Long].
- val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
- it("should have the appropriate number of each status type") {
- assert(counts(statusType1) == statusType1Count)
- assert(counts(statusType2) == statusType2Count)
- }
+ // Convert List[Tuple] to Map[Long, Long].
+ val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
+ it("should have the appropriate number of each status type") {
+ assert(counts(statusType1) == statusType1Count)
+ assert(counts(statusType2) == statusType2Count)
+ }
}
.run
.finish