From 42249fad9938f5d643c53fe8d65375beed8a4de4 Mon Sep 17 00:00:00 2001 From: galarragas Date: Mon, 24 Feb 2014 15:21:28 +0000 Subject: Code review. Removed some commented code and some warning --- .../spyglass/hbase/HBaseConversions.scala | 17 ----- .../parallelai/spyglass/hbase/HBaseRawSource.scala | 83 ---------------------- .../parallelai/spyglass/hbase/HBaseSource.scala | 53 ++------------ .../hbase/example/SimpleHBaseSourceExample.scala | 5 +- .../hbase/example/SimpleHBaseSourceRunner.scala | 15 ++-- .../parallelai/spyglass/jdbc/JDBCSource.scala | 12 ++-- 6 files changed, 17 insertions(+), 168 deletions(-) delete mode 100644 src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala index 31ed3ea..debc66c 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala @@ -19,14 +19,6 @@ class HBasePipeWrapper (pipe: Pipe) { } } -// def toBytesWritable : Pipe = { -// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => { -// p.map(f.toString -> f.toString){ from: String => { -// new ImmutableBytesWritable(Bytes.toBytes(from)) -// }} -// }} -// } - def fromBytesWritable(f: Fields): Pipe = { asList(f) .foldLeft(pipe) { (p, fld) => { @@ -35,15 +27,6 @@ class HBasePipeWrapper (pipe: Pipe) { } }} } - -// def fromBytesWritable : Pipe = { -// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) => -// p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => { -// Bytes.toString(from.get) -// } -// } -// } -// } } trait HBasePipeConversions { diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala deleted file mode 100644 index 6216695..0000000 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala +++ /dev/null @@ -1,83 +0,0 @@ -//package parallelai.spyglass.hbase -// -//import cascading.pipe.Pipe -//import cascading.pipe.assembly.Coerce -//import cascading.scheme.Scheme -//import cascading.tap.{ Tap, SinkMode } -//import cascading.tuple.Fields -//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf } -//import org.apache.hadoop.hbase.util.Bytes -//import scala.collection.JavaConversions._ -//import scala.collection.mutable.WrappedArray -//import com.twitter.scalding._ -//import org.apache.hadoop.hbase.io.ImmutableBytesWritable -//import org.apache.hadoop.hbase.client.Scan -//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil -//import org.apache.hadoop.hbase.util.Base64 -//import java.io.ByteArrayOutputStream -//import java.io.DataOutputStream -// -//object HBaseRawSource { -// /** -// * Converts a scan object to a base64 string that can be passed to HBaseRawSource -// * @param scan -// * @return base64 string representation -// */ -// def convertScanToString(scan: Scan) = { -// val out = new ByteArrayOutputStream(); -// val dos = new DataOutputStream(out); -// scan.write(dos); -// Base64.encodeBytes(out.toByteArray()); -// } -//} -// -// -///** -// * @author Rotem Hermon -// * -// * HBaseRawSource is a scalding source that passes the original row (Result) object to the -// * mapper for customized processing. -// * -// * @param tableName The name of the HBase table to read -// * @param quorumNames HBase quorum -// * @param familyNames Column families to get (source, if null will get all) or update to (sink) -// * @param writeNulls Should the sink write null values. default = true. If false, null columns will not be written -// * @param base64Scan An optional base64 encoded scan object -// * @param sinkMode If REPLACE the output table will be deleted before writing to -// * -// */ -//class HBaseRawSource( -// tableName: String, -// quorumNames: String = "localhost", -// familyNames: Array[String], -// writeNulls: Boolean = true, -// base64Scan: String = null, -// sinkMode: SinkMode = null) extends Source { -// -// override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) -// .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] -// -// override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { -// val hBaseScheme = hdfsScheme match { -// case hbase: HBaseRawScheme => hbase -// case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") -// } -// mode match { -// case hdfsMode @ Hdfs(_, _) => readOrWrite match { -// case Read => { -// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -// case null => SinkMode.KEEP -// case _ => sinkMode -// }).asInstanceOf[Tap[_, _, _]] -// } -// case Write => { -// new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -// case null => SinkMode.UPDATE -// case _ => sinkMode -// }).asInstanceOf[Tap[_, _, _]] -// } -// } -// case _ => super.createTap(readOrWrite)(mode) -// } -// } -//} diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala index dc87a4b..957258c 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -57,7 +57,6 @@ case class HBaseSource( // To enable local mode testing val allFields = keyFields.append(valueFields.toArray) - //def localScheme = new NullScheme(allFields, allFields) type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _] def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (allFields, allFields) @@ -68,7 +67,7 @@ case class HBaseSource( case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme") } mode match { - case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Hdfs(_, _) => readOrWrite match { case Read => { val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP) @@ -120,14 +119,14 @@ case class HBaseSource( // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields new MemoryTap[InputStream, OutputStream](localScheme, buffer) }*/ - case testMode @ Test(buffer) => readOrWrite match { + case Test(buffer) => readOrWrite match { case Read => { - val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP) + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) hbt.asInstanceOf[Tap[_,_,_]] } case Write => { - val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP) + val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) hbt.asInstanceOf[Tap[_,_,_]] } } @@ -138,48 +137,6 @@ case class HBaseSource( def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = { - mode match { - case _ => { - throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) - } - } + throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) } - - /**def createTaps(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = { - mode match { - case Test(buffers) => { - /* - * There MUST have already been a registered sink or source in the Test mode. - * to access this. You must explicitly name each of your test sources in your - * JobTest. - */ - val buffer = - if (readOrWrite == Write) { - val buf = buffers(this) - //Make sure we wipe it out: - buf.clear() - buf - } else { - // if the source is also used as a sink, we don't want its contents to get modified - buffers(this).clone() - } - // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields - new MemoryTap[InputStream, OutputStream](localScheme, buffer) - } - case hdfsTest @ HadoopTest(conf, buffers) => readOrWrite match { - case Read => { - val buffer = buffers(this) - val fields = hdfsScheme.getSourceFields - (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf,_,_]] - } - case Write => { - val path = hdfsTest.getWritePathFor(this) - castHfsTap(new Hfs(hdfsScheme, path, SinkMode.REPLACE)) - } - } - case _ => { - throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) - } - } - } */ } diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala index 6e56c52..9deedaf 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala @@ -19,10 +19,7 @@ class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeC if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG) val output = args("output") - -// val properties = new Properties() -// AppProps.setApplicationJarClass( properties, classOf[SimpleHBaseSourceExample] ); - + val hbs = new HBaseSource( "table_name", "quorum_name:2181", diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala index bbcf96d..12f1982 100644 --- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala +++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceRunner.scala @@ -10,17 +10,16 @@ object SimpleHBaseSourceRunner extends App { val log = LoggerFactory.getLogger(this.getClass.getName) - - log.info("Starting HBaseSource Import Process Test...") val start1 = System.currentTimeMillis - JobRunner.main((classOf[SimpleHBaseSourceExample].getName :: mArgs.toList).toArray) - - val end = System.currentTimeMillis + try { + JobRunner.main((classOf[SimpleHBaseSourceExample].getName :: mArgs.toList).toArray) + } finally { + val end = System.currentTimeMillis - log.info("HBaseSource Import process finished successfully.") - log.info("HBaseSource Import process : " + (end - start1) + " milliseconds to complete") - + log.info("HBaseSource Import process finished successfully.") + log.info("HBaseSource Import process : " + (end - start1) + " milliseconds to complete") + } } \ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala index 2472eda..1abad55 100644 --- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala +++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala @@ -47,7 +47,7 @@ case class JDBCSource( case _ => throw new ClassCastException("Failed casting from Scheme to JDBCScheme") } mode match { - case hdfsMode @ Hdfs(_, _) => readOrWrite match { + case Hdfs(_, _) => readOrWrite match { case Read => { val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray) val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme) @@ -60,7 +60,7 @@ case class JDBCSource( jdbcTap.asInstanceOf[Tap[_,_,_]] } } - case testMode @ Test(buffer) => readOrWrite match { + case Test(buffer) => readOrWrite match { case Read => { val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) @@ -76,10 +76,6 @@ case class JDBCSource( } def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = { - mode match { - case _ => { - throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) - } - } - } + throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString) + } } -- cgit v1.2.3