diff options
author | Antonios Chalkiopoulos <Antwnis@gmail.com> | 2014-08-22 23:09:39 +0100 |
---|---|---|
committer | Antonios Chalkiopoulos <Antwnis@gmail.com> | 2014-08-22 23:09:39 +0100 |
commit | c629b3b1ee85e0cc2b70659105cede6a0012e6f6 (patch) | |
tree | a2b7301b4f785adcbc1af31d586f62054260abed /src/test/scala | |
parent | d539a5df90d92657dbe024f8c674ca91ed1dd542 (diff) | |
download | SpyGlass-c629b3b1ee85e0cc2b70659105cede6a0012e6f6.tar.gz SpyGlass-c629b3b1ee85e0cc2b70659105cede6a0012e6f6.zip |
Adding new tests
Diffstat (limited to 'src/test/scala')
4 files changed, 794 insertions, 0 deletions
diff --git a/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala b/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala new file mode 100644 index 0000000..f08efc7 --- /dev/null +++ b/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala @@ -0,0 +1,195 @@ +package parallelai.spyglass.hbase + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase._ +import org.apache.hadoop.hbase.client.HBaseAdmin +import org.apache.hadoop.hbase.client.HTable +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.util.Bytes +import junit.framework.Assert +import org.junit.runner.RunWith +import org.scalatest.WordSpecLike +import org.scalatest.MustMatchers +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory + +/** + * Generates TWO tables in HBase 'TABLE_01' and 'TABLE_02', populate with some data + * and perform a number of tests + */ + +@RunWith(classOf[JUnitRunner]) +class HBaseReadTest extends MustMatchers with WordSpecLike { + + val QUORUM = "localhost" + val QUORUM_PORT = "2181" + val STARTING_TIMESTAMP = 1260000000000L + + val log = LoggerFactory.getLogger(this.getClass.getName) + + var config:Configuration = HBaseConfiguration.create + config.set("hbase.zookeeper.quorum", QUORUM) + config.set("hbase.zookeeper.property.clientPort", QUORUM_PORT) + + log.info("Connecting to Zookeeper {}:{}", QUORUM, QUORUM_PORT) + + "An HBase integration test" must { + + "generate 2 testing HBase tables" in { + + val testingTables = List("TABLE_01", "TABLE_02") + + try { + testingTables.foreach(deleteTestTable(_,config)) + testingTables.foreach(createTestTable(_,config)) + testingTables.foreach(populateTestTable(_,config)) + testingTables.foreach(printHTable(_,config)) + + // If we've reached here - the testing data are in + Assert.assertEquals("true", "true") + } catch { + case e: Exception => + log.error("EXCEPTION ===> {}", e.toString()) + } + + } + + "perform a SCAN_ALL in an HBase table" in { + + } + + "perform a SCAN_RANGE in an HBase table" in { + + } + + "perform a GET_LIST in an HBase table" in { + + } + + + } + + + /** + * Method to disable and delete HBase Tables i.e. "int-test-01" + */ + private def deleteTestTable(tableName: String, config: Configuration ) = { + + val hbaseAdmin = new HBaseAdmin(config) + if (hbaseAdmin.tableExists(tableName)) { + log.info("Table: " + tableName + " exists.") + hbaseAdmin.disableTable(tableName) + hbaseAdmin.deleteTable(tableName) + log.info("Table: " + tableName + " disabled and deleted.") + } else { + log.info("Table: " + tableName + " does not exist.") + } + hbaseAdmin.close() + + } + + def createTestTable(tableName: String, config: Configuration) = { + + val hbase = new HBaseAdmin(config) + // Get and set the name of the new table + val newTable = new HTableDescriptor(tableName) + + val meta = new HColumnDescriptor("data") + .setMaxVersions(3) + .setInMemory(HColumnDescriptor.DEFAULT_IN_MEMORY) + .setBlockCacheEnabled(HColumnDescriptor.DEFAULT_BLOCKCACHE) + .setTimeToLive(HColumnDescriptor.DEFAULT_TTL) + + newTable.addFamily(meta) + + try { + log.info("Creating table " + tableName) + hbase.createTable(newTable) + } catch { + case et: TableExistsException => + log.error("TableExistsException for table: {} ",tableName) + log.debug(et.toString()) + case e: Exception => + log.error("IOException: " + e.toString) + } + + hbase.close + } + + private def populateTestTable(testingTable: String, config: Configuration) = { + + // Load up HBase table + val table = new HTable(config, testingTable) + + log.info("Populating table: " + testingTable) + + // Table_01 + if (testingTable == "TABLE_01") { + val put1 = new Put("2000-01-01 10:00:10".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "1".getBytes()) + val put2 = new Put("2000-01-01 10:05:00".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "2".getBytes()) + val put3 = new Put("2000-01-01 10:10:00".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "3".getBytes()) + table.put(put1) + table.put(put2) + table.put(put3) + } else + // Table_02 + if (testingTable == "TABLE_02") { + + // 3 versions at 10 o'clock + val k1 = "2000-01-01 10:00:00".getBytes() + val put1 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "1".getBytes()) + val put2 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "2".getBytes()) + val put3 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "3".getBytes()) + + // 3 versions at 11 o'clock + val k2 = "2000-01-01 11:00:00".getBytes() + val put4 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "4".getBytes()) + val put5 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "5".getBytes()) + val put6 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "6".getBytes()) + + import scala.collection.JavaConverters._ + table.put(List(put1, put2, put3, put4, put5, put6).asJava) + } + + table.close + } + + /** + * Method to print-out an HTable + */ + private def printHTable(testingTable: String, config: Configuration) = { + + val table = new HTable(config, testingTable) + val scanner = table.getScanner(new Scan) + + log.info("Printing HTable: " + Bytes.toString(table.getTableName)) + + try { + // Iterate results + // for (Result rr = scanner.next() rr != null rr = scanner.next()) { + while (scanner.iterator().hasNext) { + val rr = scanner.iterator().next + val key = Bytes.toString(rr.getRow()) + var iter = rr.list().iterator() + + var header = "Key:\t" + var data = key + "\t" + + while (iter.hasNext()) { + val kv = iter.next() + header += Bytes.toString(CellUtil.cloneFamily(kv)) + ":" + Bytes.toString(CellUtil.cloneQualifier(kv)) + "\t" + data += Bytes.toString(CellUtil.cloneValue(kv)) + "\t" + } + + log.info(header) + log.info(data) + } + } finally { + // Close scanners when done + scanner.close + table.close + } + } + +}
\ No newline at end of file diff --git a/src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java b/src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java new file mode 100644 index 0000000..3060a45 --- /dev/null +++ b/src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java @@ -0,0 +1,490 @@ +package parallelai.spyglass.hbase; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; + +public class HBaseSalterTest { + + @Test + public void addSaltPrefix() throws IOException { + String keyStr = "1021"; + byte [] keyBytes = Bytes.toBytes(keyStr); + byte [] expected = Bytes.toBytes("1_1021"); + byte [] actual = HBaseSalter.addSaltPrefix(keyBytes); + + assertArrayEquals(actual, expected); + + String actualStr = HBaseSalter.addSaltPrefix(keyStr); + + System.out.println(Bytes.toString(expected) + " -> " + actualStr ); + + assertEquals(Bytes.toString(expected), actualStr); + + } + + @Test + public void delSaltPrefix() throws IOException { + String keyStr = "1_1021"; + byte [] keyBytes = Bytes.toBytes(keyStr); + byte [] expected = Bytes.toBytes("1021"); + byte [] actual = HBaseSalter.delSaltPrefix(keyBytes); + + assertArrayEquals(actual, expected); + + String actualStr = HBaseSalter.delSaltPrefix(keyStr); + + assertEquals(Bytes.toString(expected), actualStr); + + } + + @Test + public void getAllKeys() throws IOException { + String keyStr = "1021"; + byte [] keyBytes = Bytes.toBytes(keyStr); + + char [] prefixArr = HBaseSalter.DEFAULT_PREFIX_LIST.toCharArray(); + + byte [][] expected = new byte[prefixArr.length][]; + + for(int i = 0; i < prefixArr.length; i++ ) { + expected[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStr); + } + + byte [][] actual = HBaseSalter.getAllKeys(keyBytes); + + assertEquals(expected.length, actual.length); + + for( int i = 0; i < expected.length; i++) { + assertArrayEquals(expected[i], actual[i]); + } + } + + @Test + public void getAllKeysWithPrefix() throws IOException { + String keyStr = "1021"; + byte [] keyBytes = Bytes.toBytes(keyStr); + String prefix = "0123456789"; + + char [] prefixArr = prefix.toCharArray(); + + byte [][] expected = new byte[prefixArr.length][]; + + for(int i = 0; i < prefixArr.length; i++ ) { + expected[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStr); + } + + byte [][] actual = HBaseSalter.getAllKeys(keyBytes, prefix); + + assertEquals(expected.length, actual.length); + + for( int i = 0; i < expected.length; i++) { + assertArrayEquals(expected[i], actual[i]); + } + } + + @Test + public void getAllKeysWithPrefixAndRange() throws IOException { + String keyStr = "1021"; + byte [] keyBytes = Bytes.toBytes(keyStr); + String prefix = "12345"; + String fullPrefix = "0123456789"; + + char [] prefixArr = prefix.toCharArray(); + Byte [] prefixBytes = new Byte[prefixArr.length]; + + byte [][] expected = new byte[prefixArr.length][]; + + for(int i = 0; i < prefixArr.length; i++ ) { + expected[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStr); + prefixBytes[i] = (byte)prefixArr[i]; + } + + byte [][] actual = HBaseSalter.getAllKeysInRange(keyBytes, fullPrefix, (byte)'1', (byte)'5'); + + assertEquals(expected.length, actual.length); + + for( int i = 0; i < expected.length; i++) { + assertArrayEquals(expected[i], actual[i]); + } + + actual = HBaseSalter.getAllKeys(keyBytes, prefixBytes); + + for( int i = 0; i < expected.length; i++) { + assertArrayEquals(expected[i], actual[i]); + } + + } + + + @Test + public void getAllKeysWithPrefixWithStart() throws IOException { + String keyStr = "1021"; + byte [] keyBytes = Bytes.toBytes(keyStr); + String prefix = "3456789"; + String fullPrefix = "0123456789"; + + char [] prefixArr = prefix.toCharArray(); + Byte [] prefixBytes = new Byte[prefixArr.length]; + + byte [][] expected = new byte[prefixArr.length][]; + + for(int i = 0; i < prefixArr.length; i++ ) { + expected[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStr); + prefixBytes[i] = (byte)prefixArr[i]; + } + + byte [][] actual = HBaseSalter.getAllKeysWithStart(keyBytes, fullPrefix, (byte)'3'); + + assertEquals(expected.length , actual.length); + + for( int i = 0; i < expected.length; i++) { + assertArrayEquals(expected[i], actual[i]); + } + + actual = HBaseSalter.getAllKeys(keyBytes, prefixBytes); + + for( int i = 0; i < expected.length; i++) { + assertArrayEquals(expected[i], actual[i]); + } + + } + + @Test + public void getAllKeysWithPrefixWithStop() throws IOException { + String keyStr = "1021"; + byte [] keyBytes = Bytes.toBytes(keyStr); + String prefix = "012345"; + String fullPrefix = "0123456789"; + + char [] prefixArr = prefix.toCharArray(); + Byte [] prefixBytes = new Byte[prefixArr.length]; + + byte [][] expected = new byte[prefixArr.length][]; + + for(int i = 0; i < prefixArr.length; i++ ) { + expected[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStr); + prefixBytes[i] = (byte)prefixArr[i]; + } + + byte [][] actual = HBaseSalter.getAllKeysWithStop(keyBytes, fullPrefix, (byte)'5'); + + assertEquals(expected.length , actual.length); + + for( int i = 0; i < expected.length; i++) { + assertArrayEquals(expected[i], actual[i]); + } + + actual = HBaseSalter.getAllKeys(keyBytes, prefixBytes); + + for( int i = 0; i < expected.length; i++) { + assertArrayEquals(expected[i], actual[i]); + } + } + + @Test + public void getDistributedIntervals() throws IOException { + String keyStrStart = "1021"; + byte [] keyBytesStart = Bytes.toBytes(keyStrStart); + + String keyStrStop = "1022"; + byte [] keyBytesStop = Bytes.toBytes(keyStrStop); + + char [] prefixArr = HBaseSalter.DEFAULT_PREFIX_LIST.toCharArray(); + + byte [][] expectedStart = new byte[prefixArr.length][]; + byte [][] expectedStop = new byte[prefixArr.length][]; + Pair<byte[], byte[]> expectedPairs [] = new Pair[prefixArr.length]; + + for(int i = 0; i < prefixArr.length; i++ ) { + expectedStart[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStart); + expectedStop[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStop); + expectedPairs[i] = new Pair<byte[], byte[]>(expectedStart[i], expectedStop[i]); + } + + Pair<byte[], byte[]> actualPairs [] = HBaseSalter.getDistributedIntervals(keyBytesStart, keyBytesStop); + + assertEquals(expectedPairs.length, actualPairs.length); + + for( int i = 0; i < expectedPairs.length; i++ ) { +// System.out.println("".format("FIRST: EXPECTED: (%s) ACTUAL: (%s)", +// Bytes.toString(expectedPairs[i].getFirst()), Bytes.toString(actualPairs[i].getFirst()) )); +// +// System.out.println("".format("SECOND: EXPECTED: (%s) ACTUAL: (%s)", +// Bytes.toString(expectedPairs[i].getSecond()), Bytes.toString(actualPairs[i].getSecond()) )); + + assertArrayEquals(expectedPairs[i].getFirst(), actualPairs[i].getFirst()); + assertArrayEquals(expectedPairs[i].getSecond(), actualPairs[i].getSecond()); + } + } + + + @Test + public void getDistributedIntervalsWithPrefix() throws IOException { + String keyStrStart = "1021"; + byte [] keyBytesStart = Bytes.toBytes(keyStrStart); + + String keyStrStop = "1022"; + byte [] keyBytesStop = Bytes.toBytes(keyStrStop); + + String prefix = "0123"; + char [] prefixArr = prefix.toCharArray(); + + byte [][] expectedStart = new byte[prefixArr.length][]; + byte [][] expectedStop = new byte[prefixArr.length][]; + Pair<byte[], byte[]> expectedPairs [] = new Pair[prefixArr.length]; + + for(int i = 0; i < prefixArr.length; i++ ) { + expectedStart[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStart); + expectedStop[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStop); + expectedPairs[i] = new Pair<byte[], byte[]>(expectedStart[i], expectedStop[i]); + } + + Pair<byte[], byte[]> actualPairs [] = HBaseSalter.getDistributedIntervals(keyBytesStart, keyBytesStop, prefix); + + assertEquals(expectedPairs.length, actualPairs.length); + + for( int i = 0; i < expectedPairs.length; i++ ) { + System.out.println("".format("FIRST: EXPECTED: (%s) ACTUAL: (%s)", + Bytes.toString(expectedPairs[i].getFirst()), Bytes.toString(actualPairs[i].getFirst()) )); + + System.out.println("".format("SECOND: EXPECTED: (%s) ACTUAL: (%s)", + Bytes.toString(expectedPairs[i].getSecond()), Bytes.toString(actualPairs[i].getSecond()) )); + + assertArrayEquals(expectedPairs[i].getFirst(), actualPairs[i].getFirst()); + assertArrayEquals(expectedPairs[i].getSecond(), actualPairs[i].getSecond()); + } + } + + @Test + public void getDistributedIntervalsWithRegionsStartStop() throws IOException { + String keyStrStart = "1021"; + byte [] keyBytesStart = Bytes.toBytes(keyStrStart); + + String keyStrStop = "1022"; + byte [] keyBytesStop = Bytes.toBytes(keyStrStop); + + byte [] regionStart = Bytes.toBytes("1"); + byte [] regionsStop = Bytes.toBytes("4"); + + String expectedPrefix = "1234"; + char [] prefixArr = expectedPrefix.toCharArray(); + + byte [][] expectedStart = new byte[prefixArr.length][]; + byte [][] expectedStop = new byte[prefixArr.length][]; + Pair<byte[], byte[]> expectedPairs [] = new Pair[prefixArr.length]; + + for(int i = 0; i < prefixArr.length; i++ ) { + expectedStart[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStart); + expectedStop[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStop); + expectedPairs[i] = new Pair<byte[], byte[]>(expectedStart[i], expectedStop[i]); + } + + Pair<byte[], byte[]> actualPairs [] = HBaseSalter.getDistributedIntervals(keyBytesStart, keyBytesStop, regionStart, regionsStop, HBaseSalter.DEFAULT_PREFIX_LIST); + + assertEquals(expectedPairs.length, actualPairs.length); + + for( int i = 0; i < expectedPairs.length; i++ ) { + System.out.println("".format("FIRST: EXPECTED: (%s) ACTUAL: (%s)", + Bytes.toString(expectedPairs[i].getFirst()), Bytes.toString(actualPairs[i].getFirst()) )); + + System.out.println("".format("SECOND: EXPECTED: (%s) ACTUAL: (%s)", + Bytes.toString(expectedPairs[i].getSecond()), Bytes.toString(actualPairs[i].getSecond()) )); + + assertArrayEquals(expectedPairs[i].getFirst(), actualPairs[i].getFirst()); + assertArrayEquals(expectedPairs[i].getSecond(), actualPairs[i].getSecond()); + } + } + + + @Test + public void getDistributedIntervalsWithRegionsStartStopWithPrefixAll() throws IOException { + System.out.println("------------ TEST 20 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + "1020", "1021", + "1_1021", "3_1023", + "123", "012345" + ); + + System.out.println("------------ TEST 21 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + "1020", "1021", + "2_1021", Bytes.toString(HConstants.EMPTY_END_ROW), + "2345", "012345" + ); + + System.out.println("------------ TEST 22 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + "1020", "1021", + Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023", + "0123", "012345" + ); + + System.out.println("------------ TEST 23 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + "1020", "1021", + Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), + "012345", "012345" + ); + + System.out.println("------------ TEST 24 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + Bytes.toString(HConstants.EMPTY_START_ROW), "1021", + "1_1021", "3_1023", + "123", "012345" + ); + + System.out.println("------------ TEST 25 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + "1020", Bytes.toString(HConstants.EMPTY_END_ROW), + "1_1021", "3_1023", + "123", "012345" + ); + + System.out.println("------------ TEST 26 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), + "1_1021", "3_1023", + "123", "012345" + ); + + System.out.println("------------ TEST 27 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + Bytes.toString(HConstants.EMPTY_START_ROW), "1021", + Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023", + "0123", "012345" + ); + + System.out.println("------------ TEST 28 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + "1020", Bytes.toString(HConstants.EMPTY_END_ROW), + Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023", + "0123", "012345" + ); + + System.out.println("------------ TEST 29 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), + Bytes.toString(HConstants.EMPTY_START_ROW), "3_1023", + "0123", "012345" + ); + + System.out.println("------------ TEST 30 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + Bytes.toString(HConstants.EMPTY_START_ROW), "1021", + "1_1021", Bytes.toString(HConstants.EMPTY_END_ROW), + "12345", "012345" + ); + + System.out.println("------------ TEST 31 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + "1020", Bytes.toString(HConstants.EMPTY_END_ROW), + "1_1021", Bytes.toString(HConstants.EMPTY_END_ROW), + "12345", "012345" + ); + + System.out.println("------------ TEST 32 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), + "1_1021", Bytes.toString(HConstants.EMPTY_END_ROW), + "12345", "012345" + ); + + System.out.println("------------ TEST 33 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + Bytes.toString(HConstants.EMPTY_START_ROW), "1021", + Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), + "012345", "012345" + ); + + System.out.println("------------ TEST 34 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + "1020", Bytes.toString(HConstants.EMPTY_END_ROW), + Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), + "012345", "012345" + ); + + System.out.println("------------ TEST 35 --------------"); + getDistributedIntervalsWithRegionsStartStopWithPrefix( + Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), + Bytes.toString(HConstants.EMPTY_START_ROW), Bytes.toString(HConstants.EMPTY_END_ROW), + "012345", "012345" + ); + + } + + private void getDistributedIntervalsWithRegionsStartStopWithPrefix( + String keyStrStart, String keyStrStop, + String regionStrStart, String regionStrStop, + String expectedPrefix, String sendPrefix) throws IOException { + + byte [] keyBytesStart = Bytes.toBytes(keyStrStart); + byte [] keyBytesStop = Bytes.toBytes(keyStrStop); + + byte [] regionStart = Bytes.toBytes(regionStrStart); + byte [] regionsStop = Bytes.toBytes(regionStrStop); + + char [] prefixArr = expectedPrefix.toCharArray(); + + byte [][] expectedStart = new byte[prefixArr.length][]; + byte [][] expectedStop = new byte[prefixArr.length][]; + Pair<byte[], byte[]> expectedPairs [] = new Pair[prefixArr.length]; + + for(int i = 0; i < prefixArr.length; i++ ) { + expectedStart[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStart); + expectedStop[i] = Bytes.toBytes(prefixArr[i] + "_" + keyStrStop); + } + + if( Arrays.equals(keyBytesStart, HConstants.EMPTY_START_ROW) + && Arrays.equals(keyBytesStop, HConstants.EMPTY_END_ROW) ) { + for( int i = expectedStart.length - 1; i >=1; i--) { + expectedStart[i] = expectedStart[i - 1]; + } + + expectedStart[0] = HConstants.EMPTY_START_ROW; + expectedStop[expectedStop.length - 1] = HConstants.EMPTY_END_ROW; + } else if(Arrays.equals(keyBytesStart, HConstants.EMPTY_START_ROW)) { + for( int i = expectedStart.length - 1; i >=1; i--) { + expectedStart[i] = expectedStart[i - 1]; + } + + expectedStart[0] = HConstants.EMPTY_START_ROW; + } else if (Arrays.equals(keyBytesStop, HConstants.EMPTY_END_ROW)) { + for(int i = 0; i < expectedStop.length - 1; i++ ) { + expectedStop[i] = expectedStop[i + 1]; + } + expectedStop[expectedStop.length - 1] = HConstants.EMPTY_END_ROW; + } + + for(int i = 0; i < prefixArr.length; i++ ) { + expectedPairs[i] = new Pair<byte[], byte[]>(expectedStart[i], expectedStop[i]); + } + + Pair<byte[], byte[]> actualPairs [] = HBaseSalter.getDistributedIntervals(keyBytesStart, keyBytesStop, regionStart, regionsStop, sendPrefix); + + for(Pair<byte[], byte[]> p : expectedPairs ) { + System.out.println("- EXPECTED " + Bytes.toString(p.getFirst()) + + " -> " + Bytes.toString(p.getSecond())); + } + + for(Pair<byte[], byte[]> p : actualPairs ) { + System.out.println("- ACTUAL " + Bytes.toString(p.getFirst()) + + " -> " + Bytes.toString(p.getSecond())); + } + + assertEquals(expectedPairs.length, actualPairs.length); + + for( int i = 0; i < expectedPairs.length; i++ ) { + assertArrayEquals(expectedPairs[i].getFirst(), actualPairs[i].getFirst()); + assertArrayEquals(expectedPairs[i].getSecond(), actualPairs[i].getSecond()); + } + } + + +} diff --git a/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala new file mode 100644 index 0000000..c3a73a5 --- /dev/null +++ b/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala @@ -0,0 +1,56 @@ +package parallelai.spyglass.hbase + +import org.junit.runner.RunWith +import com.twitter.scalding.JobTest +import org.scalatest.FunSpec +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import cascading.tuple.{Tuple, Fields} +import org.apache.hadoop.hbase.util.Bytes +import com.twitter.scalding.Tsv +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +/** + * Example of how to define tests for HBaseSource + */ +@RunWith(classOf[JUnitRunner]) +class SimpleHBaseSourceExampleTest extends FunSpec { + + val output = "/tmp/testOutput" + + val log = LoggerFactory.getLogger(this.getClass.getName) + + val sampleData = List( + List("1", "kk1", "pp1"), + List("2", "kk2", "pp2"), + List("3", "kk3", "pp3") + ) + + JobTest("parallelai.spyglass.hbase.example.SimpleHBaseSourceExample") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("debug", "true") + .source[Tuple]( + new HBaseSource( + "table_name", + "quorum_name:2181", + new Fields("key"), + List("column_family"), + List(new Fields("column_name1", "column_name2")), + sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3")), + sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*))) + .sink[Tuple](Tsv(output format "get_list")) { + outputBuffer => + log.debug("Output => " + outputBuffer) + + it("should return the test data provided.") { + println("outputBuffer.size => " + outputBuffer.size) + assert(outputBuffer.size === 3) + } + } + .run + .finish + +} diff --git a/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala new file mode 100644 index 0000000..845e763 --- /dev/null +++ b/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala @@ -0,0 +1,53 @@ +package parallelai.spyglass.jdbc + +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith +import org.scalatest.FunSpec +import com.twitter.scalding.{Tsv, JobTest} +import org.slf4j.LoggerFactory +import cascading.tuple.{Tuple, Fields} + +/** + * Simple example of JDBCSource testing + */ +@RunWith(classOf[JUnitRunner]) +class JdbcSourceExampleTest extends FunSpec { + + val output = "src/test/resources/outputs/testOutput" + + val log = LoggerFactory.getLogger(this.getClass.getName) + + val sampleData = List( + (1, "c11", "c12", 11), + (2, "c21", "c22", 22) + ) + + JobTest("parallelai.spyglass.jdbc.example.JdbcSourceExample") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("debug", "true") + .source( + new JDBCSource( + "db_name", + "com.mysql.jdbc.Driver", + "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull", + "user", + "password", + List("KEY_ID", "COL1", "COL2", "COL3"), + List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), + List("key_id"), + new Fields("key_id", "col1", "col2", "col3")), sampleData) + .sink[Tuple](Tsv(output.format("get_list"))) { + outputBuffer => + log.debug("Output => " + outputBuffer) + + it("should return the mock data provided.") { + assert(outputBuffer.size === 2) + } + } + .run + .finish + + +} |