diff options
Diffstat (limited to 'src/main/scala/parallelai/spyglass')
| -rw-r--r-- | src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala | 95 | 
1 files changed, 58 insertions, 37 deletions
| diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala index 1fc6f7d..bbc205b 100644 --- a/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseRawSource.scala @@ -18,45 +18,66 @@ import java.io.ByteArrayOutputStream  import java.io.DataOutputStream  object HBaseRawSource { -  def convertScanToString(scan: Scan) = { -    val out = new ByteArrayOutputStream(); -    val dos = new DataOutputStream(out); -    scan.write(dos); -    Base64.encodeBytes(out.toByteArray()); -  } +	/** +	 * Converts a scan object to a base64 string that can be passed to HBaseRawSource +	 * @param scan +	 * @return base64 string representation +	 */ +	def convertScanToString(scan: Scan) = { +		val out = new ByteArrayOutputStream(); +		val dos = new DataOutputStream(out); +		scan.write(dos); +		Base64.encodeBytes(out.toByteArray()); +	}  } + + +/** + * @author Rotem Hermon + * + * HBaseRawSource is a scalding source that passes the original row (Result) object to the  + * mapper for customized processing. + *  + * @param	tableName	The name of the HBase table to read + * @param	quorumNames	HBase quorum + * @param	familyNames	Column families to get (source, if null will get all) or update to (sink) + * @param	writeNulls	Should the sink write null values. default = true. If false, null columns will not be written + * @param	base64Scan	An optional base64 encoded scan object  + * @param	sinkMode	If REPLACE the output table will be deleted before writing to	 + *  + */  class HBaseRawSource( -  tableName: String, -  quorumNames: String = "localhost", -  familyNames: Array[String], -  writeNulls: Boolean = true, -  base64Scan:String = null, -  sinkMode: SinkMode = null) extends Source { +	tableName: String, +	quorumNames: String = "localhost", +	familyNames: Array[String], +	writeNulls: Boolean = true, +	base64Scan: String = null, +	sinkMode: SinkMode = null) extends Source { -  override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) -    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] +	override val hdfsScheme = new HBaseRawScheme(familyNames, writeNulls) +		.asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] -  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { -    val hBaseScheme = hdfsScheme match { -      case hbase: HBaseRawScheme => hbase -      case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") -    } -    mode match { -      case hdfsMode @ Hdfs(_, _) => readOrWrite match { -        case Read => { -          new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -            case null => SinkMode.KEEP -            case _ => sinkMode -          }).asInstanceOf[Tap[_,_,_]] -        } -        case Write => { -          new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { -            case null => SinkMode.UPDATE -            case _ => sinkMode -          }).asInstanceOf[Tap[_,_,_]] -        } -      } -      case _ => super.createTap(readOrWrite)(mode) -    } -  } +	override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { +		val hBaseScheme = hdfsScheme match { +			case hbase: HBaseRawScheme => hbase +			case _ => throw new ClassCastException("Failed casting from Scheme to HBaseRawScheme") +		} +		mode match { +			case hdfsMode @ Hdfs(_, _) => readOrWrite match { +				case Read => { +					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +						case null => SinkMode.KEEP +						case _ => sinkMode +					}).asInstanceOf[Tap[_, _, _]] +				} +				case Write => { +					new HBaseRawTap(quorumNames, tableName, hBaseScheme, base64Scan, sinkMode match { +						case null => SinkMode.UPDATE +						case _ => sinkMode +					}).asInstanceOf[Tap[_, _, _]] +				} +			} +			case _ => super.createTap(readOrWrite)(mode) +		} +	}  } | 
