Diffstat (limited to 'src')
-rw-r--r-- | src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala | 195
-rw-r--r-- | src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java (renamed from src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java) | 11
-rw-r--r-- | src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala | 56
-rw-r--r-- | src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala | 53
4 files changed, 305 insertions, 10 deletions
diff --git a/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala b/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala
new file mode 100644
index 0000000..f08efc7
--- /dev/null
+++ b/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala
@@ -0,0 +1,195 @@
+package parallelai.spyglass.hbase
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.client.HTable
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.util.Bytes
+import junit.framework.Assert
+import org.junit.runner.RunWith
+import org.scalatest.WordSpecLike
+import org.scalatest.MustMatchers
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+
+/**
+ * Generates two tables in HBase, 'TABLE_01' and 'TABLE_02', populates them
+ * with some data, and performs a number of read tests.
+ */
+@RunWith(classOf[JUnitRunner])
+class HBaseReadTest extends MustMatchers with WordSpecLike {
+
+  val QUORUM = "localhost"
+  val QUORUM_PORT = "2181"
+  val STARTING_TIMESTAMP = 1260000000000L
+
+  val log = LoggerFactory.getLogger(this.getClass.getName)
+
+  val config: Configuration = HBaseConfiguration.create
+  config.set("hbase.zookeeper.quorum", QUORUM)
+  config.set("hbase.zookeeper.property.clientPort", QUORUM_PORT)
+
+  log.info("Connecting to Zookeeper {}:{}", QUORUM, QUORUM_PORT)
+
+  "An HBase integration test" must {
+
+    "generate 2 testing HBase tables" in {
+
+      val testingTables = List("TABLE_01", "TABLE_02")
+
+      try {
+        testingTables.foreach(deleteTestTable(_, config))
+        testingTables.foreach(createTestTable(_, config))
+        testingTables.foreach(populateTestTable(_, config))
+        testingTables.foreach(printHTable(_, config))
+
+        // If we've reached here, the testing data is in
+        Assert.assertEquals("true", "true")
+      } catch {
+        case e: Exception =>
+          fail("EXCEPTION ===> " + e.toString)
+      }
+
+    }
+
+    "perform a SCAN_ALL in an HBase table" in {
+
+    }
+
+    "perform a SCAN_RANGE in an HBase table" in {
+
+    }
+
+    "perform a GET_LIST in an HBase table" in {
+
+    }
+
+  }
+
+  /**
+   * Disables and deletes an HBase table, e.g. "int-test-01".
+   */
+  private def deleteTestTable(tableName: String, config: Configuration) = {
+
+    val hbaseAdmin = new HBaseAdmin(config)
+    if (hbaseAdmin.tableExists(tableName)) {
+      log.info("Table: " + tableName + " exists.")
+      hbaseAdmin.disableTable(tableName)
+      hbaseAdmin.deleteTable(tableName)
+      log.info("Table: " + tableName + " disabled and deleted.")
+    } else {
+      log.info("Table: " + tableName + " does not exist.")
+    }
+    hbaseAdmin.close()
+
+  }
+
+  def createTestTable(tableName: String, config: Configuration) = {
+
+    val hbase = new HBaseAdmin(config)
+    // Name and describe the new table
+    val newTable = new HTableDescriptor(tableName)
+
+    val meta = new HColumnDescriptor("data")
+      .setMaxVersions(3)
+      .setInMemory(HColumnDescriptor.DEFAULT_IN_MEMORY)
+      .setBlockCacheEnabled(HColumnDescriptor.DEFAULT_BLOCKCACHE)
+      .setTimeToLive(HColumnDescriptor.DEFAULT_TTL)
+
+    newTable.addFamily(meta)
+
+    try {
+      log.info("Creating table " + tableName)
+      hbase.createTable(newTable)
+    } catch {
+      case et: TableExistsException =>
+        log.error("TableExistsException for table: {}", tableName)
+        log.debug(et.toString())
+      case e: Exception =>
+        log.error("IOException: " + e.toString)
+    }
+
+    hbase.close()
+  }
+
+  private def populateTestTable(testingTable: String, config: Configuration) = {
+
+    // Load up the HBase table
+    val table = new HTable(config, testingTable)
+
+    log.info("Populating table: " + testingTable)
+
+    if (testingTable == "TABLE_01") {
+      // TABLE_01: three distinct keys, one version each
+      val put1 = new Put("2000-01-01 10:00:10".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "1".getBytes())
+      val put2 = new Put("2000-01-01 10:05:00".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "2".getBytes())
+      val put3 = new Put("2000-01-01 10:10:00".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "3".getBytes())
+      table.put(put1)
+      table.put(put2)
+      table.put(put3)
+    } else if (testingTable == "TABLE_02") {
+
+      // TABLE_02: 3 versions at 10 o'clock
+      val k1 = "2000-01-01 10:00:00".getBytes()
+      val put1 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "1".getBytes())
+      val put2 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "2".getBytes())
+      val put3 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "3".getBytes())
+
+      // ... and 3 versions at 11 o'clock
+      val k2 = "2000-01-01 11:00:00".getBytes()
+      val put4 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "4".getBytes())
+      val put5 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "5".getBytes())
+      val put6 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "6".getBytes())
+
+      import scala.collection.JavaConverters._
+      table.put(List(put1, put2, put3, put4, put5, put6).asJava)
+    }
+
+    table.close()
+  }
+
+  /**
+   * Prints out the contents of an HTable.
+   */
+  private def printHTable(testingTable: String, config: Configuration) = {
+
+    val table = new HTable(config, testingTable)
+    val scanner = table.getScanner(new Scan)
+
+    log.info("Printing HTable: " + Bytes.toString(table.getTableName))
+
+    try {
+      // Iterate over the results, reusing a single iterator: every call to
+      // scanner.iterator() creates a fresh one, so rows would be skipped
+      val rows = scanner.iterator()
+      while (rows.hasNext) {
+        val rr = rows.next()
+        val key = Bytes.toString(rr.getRow())
+        val iter = rr.listCells().iterator()
+
+        var header = "Key:\t"
+        var data = key + "\t"
+
+        while (iter.hasNext()) {
+          val kv = iter.next()
+          header += Bytes.toString(CellUtil.cloneFamily(kv)) + ":" + Bytes.toString(CellUtil.cloneQualifier(kv)) + "\t"
+          data += Bytes.toString(CellUtil.cloneValue(kv)) + "\t"
+        }
+
+        log.info(header)
+        log.info(data)
+      }
+    } finally {
+      // Close scanner and table when done
+      scanner.close()
+      table.close()
+    }
+  }
+
+}
\ No newline at end of file
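
Note: the SCAN_ALL, SCAN_RANGE and GET_LIST cases above are left as empty stubs in this commit. A minimal sketch of what the SCAN_RANGE case could look like, using only the plain HBase client API against the TABLE_01 rows loaded by populateTestTable (the key range and the expected count of 2 are assumptions, not part of the commit):

    "perform a SCAN_RANGE in an HBase table" in {
      val table = new HTable(config, "TABLE_01")
      // Scan the sub-range that should hold only the first two rows;
      // the stop row is exclusive, so "2000-01-01 10:10:00" is left out
      val scan = new Scan("2000-01-01 10:00:00".getBytes(), "2000-01-01 10:10:00".getBytes())
      val scanner = table.getScanner(scan)
      try {
        var count = 0
        val rows = scanner.iterator()
        while (rows.hasNext) { rows.next(); count += 1 }
        count must equal(2)
      } finally {
        scanner.close()
        table.close()
      }
    }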
diff --git a/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java b/src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java
index 1f4c8e7..3060a45 100644
--- a/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java
+++ b/src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java
@@ -10,10 +10,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Test;
-import parallelai.spyglass.hbase.HBaseSalter;
-
-
-public class HBaseSalterTester {
+public class HBaseSalterTest {
 
 	@Test
 	public void addSaltPrefix() throws IOException {
@@ -484,12 +481,6 @@ public class HBaseSalterTester {
 		assertEquals(expectedPairs.length, actualPairs.length);
 
 		for( int i = 0; i < expectedPairs.length; i++ ) {
-//			System.out.println("".format("FIRST: EXPECTED: (%s) ACTUAL: (%s)",
-//					Bytes.toString(expectedPairs[i].getFirst()), Bytes.toString(actualPairs[i].getFirst()) ));
-//
-//			System.out.println("".format("SECOND: EXPECTED: (%s) ACTUAL: (%s)",
-//					Bytes.toString(expectedPairs[i].getSecond()), Bytes.toString(actualPairs[i].getSecond()) ));
-
 			assertArrayEquals(expectedPairs[i].getFirst(), actualPairs[i].getFirst());
 			assertArrayEquals(expectedPairs[i].getSecond(), actualPairs[i].getSecond());
 		}
diff --git a/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala
new file mode 100644
index 0000000..c3a73a5
--- /dev/null
+++ b/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala
@@ -0,0 +1,56 @@
+package parallelai.spyglass.hbase
+
+import org.junit.runner.RunWith
+import com.twitter.scalding.JobTest
+import org.scalatest.FunSpec
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import cascading.tuple.{Tuple, Fields}
+import org.apache.hadoop.hbase.util.Bytes
+import com.twitter.scalding.Tsv
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+/**
+ * Example of how to define tests for HBaseSource.
+ */
+@RunWith(classOf[JUnitRunner])
+class SimpleHBaseSourceExampleTest extends FunSpec {
+
+  val output = "/tmp/testOutput"
+
+  val log = LoggerFactory.getLogger(this.getClass.getName)
+
+  val sampleData = List(
+    List("1", "kk1", "pp1"),
+    List("2", "kk2", "pp2"),
+    List("3", "kk3", "pp3")
+  )
+
+  JobTest("parallelai.spyglass.hbase.example.SimpleHBaseSourceExample")
+    .arg("test", "")
+    .arg("app.conf.path", "app.conf")
+    .arg("output", output)
+    .arg("debug", "true")
+    .source[Tuple](
+      new HBaseSource(
+        "table_name",
+        "quorum_name:2181",
+        new Fields("key"),
+        List("column_family"),
+        List(new Fields("column_name1", "column_name2")),
+        sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3")),
+      sampleData.map(l => new Tuple(l.map(s => new ImmutableBytesWritable(Bytes.toBytes(s))): _*)))
+    .sink[Tuple](Tsv(output format "get_list")) {
+      outputBuffer =>
+        log.debug("Output => " + outputBuffer)
+
+        it("should return the test data provided.") {
+          println("outputBuffer.size => " + outputBuffer.size)
+          assert(outputBuffer.size === 3)
+        }
+    }
+    .run
+    .finish
+
+}
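
Note: the job under test, parallelai.spyglass.hbase.example.SimpleHBaseSourceExample, is not part of this change. For orientation, a minimal sketch of what such a job presumably looks like; the pipeline body is an assumption, only the HBaseSource arguments are taken from the test above:

    package parallelai.spyglass.hbase.example

    import com.twitter.scalding.{Args, Job, Tsv}
    import cascading.tuple.Fields
    import parallelai.spyglass.hbase.HBaseSource
    import parallelai.spyglass.hbase.HBaseConstants.SourceMode

    class SimpleHBaseSourceExample(args: Args) extends Job(args) {
      // Read the three keys via GET_LIST and dump them to the Tsv sink
      // that the JobTest above intercepts
      new HBaseSource(
        "table_name",
        "quorum_name:2181",
        new Fields("key"),
        List("column_family"),
        List(new Fields("column_name1", "column_name2")),
        sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3"))
        .read
        .write(Tsv(args("output") format "get_list"))
    }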
diff --git a/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala
new file mode 100644
index 0000000..845e763
--- /dev/null
+++ b/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala
@@ -0,0 +1,53 @@
+package parallelai.spyglass.jdbc
+
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+import org.scalatest.FunSpec
+import com.twitter.scalding.{Tsv, JobTest}
+import org.slf4j.LoggerFactory
+import cascading.tuple.{Tuple, Fields}
+
+/**
+ * Simple example of JDBCSource testing.
+ */
+@RunWith(classOf[JUnitRunner])
+class JdbcSourceExampleTest extends FunSpec {
+
+  val output = "src/test/resources/outputs/testOutput"
+
+  val log = LoggerFactory.getLogger(this.getClass.getName)
+
+  val sampleData = List(
+    (1, "c11", "c12", 11),
+    (2, "c21", "c22", 22)
+  )
+
+  JobTest("parallelai.spyglass.jdbc.example.JdbcSourceExample")
+    .arg("test", "")
+    .arg("app.conf.path", "app.conf")
+    .arg("output", output)
+    .arg("debug", "true")
+    .source(
+      new JDBCSource(
+        "db_name",
+        "com.mysql.jdbc.Driver",
+        "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull",
+        "user",
+        "password",
+        List("KEY_ID", "COL1", "COL2", "COL3"),
+        List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
+        List("key_id"),
+        new Fields("key_id", "col1", "col2", "col3")),
+      sampleData)
+    .sink[Tuple](Tsv(output.format("get_list"))) {
+      outputBuffer =>
+        log.debug("Output => " + outputBuffer)
+
+        it("should return the mock data provided.") {
+          assert(outputBuffer.size === 2)
+        }
+    }
+    .run
+    .finish
+
+}
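
Note: as with the HBase example, the referenced job parallelai.spyglass.jdbc.example.JdbcSourceExample is not included in this change; presumably it reads the JDBCSource constructed with the same arguments as above and writes the rows to the intercepted Tsv sink. Both JobTest-based suites run entirely in memory with mocked taps, whereas HBaseReadTest needs a live HBase/ZooKeeper quorum reachable at localhost:2181.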