diff options
Diffstat (limited to 'src/test')
-rw-r--r-- | src/test/scala/parallelai/spyglass/hbase/example/HBaseReadTest.scala | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/src/test/scala/parallelai/spyglass/hbase/example/HBaseReadTest.scala b/src/test/scala/parallelai/spyglass/hbase/example/HBaseReadTest.scala new file mode 100644 index 0000000..bbecf59 --- /dev/null +++ b/src/test/scala/parallelai/spyglass/hbase/example/HBaseReadTest.scala @@ -0,0 +1,196 @@ +package parallelai.spyglass.hbase.example + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase._ +import org.apache.hadoop.hbase.client.HBaseAdmin +import org.apache.hadoop.hbase.client.HTable +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.Scan +import org.apache.hadoop.hbase.util.Bytes +import junit.framework.Assert +import org.junit.runner.RunWith +import org.scalatest.WordSpecLike +import org.scalatest.MustMatchers +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory + +/** + * Generates TWO tables in HBase 'TABLE_01' and 'TABLE_02', populate with some data + * and perform a number of tests + * + * @author Antwnis@gmail.com + */ +@RunWith(classOf[JUnitRunner]) +class HBaseReadTest extends MustMatchers with WordSpecLike { + + val QUORUM = "localhost" + val QUORUM_PORT = "2181" + val STARTING_TIMESTAMP = 1260000000000L + + val log = LoggerFactory.getLogger(this.getClass.getName) + + var config:Configuration = HBaseConfiguration.create + config.set("hbase.zookeeper.quorum", QUORUM) + config.set("hbase.zookeeper.property.clientPort", QUORUM_PORT) + + log.info("Connecting to Zookeeper {}:{}", QUORUM, QUORUM_PORT) + + "An HBase integration test" must { + + "generate 2 testing HBase tables" in { + + val testingTables = List("TABLE_01", "TABLE_02") + + try { + testingTables.foreach(deleteTestTable(_,config)) + testingTables.foreach(createTestTable(_,config)) + testingTables.foreach(populateTestTable(_,config)) + testingTables.foreach(printHTable(_,config)) + + // If we've reached here - the testing data are in + Assert.assertEquals("true", "true") + } catch { + case e: Exception => + log.error("EXCEPTION ===> {}", e.toString()) + } + + } + + "perform a SCAN_ALL in an HBase table" in { + + } + + "perform a SCAN_RANGE in an HBase table" in { + + } + + "perform a GET_LIST in an HBase table" in { + + } + + + } + + + /** + * Method to disable and delete HBase Tables i.e. "int-test-01" + */ + private def deleteTestTable(tableName: String, config: Configuration ) = { + + val hbaseAdmin = new HBaseAdmin(config) + if (hbaseAdmin.tableExists(tableName)) { + log.info("Table: " + tableName + " exists.") + hbaseAdmin.disableTable(tableName) + hbaseAdmin.deleteTable(tableName) + log.info("Table: " + tableName + " disabled and deleted.") + } else { + log.info("Table: " + tableName + " does not exist.") + } + hbaseAdmin.close() + + } + + def createTestTable(tableName: String, config: Configuration) = { + + val hbase = new HBaseAdmin(config) + // Get and set the name of the new table + val newTable = new HTableDescriptor(tableName) + + val meta = new HColumnDescriptor("data") + .setMaxVersions(3) + .setInMemory(HColumnDescriptor.DEFAULT_IN_MEMORY) + .setBlockCacheEnabled(HColumnDescriptor.DEFAULT_BLOCKCACHE) + .setTimeToLive(HColumnDescriptor.DEFAULT_TTL) + + newTable.addFamily(meta) + + try { + log.info("Creating table " + tableName) + hbase.createTable(newTable) + } catch { + case et: TableExistsException => + log.error("TableExistsException for table: {} ",tableName) + log.debug(et.toString()) + case e: Exception => + log.error("IOException: " + e.toString) + } + + hbase.close + } + + private def populateTestTable(testingTable: String, config: Configuration) = { + + // Load up HBase table + val table = new HTable(config, testingTable) + + log.info("Populating table: " + testingTable) + + // Table_01 + if (testingTable == "TABLE_01") { + val put1 = new Put("2000-01-01 10:00:10".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "1".getBytes()) + val put2 = new Put("2000-01-01 10:05:00".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "2".getBytes()) + val put3 = new Put("2000-01-01 10:10:00".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "3".getBytes()) + table.put(put1) + table.put(put2) + table.put(put3) + } else + // Table_02 + if (testingTable == "TABLE_02") { + + // 3 versions at 10 o'clock + val k1 = "2000-01-01 10:00:00".getBytes() + val put1 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "1".getBytes()) + val put2 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "2".getBytes()) + val put3 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "3".getBytes()) + + // 3 versions at 11 o'clock + val k2 = "2000-01-01 11:00:00".getBytes() + val put4 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "4".getBytes()) + val put5 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "5".getBytes()) + val put6 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "6".getBytes()) + + import scala.collection.JavaConverters._ + table.put(List(put1, put2, put3, put4, put5, put6).asJava) + } + + table.close + } + + /** + * Method to print-out an HTable + */ + private def printHTable(testingTable: String, config: Configuration) = { + + val table = new HTable(config, testingTable) + val scanner = table.getScanner(new Scan()) + + log.info("Printing HTable: " + Bytes.toString(table.getTableName())) + + try { + // Iterate results + // for (Result rr = scanner.next() rr != null rr = scanner.next()) { + while (scanner.iterator().hasNext) { + val rr = scanner.iterator().next + val key = Bytes.toString(rr.getRow()) + var iter = rr.list().iterator() + + var header = "Key:\t" + var data = key + "\t" + + while (iter.hasNext()) { + val kv = iter.next() + header += Bytes.toString(CellUtil.cloneFamily(kv)) + ":" + Bytes.toString(CellUtil.cloneQualifier(kv)) + "\t" + data += Bytes.toString(CellUtil.cloneValue(kv)) + "\t" + } + + log.info(header) + log.info(data) + } + } finally { + // Close scanners when done + scanner.close + table.close + } + } + +}
\ No newline at end of file |