Diffstat (limited to 'src/main/scala/parallelai')
6 files changed, 17 insertions, 168 deletions
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
index 31ed3ea..debc66c 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -19,14 +19,6 @@ class HBasePipeWrapper (pipe: Pipe) {
       }
     }
 
-//   def toBytesWritable : Pipe = {
-//	  asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
-//	    p.map(f.toString -> f.toString){ from: String => {
-//	      new ImmutableBytesWritable(Bytes.toBytes(from))
-//	    }}
-//	  }}
-//	}
-
 	def fromBytesWritable(f: Fields): Pipe = {
 	  asList(f)
 	    .foldLeft(pipe) { (p, fld) => {
@@ -35,15 +27,6 @@ class HBasePipeWrapper (pipe: Pipe) {
           }
         }}
     }
-
-//	def fromBytesWritable : Pipe = {
-//	  asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>
-//	    p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
-//	    	Bytes.toString(from.get)
-//	      }
-//	    }
-//	  }
-//	}
 }
 
 trait HBasePipeConversions {
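The surviving conversion is the field-scoped one: callers now pass an explicit Fields list instead of relying on the removed, commented-out Fields.ALL variants. A minimal sketch of how that method is typically used, assuming the implicit Pipe wrapper that HBasePipeConversions provides; the object and field names are illustrative, not part of this commit:

import cascading.pipe.Pipe
import cascading.tuple.Fields
import parallelai.spyglass.hbase.HBasePipeConversions

// Decode the named columns of a pipe read from HBase: fromBytesWritable
// turns each ImmutableBytesWritable cell into a String.
object DecodeSketch extends HBasePipeConversions {
  def decode(fromHBase: Pipe): Pipe =
    fromHBase.fromBytesWritable(new Fields("key", "data"))
}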
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
deleted file mode 100644
index 6216695..0000000
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-//package parallelai.spyglass.hbase
-//
-//import cascading.pipe.Pipe
-//import cascading.pipe.assembly.Coerce
-//import cascading.scheme.Scheme
-//import cascading.tap.{ Tap, SinkMode }
-//import cascading.tuple.Fields
-//import org.apache.hadoop.mapred.{ RecordReader, OutputCollector, JobConf }
-//import org.apache.hadoop.hbase.util.Bytes
-//import scala.collection.JavaConversions._
-//import scala.collection.mutable.WrappedArray
-//import com.twitter.scalding._
-//import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-//import org.apache.hadoop.hbase.client.Scan
-//import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
-//import org.apache.hadoop.hbase.util.Base64
-//import java.io.ByteArrayOutputStream
-//import java.io.DataOutputStream
-//
-//object HBaseRawSource {
-//	/**
-//	 * Converts a scan object to a base64 string that can be passed to HBaseRawSource
-//	 * @param scan
-//	 * @return base64 string representation
-//	 */
-//	def convertScanToString(scan: Scan) = {
-//		val out = new ByteArrayOutputStream();
-//		val dos = new DataOutputStream(out);
-//		scan.write(dos);
-//		Base64.encodeBytes(out.toByteArray());
-//	}
-//}
-//
-//
-///**
-// * @author Rotem Hermon
-// *
-// * HBaseRawSource is a scalding source that passes the original row (Result) object to the
-// * mapper for customized processing.
-// *
-// * @param	tableName	The name of the HBase table to read
-// * @param	quorumNames	HBase quorum
-// * @param	familyNames	Column families to get (source, if null will get all) or update to (sink)
-// * @param	writeNulls	Should the sink write null values. default = true. If false, null columns will not be written
-// * @param	base64Scan	An optional base64 encoded scan object
-// * @param	sinkMode	If REPLACE the output table will be deleted before writing to
-// *
-// */
-//class HBaseRawSource(
-//	tableName: String,
-//	quorumNames: String = "localhost",
-//	familyNames: Array[String],
-//	writeNulls: Boolean = true,
-//	base64Scan: String = null,
-//	sinkMode: SinkMode = null) extends Source {
-//
-//	override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls)
-//		.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
-//
-//	override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
-//		val hBaseScheme = hdfsScheme match {
-//			case hbase: HBaseRawScheme => hbase
-//			case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme")
-//		}
-//		mode match {
-//			case hdfsMode @ Hdfs(_, _) => readOrWrite match {
-//				case Read => {
-//					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
-//						case null => SinkMode.KEEP
-//						case _ => sinkMode
-//					}).asInstanceOf[Tap[_, _, _]]
-//				}
-//				case Write => {
-//					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match {
-//						case null => SinkMode.UPDATE
-//						case _ => sinkMode
-//					}).asInstanceOf[Tap[_, _, _]]
-//				}
-//			}
-//			case _ => super.createTap(readOrWrite)(mode)
-//		}
-//	}
-//}
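The deleted file was already commented out in its entirety; the one piece worth noting is its Scan helper, reproduced below as a standalone sketch. It relies on HBase 0.94-era APIs, where Scan is a Writable and org.apache.hadoop.hbase.util.Base64 exists; later HBase versions serialize scans through protobuf instead:

import java.io.{ByteArrayOutputStream, DataOutputStream}
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Base64

object ScanToBase64Sketch {
  // Serialize the scan with its Writable implementation, then base64-encode
  // the bytes so the result can be passed to HBaseRawSource as a string.
  def convertScanToString(scan: Scan): String = {
    val out = new ByteArrayOutputStream()
    val dos = new DataOutputStream(out)
    scan.write(dos)
    Base64.encodeBytes(out.toByteArray())
  }
}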
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
index dc87a4b..957258c 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala
@@ -57,7 +57,6 @@ case class HBaseSource(
   // To enable local mode testing
   val allFields = keyFields.append(valueFields.toArray)
 
-  //def localScheme = new NullScheme(allFields, allFields)
   type LocalScheme = Scheme[Properties, InputStream, OutputStream, _, _]
   def localScheme = new NullScheme[Properties, InputStream, OutputStream, Any, Any] (allFields, allFields)
 
@@ -68,7 +67,7 @@ case class HBaseSource(
       case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme")
     }
     mode match {
-      case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+      case Hdfs(_, _) => readOrWrite match {
         case Read => {
           val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP)
@@ -120,14 +119,14 @@ case class HBaseSource(
         // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields
         new MemoryTap[InputStream, OutputStream](localScheme, buffer)
       }*/
-      case testMode @ Test(buffer) => readOrWrite match {
+      case Test(buffer) => readOrWrite match {
         case Read => {
-          val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP)
+          val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
           hbt.asInstanceOf[Tap[_,_,_]]
         }
         case Write => {
-          val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get) //new HBaseTap(quorumNames, tableName, localScheme, SinkMode.KEEP)
+          val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
           hbt.asInstanceOf[Tap[_,_,_]]
         }
       }
@@ -138,48 +137,6 @@ case class HBaseSource(
 
   def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
-    mode match {
-      case _ => {
-        throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
-      }
-    }
+    throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
   }
-
-  /**def createTaps(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = {
-    mode match {
-      case Test(buffers) => {
-        /*
-        * There MUST have already been a registered sink or source in the Test mode.
-        * to access this.  You must explicitly name each of your test sources in your
-        * JobTest.
-        */
-        val buffer =
-          if (readOrWrite == Write) {
-            val buf = buffers(this)
-            //Make sure we wipe it out:
-            buf.clear()
-            buf
-          } else {
-            // if the source is also used as a sink, we don't want its contents to get modified
-            buffers(this).clone()
-          }
-        // TODO MemoryTap could probably be rewritten not to require localScheme, and just fields
-        new MemoryTap[InputStream, OutputStream](localScheme, buffer)
-      }
-      case hdfsTest @ HadoopTest(conf, buffers) => readOrWrite match {
-        case Read => {
-          val buffer = buffers(this)
-          val fields = hdfsScheme.getSourceFields
-          (new MemorySourceTap(buffer.toList.asJava, fields)).asInstanceOf[Tap[JobConf,_,_]]
-        }
-        case Write => {
-          val path = hdfsTest.getWritePathFor(this)
-          castHfsTap(new Hfs(hdfsScheme, path, SinkMode.REPLACE))
-        }
-      }
-      case _ => {
-        throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
-      }
-    }
-  } */
 }
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
index 6e56c52..9deedaf 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExample.scala
@@ -19,10 +19,7 @@ class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeC
    if (isDebug) Logger.getRootLogger.setLevel(Level.DEBUG)
 
    val output = args("output")
-
-//   val properties = new Properties()
-//   AppProps.setApplicationJarClass( properties, classOf[SimpleHBaseSourceExample] );
-
+
    val hbs = new HBaseSource(
      "table_name",
      "quorum_name:2181",
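The recurring cleanup in HBaseSource (and in JDBCSource below) drops pattern binders such as hdfsMode @ and testMode @ whose case bodies never reference them; a binder is pure noise once nothing uses it. A self-contained illustration with a toy ADT, not the Scalding Mode type:

sealed trait ToyMode
case class ToyHdfs(strict: Boolean) extends ToyMode
case class ToyTest(name: String) extends ToyMode

object BinderSketch extends App {
  val mode: ToyMode = ToyHdfs(strict = true)

  // Before: `m @` binds a name the body never uses.
  val before = mode match {
    case m @ ToyHdfs(_) => "hdfs"
    case m @ ToyTest(_) => "test"
  }

  // After: the same match without the dead binders.
  val after = mode match {
    case ToyHdfs(_) => "hdfs"
    case ToyTest(_) => "test"
  }

  assert(before == after)
}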
log.info("HBaseSource Import process finished successfully.") -  log.info("HBaseSource Import process : " + (end - start1) + " milliseconds to complete") -   +    log.info("HBaseSource Import process finished successfully.") +    log.info("HBaseSource Import process : " + (end - start1) + " milliseconds to complete") +  }  }
diff --git a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
index 2472eda..1abad55 100644
--- a/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
+++ b/src/main/scala/parallelai/spyglass/jdbc/JDBCSource.scala
@@ -47,7 +47,7 @@ case class JDBCSource(
       case _ => throw new ClassCastException("Failed casting from Scheme to JDBCScheme")
     }
     mode match {
-      case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+      case Hdfs(_, _) => readOrWrite match {
         case Read => {
           val tableDesc = new TableDesc(tableName, columnNames.toArray, columnDefs.toArray, primaryKeys.toArray)
           val jdbcTap = new JDBCTap(connectionString, userId, password, driverName, tableDesc, jdbcScheme)
@@ -60,7 +60,7 @@ case class JDBCSource(
           jdbcTap.asInstanceOf[Tap[_,_,_]]
         }
       }
-      case testMode @ Test(buffer) => readOrWrite match {
+      case Test(buffer) => readOrWrite match {
         case Read => {
 
           val hbt = new MemoryTap[InputStream, OutputStream](localScheme, buffer.apply(this).get)
@@ -76,10 +76,6 @@ case class JDBCSource(
   }
 
   def createEmptyTap(readOrWrite : AccessMode)(mode : Mode) : Tap[_,_,_] = {
-    mode match {
-      case _ => {
-        throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
-      }
-    }
-  }
+    throw new RuntimeException("Source: (" + toString + ") doesn't support mode: " + mode.toString)
+  }
 }
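Both createEmptyTap rewrites are the same simplification: a match whose only case is the wildcard is a no-op wrapper, so the throw can stand alone. A toy before/after:

object EmptyTapSketch {
  def before(mode: String): Nothing =
    mode match {
      case _ => throw new RuntimeException("unsupported mode: " + mode)
    }

  def after(mode: String): Nothing =
    throw new RuntimeException("unsupported mode: " + mode)
}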
