author     galarragas <galarragas@gmail.com>  2014-02-24 15:21:28 +0000
committer  galarragas <galarragas@gmail.com>  2014-02-24 15:21:28 +0000
commit     42249fad9938f5d643c53fe8d65375beed8a4de4 (patch)
tree       14339e6cbd1a65a493deed10a2287e0adbfbd80c
parent     d09d29b7f301d8eca613dff0e62f658bcee536bc (diff)
Code review. Removed some commented code and some warnings.
-rw-r--r--  src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala                 17
-rw-r--r--  src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala                    83
-rw-r--r--  src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala                       53
-rw-r--r--  src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala   5
-rw-r--r--  src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala   15
-rw-r--r--  src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala                         12
6 files changed, 17 insertions, 168 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
index 31ed3ea..debc66c 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -19,14 +19,6 @@ class HBasePipeWrapper (pipe: Pipe) {
}
}
-// def toBytesWritable : Pipe = {
-// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
-// p.map(f.toString -> f.toString){ from: String => {
-// new ImmutableBytesWritable(Bytes.toBytes(from))
-// }}
-// }}
-// }
-
def fromBytesWritable(f: Fields): Pipe = {
asList(f)
.foldLeft(pipe) { (p, fld) => {
@@ -35,15 +27,6 @@ class HBasePipeWrapper (pipe: Pipe) {
}
}}
}
-
-// def fromBytesWritable : Pipe = {
-// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>
-// p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
-// Bytes.toString(from.get)
-// }
-// }
-// }
-// }
}
trait HBasePipeConversions {
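
The wrapper this hunk leaves behind is the explicit-fields fromBytesWritable. A minimal self-contained sketch of that fold-over-fields pattern, assuming Scalding's Dsl implicits are in scope; the original asList helper is replaced here with a plain Java-iterator conversion:

    import cascading.pipe.Pipe
    import cascading.tuple.Fields
    import com.twitter.scalding.Dsl._
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.JavaConverters._

    class HBasePipeWrapper(pipe: Pipe) {
      // For each named field, replace the HBase byte cell with its String form.
      def fromBytesWritable(f: Fields): Pipe =
        f.iterator.asScala.foldLeft(pipe) { (p, fld) =>
          p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable =>
            Bytes.toString(from.get)
          }
        }
    }
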
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
deleted file mode 100644
index 6216695..0000000
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-//package parallelai.spyglass.hbase
-//
-//import cascading.pipe.Pipe
-//import cascading.pipe.assembly.Coerce
-//import cascading.scheme.Scheme
-//import cascading.tap.{ Tap, SinkMode }
-//import cascading.tuple.Fields
-//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
-//import org.apache.hadoop.hbase.util.Bytes
-//import scala.collection.JavaConversions._
-//import scala.collection.mutable.WrappedArray
-//import com.twitter.scalding._
-//import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-//import org.apache.hadoop.hbase.client.Scan
-//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
-//import org.apache.hadoop.hbase.util.Base64
-//import java.io.ByteArrayOutputStream
-//import java.io.DataOutputStream
-//
-//object HBaseRawSource {
-// /**
-// * Converts a scan object to a base64 string that can be passed to HBaseRawSource
-// * @param scan
-// * @return base64 string representation
-// */
-// def convertScanToString(scan: Scan) = {
-// val out = new ByteArrayOutputStream();
-// val dos = new DataOutputStream(out);
-// scan.write(dos);
-// Base64.encodeBytes(out.toByteArray());
-// }
-//}
-//
-//
-///**
-// * @author Rotem Hermon
-// *
-// * HBaseRawSource is a scalding source that passes the original row (Result) object to the
-// * mapper for customized processing.
-// *
-// * @param tableName The name of the HBase table to read
-// * @param quorumNames HBase quorum
-// * @param familyNames Column families to get (source, if null will get all) or update to (sink)
-// * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written
-// * @param base64Scan An optional base64 encoded scan object
-// * @param sinkMode If REPLACE the output table will be deleted before writing to
-// *
-// */
-//class HBaseRawSource(
-// tableName: String,
-// quorumNames: String = "localhost",
-// familyNames: Array[String],
-// writeNulls: Boolean = true,
-// base64Scan: String = null,
-// sinkMode: SinkMode = null) extends Source {
-//
-// override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
-// .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
-//
-// override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
-// val hBaseScheme = hdfsScheme match {
-// case hbase: HBaseRawScheme => hbase
-// case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
-// }
-// mode match {
-// case hdfsMode @ Hdfs(_, _) => readOrWrite match {
-// case Read => {
-// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
-// case null => SinkMode.KEEP
-// case _ => sinkMode
-// }).asInstanceOf[Tap[_, _, _]]
-// }
-// case Write => {
-// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
-// case null => SinkMode.UPDATE
-// case _ => sinkMode
-// }).asInstanceOf[Tap[_, _, _]]
-// }
-// }
-// case _ => super.createTap(readOrWrite)(mode)
-// }
-// }
-//}
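
The one reusable piece of the deleted file was its Scan-to-Base64 helper. A standalone sketch, valid for the Writable-based Scan of the HBase releases this code targeted (pre-0.96); newer HBase versions ship an equivalent in TableMapReduceUtil.convertScanToString:

    import java.io.{ByteArrayOutputStream, DataOutputStream}
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.util.Base64

    // Serialize the Scan through its Writable interface, then Base64-encode
    // the bytes so the scan can travel in a job configuration as one string.
    def convertScanToString(scan: Scan): String = {
      val out = new ByteArrayOutputStream()
      val dos = new DataOutputStream(out)
      scan.write(dos)
      Base64.encodeBytes(out.toByteArray)
    }
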
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index dc87a4b..957258c 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -57,7 +57,6 @@ case class HBaseSource(
// To enable local mode testing
val allFields = keyFields.append(valueFields.toArray)
- //def localScheme = new NullScheme(allFields, allFields)
type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _]
def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (allFields, allFields)
@@ -68,7 +67,7 @@ case class HBaseSource(
case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme")
}
mode match {
- case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+ case Hdfs(_, _) => readOrWrite match {
case Read => {
val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
@@ -120,14 +119,14 @@ case class HBaseSource(
// TODO MemoryTap could probably be rewritten not to require localScheme, and just fields
new MemoryTap[InputStream, OutputStream](localScheme, buffer)
}*/
- case testMode @ Test(buffer) => readOrWrite match {
+ case Test(buffer) => readOrWrite match {
case Read => {
- val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP)
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
hbt.asInstanceOf[Tap[_,_,_]]
}
case Write => {
- val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP)
+ val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
hbt.asInstanceOf[Tap[_,_,_]]
}
}
@@ -138,48 +137,6 @@ case class HBaseSource(
def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
- mode match {
- case _ => {
- throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
- }
- }
+ throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
}
-
- /**def createTaps(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = {
- mode match {
- case Test(buffers) => {
- /*
- * There MUST have already been a registered sink or source in the Test mode.
- * to access this. You must explicitly name each of your test sources in your
- * JobTest.
- */
- val buffer =
- if (readOrWrite == Write) {
- val buf = buffers(this)
- //Make sure we wipe it out:
- buf.clear()
- buf
- } else {
- // if the source is also used as a sink, we don't want its contents to get modified
- buffers(this).clone()
- }
- // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields
- new MemoryTap[InputStream, OutputStream](localScheme, buffer)
- }
- case hdfsTest @ HadoopTest(conf, buffers) => readOrWrite match {
- case Read => {
- val buffer = buffers(this)
- val fields = hdfsScheme.getSourceFields
- (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf,_,_]]
- }
- case Write => {
- val path = hdfsTest.getWritePathFor(this)
- castHfsTap(new Hfs(hdfsScheme, path, SinkMode.REPLACE))
- }
- }
- case _ => {
- throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
- }
- }
- } */
}
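
The hdfsMode @ and testMode @ binders dropped above (and again in JDBCSource below) were never referenced, and an unused pattern binding is exactly the kind of compiler warning the commit message targets. A self-contained illustration, using hypothetical stand-in types rather than the real Scalding ones:

    sealed trait Mode
    case class Hdfs(strict: Boolean, conf: String) extends Mode
    case class Test(id: Int) extends Mode

    def describe(mode: Mode): String = mode match {
      case Hdfs(_, _) => "hadoop" // was: case hdfsMode @ Hdfs(_, _) => ...
      case Test(_)    => "test"   // was: case testMode @ Test(_) => ...
    }
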
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
index 6e56c52..9deedaf 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
@@ -19,10 +19,7 @@ class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeConversions
if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
val output = args("output")
-
-// val properties = new Properties()
-// AppProps.setApplicationJarClass( properties, classOf[SimpleHBaseSourceExample] );
-
+
val hbs = new HBaseSource(
"table_name",
"quorum_name:2181",
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala
index bbcf96d..12f1982 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala
@@ -10,17 +10,16 @@ object SimpleHBaseSourceRunner extends App {
val log = LoggerFactory.getLogger(this.getClass.getName)
-
-
log.info("Starting HBaseSource Import Process Test...")
val start1 = System.currentTimeMillis
- JobRunner.main((classOf[SimpleHBaseSourceExample].getName :: mArgs.toList).toArray)
-
- val end = System.currentTimeMillis
+ try {
+ JobRunner.main((classOf[SimpleHBaseSourceExample].getName :: mArgs.toList).toArray)
+ } finally {
+ val end = System.currentTimeMillis
- log.info("HBaseSource Import process finished successfully.")
- log.info("HBaseSource Import process : " + (end - start1) + " milliseconds to complete")
-
+ log.info("HBaseSource Import process finished successfully.")
+ log.info("HBaseSource Import process : " + (end - start1) + " milliseconds to complete")
+ }
}
\ No newline at end of file
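
The try/finally added here guarantees the timing line is logged even when JobRunner.main throws, although the "finished successfully" message now also prints on failure. The same pattern, extracted into a reusable self-contained sketch:

    // Run a block and always log elapsed wall-clock time, even on failure.
    def timed[A](label: String)(body: => A): A = {
      val start = System.currentTimeMillis
      try body
      finally println(label + " took " + (System.currentTimeMillis - start) + " ms")
    }

    // Usage, with the JobRunner call from the diff as the timed body:
    // timed("HBaseSource import") {
    //   JobRunner.main((classOf[SimpleHBaseSourceExample].getName :: mArgs.toList).toArray)
    // }
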
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index 2472eda..1abad55 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -47,7 +47,7 @@ case class JDBCSource(
case _ => throw new ClassCastException("Failed casting from Scheme to JDBCScheme")
}
mode match {
- case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+ case Hdfs(_, _) => readOrWrite match {
case Read => {
val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)
val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
@@ -60,7 +60,7 @@ case class JDBCSource(
jdbcTap.asInstanceOf[Tap[_,_,_]]
}
}
- case testMode @ Test(buffer) => readOrWrite match {
+ case Test(buffer) => readOrWrite match {
case Read => {
val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
@@ -76,10 +76,6 @@ case class JDBCSource(
}
def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
- mode match {
- case _ => {
- throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
- }
- }
- }
+ throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
+ }
}
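
Here, as in HBaseSource, a match whose only case was the wildcard collapses to the body it always ran. The simplified method as it reads inside the source class, using the Scalding and Cascading types both files already import:

    import cascading.tap.Tap
    import com.twitter.scalding.{AccessMode, Mode}

    // A match with a lone wildcard case is just its body: throw directly.
    def createEmptyTap(readOrWrite: AccessMode)(mode: Mode): Tap[_, _, _] =
      throw new RuntimeException(
        "Source: (" + toString + ") doesn't support mode: " + mode.toString)
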