aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala195
-rw-r--r--src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java (renamed from src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java)11
-rw-r--r--src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala56
-rw-r--r--src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala53
4 files changed, 305 insertions, 10 deletions
diff --git a/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala b/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala
new file mode 100644
index 0000000..f08efc7
--- /dev/null
+++ b/src/test/scala/parallelai/spyglass/hbase/HBaseReadTest.scala
@@ -0,0 +1,195 @@
+package parallelai.spyglass.hbase
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.client.HTable
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.util.Bytes
+import junit.framework.Assert
+import org.junit.runner.RunWith
+import org.scalatest.WordSpecLike
+import org.scalatest.MustMatchers
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+
+/**
+ * Generates two HBase tables, 'TABLE_01' and 'TABLE_02', populates them with
+ * some data and performs a number of tests against them.
+ */
+
+@RunWith(classOf[JUnitRunner])
+class HBaseReadTest extends MustMatchers with WordSpecLike {
+
+  // ZooKeeper endpoint of the HBase instance the tests talk to.
+  val QUORUM: String = "localhost"
+  val QUORUM_PORT: String = "2181"
+  // Timestamp the fixture cells are written with (see populateTestTable).
+  val STARTING_TIMESTAMP: Long = 1260000000000L
+  val log = LoggerFactory.getLogger(getClass.getName)
+  // Client configuration pointing at the quorum declared above.
+  var config: Configuration = HBaseConfiguration.create()
+  config.set("hbase.zookeeper.quorum", QUORUM)
+  config.set("hbase.zookeeper.property.clientPort", QUORUM_PORT)
+  log.info("Connecting to Zookeeper {}:{}", QUORUM, QUORUM_PORT)
+
+  "An HBase integration test" must {
+
+    // Needs a live HBase behind the ZooKeeper quorum set in `config`.
+    "generate 2 testing HBase tables" in {
+
+      val testingTables = List("TABLE_01", "TABLE_02")
+
+      try {
+        // Start from a clean slate, then create, fill and dump each table.
+        testingTables.foreach(deleteTestTable(_, config))
+        testingTables.foreach(createTestTable(_, config))
+        testingTables.foreach(populateTestTable(_, config))
+        testingTables.foreach(printHTable(_, config))
+      } catch {
+        case e: Exception =>
+          log.error("EXCEPTION ===> {}", e.toString())
+          // Do not report a vacuous pass: mark the test as canceled and
+          // keep the original exception as the cause.
+          cancel("Could not prepare the HBase testing tables: " + e, e)
+      }
+    }
+
+    // TODO: not implemented yet, the body is empty.
+    "perform a SCAN_ALL in an HBase table" in {
+    }
+
+    // TODO: not implemented yet, the body is empty.
+    "perform a SCAN_RANGE in an HBase table" in {
+    }
+
+    // TODO: not implemented yet, the body is empty.
+    "perform a GET_LIST in an HBase table" in {
+    }
+
+  }
+
+
+  /**
+   * Disables and deletes the given HBase table if it exists, e.g. "TABLE_01".
+   * The admin connection is closed even when one of the admin calls throws.
+   */
+  private def deleteTestTable(tableName: String, config: Configuration ): Unit = {
+    val hbaseAdmin = new HBaseAdmin(config)
+    try {
+      if (hbaseAdmin.tableExists(tableName)) {
+        log.info("Table: " + tableName + " exists.")
+        hbaseAdmin.disableTable(tableName)
+        hbaseAdmin.deleteTable(tableName)
+        log.info("Table: " + tableName + " disabled and deleted.")
+      } else {
+        log.info("Table: " + tableName + " does not exist.")
+      }
+    } finally hbaseAdmin.close()
+  }
+
+  /**
+   * Creates `tableName` with a single column family "data" keeping up to 3 versions.
+   * Creation failures are logged, not rethrown; the admin connection is always closed.
+   */
+  def createTestTable(tableName: String, config: Configuration): Unit = {
+    val meta = new HColumnDescriptor("data")
+      .setMaxVersions(3)
+      .setInMemory(HColumnDescriptor.DEFAULT_IN_MEMORY)
+      .setBlockCacheEnabled(HColumnDescriptor.DEFAULT_BLOCKCACHE)
+      .setTimeToLive(HColumnDescriptor.DEFAULT_TTL)
+    val newTable = new HTableDescriptor(tableName)
+    newTable.addFamily(meta)
+    val hbase = new HBaseAdmin(config)
+    try {
+      log.info("Creating table " + tableName)
+      hbase.createTable(newTable)
+    } catch {
+      case et: TableExistsException =>
+        log.error("TableExistsException for table: {} ",tableName)
+        log.debug(et.toString())
+      case e: Exception =>
+        // Not necessarily an IOException: log the real exception with its stack trace.
+        log.error("Failed to create table " + tableName, e)
+    } finally {
+      hbase.close()
+    }
+  }
+
+  /**
+   * Writes the fixture rows of `testingTable` into column "data:column1".
+   * TABLE_01 gets three rows with one version each; TABLE_02 gets two rows with
+   * three versions each. Any other name only opens and closes the table.
+   */
+  private def populateTestTable(testingTable: String, config: Configuration): Unit = {
+    import scala.collection.JavaConverters._
+    // Bytes.toBytes encodes as UTF-8; String.getBytes() uses the platform charset.
+    val family = Bytes.toBytes("data")
+    val qualifier = Bytes.toBytes("column1")
+    // One data:column1 cell for `rowKey`, stamped STARTING_TIMESTAMP + tsOffset.
+    def cell(rowKey: String, tsOffset: Long, value: String): Put =
+      new Put(Bytes.toBytes(rowKey))
+        .add(family, qualifier, STARTING_TIMESTAMP + tsOffset, Bytes.toBytes(value))
+    val puts: List[Put] = testingTable match {
+      case "TABLE_01" =>
+        List(
+          cell("2000-01-01 10:00:10", 0L, "1"),
+          cell("2000-01-01 10:05:00", 0L, "2"),
+          cell("2000-01-01 10:10:00", 0L, "3"))
+      case "TABLE_02" =>
+        // 3 versions at 10 o'clock, then 3 versions at 11 o'clock
+        List(
+          cell("2000-01-01 10:00:00", 0L, "1"),
+          cell("2000-01-01 10:00:00", 1000L, "2"),
+          cell("2000-01-01 10:00:00", 2000L, "3"),
+          cell("2000-01-01 11:00:00", 0L, "4"),
+          cell("2000-01-01 11:00:00", 1000L, "5"),
+          cell("2000-01-01 11:00:00", 2000L, "6"))
+      case _ => Nil
+    }
+    val table = new HTable(config, testingTable)
+    try {
+      log.info("Populating table: " + testingTable)
+      if (puts.nonEmpty) table.put(puts.asJava)
+    } finally table.close()
+  }
+
+  /**
+   * Logs every row of `testingTable`: one header line with the column names
+   * and one data line with the row key and the cell values, tab separated.
+   * The scanner and the table are closed even when the scan fails.
+   */
+  private def printHTable(testingTable: String, config: Configuration): Unit = {
+    import scala.collection.JavaConverters._
+
+    val table = new HTable(config, testingTable)
+    try {
+      val scanner = table.getScanner(new Scan)
+      try {
+        log.info("Printing HTable: " + Bytes.toString(table.getTableName))
+
+        // Pull rows straight from the scanner (next() yields null when the scan
+        // is exhausted). Every scanner.iterator() call builds a fresh iterator,
+        // so asking for a new one per loop step skips rows.
+        Iterator.continually(scanner.next()).takeWhile(_ != null).foreach { row =>
+          val cells = row.list().asScala
+          val header = cells.map { kv =>
+            Bytes.toString(CellUtil.cloneFamily(kv)) + ":" +
+              Bytes.toString(CellUtil.cloneQualifier(kv)) + "\t"
+          }.mkString
+          val data = cells.map(kv => Bytes.toString(CellUtil.cloneValue(kv)) + "\t").mkString
+
+          log.info("Key:\t" + header)
+          log.info(Bytes.toString(row.getRow()) + "\t" + data)
+        }
+      } finally {
+        // Close scanners when done
+        scanner.close()
+      }
+    } finally {
+      table.close()
+    }
+  }
+
+} \ No newline at end of file
diff --git a/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java b/src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java
index 1f4c8e7..3060a45 100644
--- a/src/test/java/parallelai/spyglass/hbase/HBaseSalterTester.java
+++ b/src/test/scala/parallelai/spyglass/hbase/HBaseSalterTest.java
@@ -10,10 +10,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
-import parallelai.spyglass.hbase.HBaseSalter;
-
-
-public class HBaseSalterTester {
+public class HBaseSalterTest {
@Test
public void addSaltPrefix() throws IOException {
@@ -484,12 +481,6 @@ public class HBaseSalterTester {
assertEquals(expectedPairs.length, actualPairs.length);
for( int i = 0; i < expectedPairs.length; i++ ) {
-// System.out.println("".format("FIRST: EXPECTED: (%s) ACTUAL: (%s)",
-// Bytes.toString(expectedPairs[i].getFirst()), Bytes.toString(actualPairs[i].getFirst()) ));
-//
-// System.out.println("".format("SECOND: EXPECTED: (%s) ACTUAL: (%s)",
-// Bytes.toString(expectedPairs[i].getSecond()), Bytes.toString(actualPairs[i].getSecond()) ));
-
assertArrayEquals(expectedPairs[i].getFirst(), actualPairs[i].getFirst());
assertArrayEquals(expectedPairs[i].getSecond(), actualPairs[i].getSecond());
}
diff --git a/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala
new file mode 100644
index 0000000..c3a73a5
--- /dev/null
+++ b/src/test/scala/parallelai/spyglass/hbase/SimpleHBaseSourceExampleTest.scala
@@ -0,0 +1,56 @@
+package parallelai.spyglass.hbase
+
+import org.junit.runner.RunWith
+import com.twitter.scalding.JobTest
+import org.scalatest.FunSpec
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import cascading.tuple.{Tuple, Fields}
+import org.apache.hadoop.hbase.util.Bytes
+import com.twitter.scalding.Tsv
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+/**
+ * Example of how to define tests for HBaseSource
+ */
+@RunWith(classOf[JUnitRunner])
+class SimpleHBaseSourceExampleTest extends FunSpec {
+  val output = "/tmp/testOutput"
+  val log = LoggerFactory.getLogger(this.getClass.getName)
+  // Mock rows fed to the job in place of real HBase data.
+  val sampleData = List(
+    List("1", "kk1", "pp1"),
+    List("2", "kk2", "pp2"),
+    List("3", "kk3", "pp3")
+  )
+
+  // Source definition handed to JobTest together with the mock rows.
+  private val mockedSource = new HBaseSource(
+    "table_name",
+    "quorum_name:2181",
+    new Fields("key"),
+    List("column_family"),
+    List(new Fields("column_name1", "column_name2")),
+    sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3"))
+  // Every cell of a mock row is wrapped in an ImmutableBytesWritable.
+  private val mockedTuples = sampleData.map { row =>
+    new Tuple(row.map(cell => new ImmutableBytesWritable(Bytes.toBytes(cell))): _*)
+  }
+
+  JobTest("parallelai.spyglass.hbase.example.SimpleHBaseSourceExample")
+    .arg("test", "")
+    .arg("app.conf.path", "app.conf")
+    .arg("output", output)
+    .arg("debug", "true")
+    .source[Tuple](mockedSource, mockedTuples)
+    .sink[Tuple](Tsv(output.format("get_list"))) { results =>
+      log.debug("Output => " + results)
+      it("should return the test data provided.") {
+        println("outputBuffer.size => " + results.size)
+        assert(results.size === 3)
+      }
+    }
+    .run
+    .finish
+}
diff --git a/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala
new file mode 100644
index 0000000..845e763
--- /dev/null
+++ b/src/test/scala/parallelai/spyglass/jdbc/JdbcSourceExampleTest.scala
@@ -0,0 +1,53 @@
+package parallelai.spyglass.jdbc
+
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+import org.scalatest.FunSpec
+import com.twitter.scalding.{Tsv, JobTest}
+import org.slf4j.LoggerFactory
+import cascading.tuple.{Tuple, Fields}
+
+/**
+ * Simple example of JDBCSource testing
+ */
+@RunWith(classOf[JUnitRunner])
+class JdbcSourceExampleTest extends FunSpec {
+
+  val output = "src/test/resources/outputs/testOutput"
+  val log = LoggerFactory.getLogger(this.getClass.getName)
+
+  // Mock rows fed to the job in place of a real database table.
+  val sampleData = List(
+    (1, "c11", "c12", 11),
+    (2, "c21", "c22", 22)
+  )
+
+  // Source definition handed to JobTest together with the mock rows.
+  private val mockedSource = new JDBCSource(
+    "db_name",
+    "com.mysql.jdbc.Driver",
+    "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull",
+    "user",
+    "password",
+    List("KEY_ID", "COL1", "COL2", "COL3"),
+    List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
+    List("key_id"),
+    new Fields("key_id", "col1", "col2", "col3"))
+
+  JobTest("parallelai.spyglass.jdbc.example.JdbcSourceExample")
+    .arg("test", "")
+    .arg("app.conf.path", "app.conf")
+    .arg("output", output)
+    .arg("debug", "true")
+    .source(mockedSource, sampleData)
+    .sink[Tuple](Tsv(output.format("get_list"))) { results =>
+      log.debug("Output => " + results)
+
+      it("should return the mock data provided.") {
+        assert(results.size === 2)
+      }
+    }
+    .run
+    .finish
+
+}