diff options
| -rw-r--r-- | README.md | 96 | 
1 files changed, 48 insertions, 48 deletions
| @@ -263,16 +263,16 @@ e.g.  	val hbaseOut = new HBaseRawSource("MY_RESULTS", "hbase-local", Array("out-family"), writeNulls=false, sinkMode = SinkMode.REPLACE)  	hbaseSource.read -		.mapTo(('rowkey, 'row) -> ('rowkey, "different_family:col1", 'col2)) { -			x: (ImmutableBytesWritable, Result) => -				{ -					val (rowkey, row) = x -					val col1Times2 = Bytes.toInt(row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col1"))) * 2; -					val col2 = row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col2")); -					(rowkey, col1Times2, col2) -				} +	.mapTo(('rowkey, 'row) -> ('rowkey, "different_family:col1", 'col2)) { +	    x: (ImmutableBytesWritable, Result) => +		{ +		    val (rowkey, row) = x +		    val col1Times2 = Bytes.toInt(row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col1"))) * 2; +		    val col2 = row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col2")); +		    (rowkey, col1Times2, col2)  		} -		.write(hbaseOut) +	} +	.write(hbaseOut)  6. Jdbc Tap and Source  =========================== @@ -280,31 +280,31 @@ e.g.  To be added soon.  e.g. -  val jdbcSourceRead = new JDBCSource( -    "TABLE_01", -    "com.mysql.jdbc.Driver", -    "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", -    "root", -    "password", -    List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), -    List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), -    List("id"), -    new Fields("key", "column1", "column2", "column3"), -    null, null, null -  ) - -  val jdbcSourceWrite = new JDBCSource( -    "TABLE_01", -    "com.mysql.jdbc.Driver", -    "jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", -    "root", -    "password", -    List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), -    List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), -    List("id"), -    new Fields("key", "column1", "column2", "column3"), -    null, null, null -  ) +	val jdbcSourceRead = new JDBCSource( +		"TABLE_01", +		"com.mysql.jdbc.Driver", +		"jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", +		"root", +		"password", +		List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), +		List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), +		List("id"), +		new Fields("key", "column1", "column2", "column3"), +		null, null, null +		) + +	val jdbcSourceWrite = new JDBCSource( +		"TABLE_01", +		"com.mysql.jdbc.Driver", +		"jdbc:mysql://localhost:3306/sky_db?zeroDateTimeBehavior=convertToNull", +		"root", +		"password", +		List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), +		List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), +		List("id"), +		new Fields("key", "column1", "column2", "column3"), +		null, null, null +		)  7. HBase Delete Functionality @@ -316,10 +316,10 @@ The feature can be enable by using the parameter sinkMode = SinkMode.REPLACE in  You can use sinkMode = SinkMode.UPDATE to add to the HBaseTable as usual.  e.g. -  val eraser = toIBW(input, TABLE_SCHEMA) -    .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, -      TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -      TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE )) +	val eraser = toIBW(input, TABLE_SCHEMA) +		.write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, +		TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +		TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))  All rows with the key will be deleted. This includes all versions too. @@ -332,13 +332,13 @@ This feature can be activated by using inputSplitType = SplitType.REGIONAL  You can use inputSplitType = SplitType.GRANULAR to use the previous functionality as is.  e.g. -  val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, -          TABLE_SCHEMA.tail.map((x: Symbol) => "data"), -          TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), -          sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP, -          inputSplitType = splitType).read -          .fromBytesWritable(TABLE_SCHEMA ) -          .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} -          .project('testData) -          .write(TextLine("saltTesting/ScanRangeNoSalt01")) -          .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
\ No newline at end of file +	val hbase04 = new HBaseSource( "_TEST.SALT.01", quorum, 'key, +	  TABLE_SCHEMA.tail.map((x: Symbol) => "data"), +	  TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), +	  sourceMode = SourceMode.SCAN_RANGE, startKey = sttKeyP, stopKey = stpKeyP, +	  inputSplitType = splitType).read +	  .fromBytesWritable(TABLE_SCHEMA ) +	  .map(('key, 'salted, 'unsalted) -> 'testData) {x: (String, String, String) => List(x._1, x._2, x._3)} +	  .project('testData) +	  .write(TextLine("saltTesting/ScanRangeNoSalt01")) +	  .groupAll(group => group.toList[List[List[String]]]('testData -> 'testData))
\ No newline at end of file | 
