diff options
| author | Antonios Chalkiopoulos <Antwnis@gmail.com> | 2014-08-22 23:09:39 +0100 | 
|---|---|---|
| committer | Antonios Chalkiopoulos <Antwnis@gmail.com> | 2014-08-22 23:09:39 +0100 | 
| commit | c629b3b1ee85e0cc2b70659105cede6a0012e6f6 (patch) | |
| tree | a2b7301b4f785adcbc1af31d586f62054260abed | |
| parent | d539a5df90d92657dbe024f8c674ca91ed1dd542 (diff) | |
| download | SpyGlass-c629b3b1ee85e0cc2b70659105cede6a0012e6f6.tar.gz SpyGlass-c629b3b1ee85e0cc2b70659105cede6a0012e6f6.zip | |
Adding new tests
| -rw-r--r-- | src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala | 195 | ||||
| -rw-r--r-- | src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java (renamed from src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java) | 11 | ||||
| -rw-r--r-- | src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala | 56 | ||||
| -rw-r--r-- | src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala | 53 | 
4 files changed, 305 insertions, 10 deletions
| diff --git a/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala b/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala new file mode 100644 index 0000000..f08efc7 --- /dev/null +++ b/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala @@ -0,0 +1,195 @@ +package parallelai.spyglass.hbase + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase._ +import org.apache.hadoop.hbase.client.HBaseAdmin +import org.apache.hadoop.hbase.client.HTable +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.util.Bytes +import junit.framework.Assert +import org.junit.runner.RunWith +import org.scalatest.WordSpecLike +import org.scalatest.MustMatchers +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory + +/** + * Generates TWO tables in HBase 'TABLE_01' and 'TABLE_02', populate with some data + * and perform a number of tests + */ + +@RunWith(classOf[JUnitRunner]) +class HBaseReadTest extends MustMatchers with WordSpecLike { + +  val QUORUM = "localhost" +  val QUORUM_PORT = "2181" +  val STARTING_TIMESTAMP = 1260000000000L + +  val log = LoggerFactory.getLogger(this.getClass.getName) + +  var config:Configuration = HBaseConfiguration.create +  config.set("hbase.zookeeper.quorum", QUORUM) +  config.set("hbase.zookeeper.property.clientPort", QUORUM_PORT) + +  log.info("Connecting to Zookeeper {}:{}", QUORUM, QUORUM_PORT) + +  "An HBase integration test" must { + +    "generate 2 testing HBase tables" in { + +      val testingTables = List("TABLE_01", "TABLE_02") + +      try { +        testingTables.foreach(deleteTestTable(_,config)) +        testingTables.foreach(createTestTable(_,config)) +        testingTables.foreach(populateTestTable(_,config)) +        testingTables.foreach(printHTable(_,config)) + +        // If we've reached here - the testing data are in +        Assert.assertEquals("true", "true") +      } catch { +        case e: Exception => +          log.error("EXCEPTION ===> {}", e.toString()) +      } + +    } + +    "perform a SCAN_ALL in an HBase table" in { + +    } + +    "perform a SCAN_RANGE in an HBase table" in { + +    } + +    "perform a GET_LIST in an HBase table" in { + +    } + + +  } + + +  /** +   * Method to disable and delete HBase Tables i.e. "int-test-01" +   */ +  private def deleteTestTable(tableName: String, config: Configuration ) = { + +    val hbaseAdmin = new HBaseAdmin(config) +    if (hbaseAdmin.tableExists(tableName)) { +      log.info("Table: " + tableName + " exists.") +      hbaseAdmin.disableTable(tableName) +      hbaseAdmin.deleteTable(tableName) +      log.info("Table: " + tableName + " disabled and deleted.") +    } else { +      log.info("Table: " + tableName + " does not exist.") +    } +    hbaseAdmin.close() + +  } + +  def createTestTable(tableName: String, config: Configuration) = { + +    val hbase = new HBaseAdmin(config) +    // Get and set the name of the new table +    val newTable = new HTableDescriptor(tableName) + +    val meta = new HColumnDescriptor("data") +        .setMaxVersions(3) +        .setInMemory(HColumnDescriptor.DEFAULT_IN_MEMORY) +        .setBlockCacheEnabled(HColumnDescriptor.DEFAULT_BLOCKCACHE) +        .setTimeToLive(HColumnDescriptor.DEFAULT_TTL) + +      newTable.addFamily(meta) + +    try { +      log.info("Creating table " + tableName) +      hbase.createTable(newTable) +    } catch { +      case et: TableExistsException => +        log.error("TableExistsException for table: {} ",tableName) +        log.debug(et.toString()) +      case e: Exception => +        log.error("IOException: " + e.toString) +    } + +    hbase.close +  } + +  private def populateTestTable(testingTable: String, config: Configuration) = { + +    // Load up HBase table +    val table = new HTable(config, testingTable) + +    log.info("Populating table: " + testingTable) + +    // Table_01 +    if (testingTable == "TABLE_01") { +      val put1 = new Put("2000-01-01 10:00:10".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "1".getBytes()) +      val put2 = new Put("2000-01-01 10:05:00".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "2".getBytes()) +      val put3 = new Put("2000-01-01 10:10:00".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "3".getBytes()) +      table.put(put1) +      table.put(put2) +      table.put(put3) +    } else +    // Table_02 +    if (testingTable == "TABLE_02") { + +      // 3 versions at 10 o'clock +      val k1 = "2000-01-01 10:00:00".getBytes() +      val put1 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "1".getBytes()) +      val put2 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "2".getBytes()) +      val put3 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "3".getBytes()) + +      // 3 versions at 11 o'clock +      val k2 = "2000-01-01 11:00:00".getBytes() +      val put4 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "4".getBytes()) +      val put5 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "5".getBytes()) +      val put6 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "6".getBytes()) + +      import scala.collection.JavaConverters._ +      table.put(List(put1, put2, put3, put4, put5, put6).asJava) +    } + +    table.close +  } + +  /** +   * Method to print-out an HTable +   */ +  private def printHTable(testingTable: String, config: Configuration) = { + +    val table = new HTable(config, testingTable) +    val scanner = table.getScanner(new Scan) + +    log.info("Printing HTable: " + Bytes.toString(table.getTableName)) + +    try { +      // Iterate results +      //			for (Result rr = scanner.next() rr != null rr = scanner.next()) { +      while (scanner.iterator().hasNext) { +        val rr = scanner.iterator().next +        val key = Bytes.toString(rr.getRow()) +        var iter = rr.list().iterator() + +        var header = "Key:\t" +        var data = key + "\t" + +        while (iter.hasNext()) { +          val kv = iter.next() +          header += Bytes.toString(CellUtil.cloneFamily(kv)) + ":" + Bytes.toString(CellUtil.cloneQualifier(kv)) + "\t" +          data += Bytes.toString(CellUtil.cloneValue(kv)) + "\t" +        } + +        log.info(header) +        log.info(data) +      } +    } finally { +      // Close scanners when done +      scanner.close +      table.close +    } +  } + +}
\ No newline at end of file diff --git a/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java b/src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java index 1f4c8e7..3060a45 100644 --- a/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java +++ b/src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java @@ -10,10 +10,7 @@ import org.apache.hadoop.hbase.util.Bytes;  import org.apache.hadoop.hbase.util.Pair;  import org.junit.Test; -import parallelai.spyglass.hbase.HBaseSalter; - - -public class HBaseSalterTester { +public class HBaseSalterTest {  	@Test  	public void addSaltPrefix() throws IOException { @@ -484,12 +481,6 @@ public class HBaseSalterTester {  		assertEquals(expectedPairs.length, actualPairs.length);  		for( int i = 0; i < expectedPairs.length; i++ ) { -//			System.out.println("".format("FIRST: EXPECTED: (%s) ACTUAL: (%s)",  -//					Bytes.toString(expectedPairs[i].getFirst()), Bytes.toString(actualPairs[i].getFirst()) )); -// -//			System.out.println("".format("SECOND: EXPECTED: (%s) ACTUAL: (%s)",  -//					Bytes.toString(expectedPairs[i].getSecond()), Bytes.toString(actualPairs[i].getSecond()) )); -  			assertArrayEquals(expectedPairs[i].getFirst(), actualPairs[i].getFirst());  			assertArrayEquals(expectedPairs[i].getSecond(), actualPairs[i].getSecond());  		} diff --git a/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala new file mode 100644 index 0000000..c3a73a5 --- /dev/null +++ b/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala @@ -0,0 +1,56 @@ +package parallelai.spyglass.hbase + +import org.junit.runner.RunWith +import com.twitter.scalding.JobTest +import org.scalatest.FunSpec +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import cascading.tuple.{Tuple, Fields} +import org.apache.hadoop.hbase.util.Bytes +import com.twitter.scalding.Tsv +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +/** + * Example of how to define tests for HBaseSource + */ +@RunWith(classOf[JUnitRunner]) +class SimpleHBaseSourceExampleTest extends FunSpec { + +  val output = "/tmp/testOutput" + +  val log = LoggerFactory.getLogger(this.getClass.getName) + +  val sampleData = List( +    List("1", "kk1", "pp1"), +    List("2", "kk2", "pp2"), +    List("3", "kk3", "pp3") +  ) + +  JobTest("parallelai.spyglass.hbase.example.SimpleHBaseSourceExample") +    .arg("test", "") +    .arg("app.conf.path", "app.conf") +    .arg("output", output) +    .arg("debug", "true") +    .source[Tuple]( +    new HBaseSource( +      "table_name", +      "quorum_name:2181", +      new Fields("key"), +      List("column_family"), +      List(new Fields("column_name1", "column_name2")), +      sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3")), +    sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*))) +    .sink[Tuple](Tsv(output format "get_list")) { +      outputBuffer => +        log.debug("Output => " + outputBuffer) + +        it("should return the test data provided.") { +          println("outputBuffer.size => " + outputBuffer.size) +          assert(outputBuffer.size === 3) +        } +    } +    .run +    .finish + +} diff --git a/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala new file mode 100644 index 0000000..845e763 --- /dev/null +++ b/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala @@ -0,0 +1,53 @@ +package parallelai.spyglass.jdbc + +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith +import org.scalatest.FunSpec +import com.twitter.scalding.{Tsv, JobTest} +import org.slf4j.LoggerFactory +import cascading.tuple.{Tuple, Fields} + +/** + * Simple example of JDBCSource testing + */ +@RunWith(classOf[JUnitRunner]) +class JdbcSourceExampleTest extends FunSpec { + +  val output = "src/test/resources/outputs/testOutput" + +  val log = LoggerFactory.getLogger(this.getClass.getName) + +  val sampleData = List( +    (1, "c11", "c12", 11), +    (2, "c21", "c22", 22) +  ) + +  JobTest("parallelai.spyglass.jdbc.example.JdbcSourceExample") +    .arg("test", "") +    .arg("app.conf.path", "app.conf") +    .arg("output", output) +    .arg("debug", "true") +    .source( +    new JDBCSource( +      "db_name", +      "com.mysql.jdbc.Driver", +      "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull", +      "user", +      "password", +      List("KEY_ID", "COL1", "COL2", "COL3"), +      List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), +      List("key_id"), +      new Fields("key_id", "col1", "col2", "col3")), sampleData) +    .sink[Tuple](Tsv(output.format("get_list"))) { +      outputBuffer => +        log.debug("Output => " + outputBuffer) + +        it("should return the mock data provided.") { +          assert(outputBuffer.size === 2) +        } +    } +    .run +    .finish + + +} | 
