Diffstat (limited to 'src/test/scala/parallelai')
-rw-r--r--  src/test/scala/parallelai/spyglass/hbase/example/HBaseReadTest.scala                196
-rw-r--r--  src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala  58
-rw-r--r--  src/test/scala/parallelai/spyglass/jdbc/GenerateTestingTables.java                   201
-rw-r--r--  src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala           54
4 files changed, 201 insertions, 308 deletions
diff --git a/src/test/scala/parallelai/spyglass/hbase/example/HBaseReadTest.scala b/src/test/scala/parallelai/spyglass/hbase/example/HBaseReadTest.scala
deleted file mode 100644
index bbecf59..0000000
--- a/src/test/scala/parallelai/spyglass/hbase/example/HBaseReadTest.scala
+++ /dev/null
@@ -1,196 +0,0 @@
-package parallelai.spyglass.hbase.example
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase._
-import org.apache.hadoop.hbase.client.HBaseAdmin
-import org.apache.hadoop.hbase.client.HTable
-import org.apache.hadoop.hbase.client.Put
-import org.apache.hadoop.hbase.client.Scan
-import org.apache.hadoop.hbase.util.Bytes
-import junit.framework.Assert
-import org.junit.runner.RunWith
-import org.scalatest.WordSpecLike
-import org.scalatest.MustMatchers
-import org.scalatest.junit.JUnitRunner
-import org.slf4j.LoggerFactory
-
-/**
- * Generates two HBase tables, 'TABLE_01' and 'TABLE_02', populates them with
- * some data, and performs a number of tests
- *
- * @author Antwnis@gmail.com
- */
-@RunWith(classOf[JUnitRunner])
-class HBaseReadTest extends MustMatchers with WordSpecLike {
-
- val QUORUM = "localhost"
- val QUORUM_PORT = "2181"
- val STARTING_TIMESTAMP = 1260000000000L
-
- val log = LoggerFactory.getLogger(this.getClass.getName)
-
-  val config: Configuration = HBaseConfiguration.create
- config.set("hbase.zookeeper.quorum", QUORUM)
- config.set("hbase.zookeeper.property.clientPort", QUORUM_PORT)
-
- log.info("Connecting to Zookeeper {}:{}", QUORUM, QUORUM_PORT)
-
- "An HBase integration test" must {
-
- "generate 2 testing HBase tables" in {
-
- val testingTables = List("TABLE_01", "TABLE_02")
-
-      try {
-        testingTables.foreach(deleteTestTable(_, config))
-        testingTables.foreach(createTestTable(_, config))
-        testingTables.foreach(populateTestTable(_, config))
-        testingTables.foreach(printHTable(_, config))
-      } catch {
-        case e: Exception =>
-          log.error("EXCEPTION ===> {}", e.toString)
-          Assert.fail(e.toString)
-      }
-
- }
-
-    "perform a SCAN_ALL in an HBase table" in pending
-
-    "perform a SCAN_RANGE in an HBase table" in pending
-
-    "perform a GET_LIST in an HBase table" in pending
-
-
- }
-
-
- /**
-   * Disables and deletes an HBase table, e.g. "TABLE_01"
-   */
-  private def deleteTestTable(tableName: String, config: Configuration) = {
-
- val hbaseAdmin = new HBaseAdmin(config)
- if (hbaseAdmin.tableExists(tableName)) {
- log.info("Table: " + tableName + " exists.")
- hbaseAdmin.disableTable(tableName)
- hbaseAdmin.deleteTable(tableName)
- log.info("Table: " + tableName + " disabled and deleted.")
- } else {
- log.info("Table: " + tableName + " does not exist.")
- }
- hbaseAdmin.close()
-
- }
-
- def createTestTable(tableName: String, config: Configuration) = {
-
- val hbase = new HBaseAdmin(config)
- // Get and set the name of the new table
- val newTable = new HTableDescriptor(tableName)
-
- val meta = new HColumnDescriptor("data")
- .setMaxVersions(3)
- .setInMemory(HColumnDescriptor.DEFAULT_IN_MEMORY)
- .setBlockCacheEnabled(HColumnDescriptor.DEFAULT_BLOCKCACHE)
- .setTimeToLive(HColumnDescriptor.DEFAULT_TTL)
-
- newTable.addFamily(meta)
-
- try {
- log.info("Creating table " + tableName)
- hbase.createTable(newTable)
- } catch {
-      case et: TableExistsException =>
-        log.error("TableExistsException for table: {}", tableName)
-        log.debug(et.toString)
-      case e: Exception =>
-        log.error("Exception while creating table: " + e.toString)
-    }
-
-    hbase.close()
- }
-
- private def populateTestTable(testingTable: String, config: Configuration) = {
-
- // Load up HBase table
- val table = new HTable(config, testingTable)
-
- log.info("Populating table: " + testingTable)
-
- // Table_01
- if (testingTable == "TABLE_01") {
- val put1 = new Put("2000-01-01 10:00:10".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "1".getBytes())
- val put2 = new Put("2000-01-01 10:05:00".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "2".getBytes())
- val put3 = new Put("2000-01-01 10:10:00".getBytes()).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "3".getBytes())
- table.put(put1)
- table.put(put2)
- table.put(put3)
-    } else if (testingTable == "TABLE_02") { // Table_02
-
- // 3 versions at 10 o'clock
- val k1 = "2000-01-01 10:00:00".getBytes()
- val put1 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "1".getBytes())
- val put2 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "2".getBytes())
- val put3 = new Put(k1).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "3".getBytes())
-
- // 3 versions at 11 o'clock
- val k2 = "2000-01-01 11:00:00".getBytes()
- val put4 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP, "4".getBytes())
- val put5 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 1000L, "5".getBytes())
- val put6 = new Put(k2).add("data".getBytes(), "column1".getBytes(), STARTING_TIMESTAMP + 2000L, "6".getBytes())
-
- import scala.collection.JavaConverters._
- table.put(List(put1, put2, put3, put4, put5, put6).asJava)
- }
-
-    table.close()
- }
-
- /**
-   * Prints the contents of an HTable
- */
- private def printHTable(testingTable: String, config: Configuration) = {
-
- val table = new HTable(config, testingTable)
- val scanner = table.getScanner(new Scan())
-
- log.info("Printing HTable: " + Bytes.toString(table.getTableName()))
-
-    try {
-      // Iterate over the results using a single scanner iterator;
-      // calling scanner.iterator() repeatedly would consume rows twice
-      val iter = scanner.iterator()
-      while (iter.hasNext) {
-        val rr = iter.next()
-        val key = Bytes.toString(rr.getRow())
-        val cells = rr.list().iterator()
-
-        var header = "Key:\t"
-        var data = key + "\t"
-
-        while (cells.hasNext()) {
-          val kv = cells.next()
-          header += Bytes.toString(CellUtil.cloneFamily(kv)) + ":" + Bytes.toString(CellUtil.cloneQualifier(kv)) + "\t"
-          data += Bytes.toString(CellUtil.cloneValue(kv)) + "\t"
-        }
-
-        log.info(header)
-        log.info(data)
-      }
-    } finally {
-      // Close scanner and table when done
-      scanner.close()
-      table.close()
-    }
- }
-
-} \ No newline at end of file
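Note: the three scan scenarios above are left pending in the deleted file. A minimal sketch of how the SCAN_ALL case might be implemented against the populated TABLE_01, reusing the file's HBase client API; the ScanAllCheck helper is hypothetical and not part of the original source:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{HTable, Scan}
import scala.collection.JavaConverters._

object ScanAllCheck {
  // Counts the rows returned by a full scan of the given table.
  def countRows(tableName: String, config: Configuration): Int = {
    val table = new HTable(config, tableName)
    val scanner = table.getScanner(new Scan())
    try scanner.iterator().asScala.size // one iterator, consumed once
    finally {
      scanner.close()
      table.close()
    }
  }
}

// Inside the spec, the pending test could then become:
//   "perform a SCAN_ALL in an HBase table" in {
//     ScanAllCheck.countRows("TABLE_01", config) must equal (3) // three rows seeded above
//   }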
diff --git a/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala
deleted file mode 100644
index 30342da..0000000
--- a/src/test/scala/parallelai/spyglass/hbase/example/SimpleHBaseSourceExampleTest.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package parallelai.spyglass.hbase.example
-
-import org.junit.runner.RunWith
-import com.twitter.scalding.{JobTest, TupleConversions}
-import org.scalatest.FunSpec
-import org.scalatest.junit.JUnitRunner
-import org.slf4j.LoggerFactory
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import cascading.tuple.{Tuple, Fields}
-import org.apache.hadoop.hbase.util.Bytes
-import com.twitter.scalding.Tsv
-import parallelai.spyglass.hbase.HBaseSource
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode
-
-/**
- * Example of how to define tests for HBaseSource
- */
-@RunWith(classOf[JUnitRunner])
-class SimpleHBaseSourceExampleTest extends FunSpec with TupleConversions {
-
- val output = "/tmp/testOutput"
-
- val log = LoggerFactory.getLogger(this.getClass.getName)
-
- val sampleData = List(
- List("1", "kk1", "pp1"),
- List("2", "kk2", "pp2"),
- List("3", "kk3", "pp3")
- )
-
- JobTest("parallelai.spyglass.hbase.example.SimpleHBaseSourceExample")
- .arg("test", "")
- .arg("app.conf.path", "app.conf")
- .arg("output", output)
- .arg("debug", "true")
- .source[Tuple](
- new HBaseSource(
- "table_name",
- "quorum_name:2181",
- new Fields("key"),
- List("column_family"),
- List(new Fields("column_name1", "column_name2")),
- sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3")),
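-        // HBaseSource tuples carry ImmutableBytesWritable values, so each sample string is wrapped below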
- sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(Bytes.toBytes(s))}):_*)))
- .sink[Tuple](Tsv(output format "get_list")) {
- outputBuffer =>
- log.debug("Output => " + outputBuffer)
-
- it("should return the test data provided.") {
- println("outputBuffer.size => " + outputBuffer.size)
- assert(outputBuffer.size === 3)
- }
- }
- .run
- .finish
-
-}
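The job class this JobTest drives, SimpleHBaseSourceExample, is not shown in the diff. A plausible minimal shape for it, mirroring the HBaseSource definition wired in above; JobBase and HBasePipeConversions are SpyGlass classes, and the real job body may differ:

import com.twitter.scalding.{Args, Tsv}
import cascading.tuple.Fields
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.{HBasePipeConversions, HBaseSource}
import parallelai.spyglass.hbase.HBaseConstants.SourceMode

class SimpleHBaseSourceExample(args: Args) extends JobBase(args) with HBasePipeConversions {

  val output = args("output")

  // Same source definition as the test above
  new HBaseSource(
    "table_name",
    "quorum_name:2181",
    new Fields("key"),
    List("column_family"),
    List(new Fields("column_name1", "column_name2")),
    sourceMode = SourceMode.GET_LIST, keyList = List("1", "2", "3"))
    .read
    // HBase hands back ImmutableBytesWritable cells; convert before writing
    .fromBytesWritable(new Fields("key", "column_name1", "column_name2"))
    .write(Tsv(output format "get_list"))
}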
diff --git a/src/test/scala/parallelai/spyglass/jdbc/GenerateTestingTables.java b/src/test/scala/parallelai/spyglass/jdbc/GenerateTestingTables.java
new file mode 100644
index 0000000..54ec8fc
--- /dev/null
+++ b/src/test/scala/parallelai/spyglass/jdbc/GenerateTestingTables.java
@@ -0,0 +1,201 @@
+package parallelai.spyglass.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+/**
+ * Generates two tables, 'TABLE_01' and 'TABLE_02', in the database.
+ *
+ * Those tables are used by the 'integration-testing' of JDBCSource in file
+ * JdbcSourceShouldReadWrite.scala
+ *
+ * Run with: mvn -Dtest=parallelai.spyglass.jdbc.GenerateTestingTables test
+ *
+ */
+public class GenerateTestingTables {
+
+ // JDBC driver name and database URL
+ static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
+ static final String DB_PORT = "3306";
+ static final String DB_NAME = "database_name";
+
+
+ static final String DB_URL = "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull";
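+ // NOTE: DB_PORT and DB_NAME above are sample values; substitute them, plus a real
+ // hostname, into the <hostname>:<port>/<db_name> placeholders before running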
+
+ // Database credentials
+ static final String USER = "user";
+ static final String PASS = "password";
+
+ public static enum TestingTable {
+ TABLE_01, TABLE_02
+ }
+
+ private static final Log LOG = LogFactory
+ .getLog(GenerateTestingTables.class);
+
+ @Test
+ public void fakeTest() {
+
+ // Connect to the database
+ LOG.info("Connecting to " + DB_URL);
+
+ Connection conn = null;
+ try {
+ // Register the JDBC driver
+ Class.forName(JDBC_DRIVER);
+
+ // Open a connection
+ LOG.info("Connecting to the selected database...");
+ conn = DriverManager.getConnection(DB_URL, USER, PASS);
+ LOG.info("Connected to database successfully...");
+
+
+ // Delete test tables
+ deleteTestTable(conn, TestingTable.TABLE_01.name());
+ deleteTestTable(conn, TestingTable.TABLE_02.name());
+
+ // Generate test tables
+ createTestTable(conn, TestingTable.TABLE_01);
+ createTestTable(conn, TestingTable.TABLE_02);
+
+ // Populate test tables
+ populateTestTable(conn, TestingTable.TABLE_01);
+ //populateTestTable(conn, TestingTable.TABLE_02);
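+ // TABLE_02 is left unpopulated on purpose, presumably as the write-side
+ // target of JdbcSourceShouldReadWrite.scala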
+
+ // Print content of test table
+ printHTable(conn, TestingTable.TABLE_01);
+
+ // If we've reached here, the testing data are in place
+
+ } catch (SQLException se) {
+ // Handle errors for JDBC
+ se.printStackTrace();
+ LOG.error(se.toString());
+ Assert.fail(se.toString());
+ } catch (Exception e) {
+ // Handle errors for Class.forName
+ e.printStackTrace();
+ LOG.error(e.toString());
+ Assert.fail(e.toString());
+ } finally {
+ // finally block used to close resources
+ try {
+ if (conn != null)
+ conn.close();
+ } catch (SQLException se) {
+ se.printStackTrace();
+ LOG.error(se.toString());
+ }// end finally try
+ }// end try
+
+ }
+
+ private static void populateTestTable(Connection connection, TestingTable testingTable)
+ throws SQLException {
+
+ // Load up the table
+ LOG.info("Populating table in given database...");
+
+ String [] queries = {
+ "insert into " + testingTable.name() + " values (1, 'A', 'X', 123)",
+ "insert into " + testingTable.name() + " values (2, 'B', 'Y', 234)",
+ "insert into " + testingTable.name() + " values (3, 'C', 'Z', 345)",
+ };
+
+ Statement statement = connection.createStatement();
+
+ for (String query : queries) {
+ statement.addBatch(query);
+ }
+ statement.executeBatch();
+ LOG.info("Populated table in given database...");
+
+ statement.close();
+
+ }
+
+ private static void createTestTable(Connection connection, TestingTable testingTable)
+ throws SQLException {
+
+ LOG.info("Creating table in given database...");
+ Statement stmt = connection.createStatement();
+
+ String sql = "CREATE TABLE " + testingTable.name() + " "
+ + "(id INTEGER not NULL, " + " test_column1 VARCHAR(255), "
+ + " test_column2 VARCHAR(255), " + " test_column3 INTEGER, "
+ + " PRIMARY KEY ( id ))";
+
+ stmt.executeUpdate(sql);
+ LOG.info("Created table in given database...");
+
+ stmt.close();
+ }
+
+ /**
+ * Drops a database table if it exists, e.g. "TABLE_01"
+ */
+ private static void deleteTestTable(Connection connection, String tableName) throws SQLException {
+
+ // Drop the table
+ LOG.info("Deleting table in given database...");
+ Statement stmt = connection.createStatement();
+
+ String sql = "DROP TABLE IF EXISTS " + tableName;
+
+ int result = stmt.executeUpdate(sql);
+ LOG.info("Deleted table in given database... " + result);
+
+
+ stmt.close();
+ }
+
+ /**
+ * Prints the contents of a database table
+ */
+ private static void printHTable(Connection connection, TestingTable testingTable)
+ throws SQLException {
+
+ // Execute a query
+ LOG.info("Printing table in given database...");
+ Statement stmt = connection.createStatement();
+
+ String sql = "SELECT * FROM " + testingTable.name();
+
+ ResultSet resultSet = stmt.executeQuery(sql);
+ LOG.info("Get data from table in given database...");
+
+ while (resultSet.next()) {
+ Integer key = resultSet.getInt("id");
+ String testColumn1 = resultSet.getString("test_column1");
+ String testColumn2 = resultSet.getString("test_column2");
+ Integer testColumn3 = resultSet.getInt("test_column3");
+
+ LOG.info(key + " : " + testColumn1 + " : " + testColumn2 + " : " + testColumn3);
+ }
+
+ resultSet.close();
+ stmt.close();
+
+ }
+
+ public static void main(String[] args) {
+ GenerateTestingTables test = new GenerateTestingTables();
+ test.fakeTest();
+ }
+} \ No newline at end of file
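For completeness, a quick way to verify the seeded fixtures from Scala, using the same placeholder URL and credentials as the class above; a hypothetical snippet, not part of this commit:

import java.sql.DriverManager

object VerifyTestingTables {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection(
      "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull",
      "user", "password")
    try {
      val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM TABLE_01")
      rs.next()
      // populateTestTable above inserts exactly three rows into TABLE_01
      assert(rs.getInt(1) == 3, "TABLE_01 should hold the three seeded rows")
    } finally {
      conn.close()
    }
  }
}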
diff --git a/src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala b/src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala
deleted file mode 100644
index b268fa1..0000000
--- a/src/test/scala/parallelai/spyglass/jdbc/example/JdbcSourceExampleTest.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-package parallelai.spyglass.jdbc.example
-
-import org.scalatest.junit.JUnitRunner
-import org.junit.runner.RunWith
-import org.scalatest.FunSpec
-import com.twitter.scalding.{Tsv, JobTest, TupleConversions}
-import org.slf4j.LoggerFactory
-import cascading.tuple.{Tuple, Fields}
-import parallelai.spyglass.jdbc.JDBCSource
-
-/**
- * Simple example of JDBCSource testing
- */
-@RunWith(classOf[JUnitRunner])
-class JdbcSourceExampleTest extends FunSpec with TupleConversions {
-
- val output = "src/test/resources/outputs/testOutput"
-
- val log = LoggerFactory.getLogger(this.getClass.getName)
-
- val sampleData = List(
- (1, "c11", "c12", 11),
- (2, "c21", "c22", 22)
- )
-
- JobTest("parallelai.spyglass.jdbc.example.JdbcSourceExample")
- .arg("test", "")
- .arg("app.conf.path", "app.conf")
- .arg("output", output)
- .arg("debug", "true")
- .source(
- new JDBCSource(
- "db_name",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull",
- "user",
- "password",
- List("KEY_ID", "COL1", "COL2", "COL3"),
- List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
- List("key_id"),
- new Fields("key_id", "col1", "col2", "col3")), sampleData)
- .sink[Tuple](Tsv(output.format("get_list"))) {
- outputBuffer =>
- log.debug("Output => " + outputBuffer)
-
- it("should return the mock data provided.") {
- assert(outputBuffer.size === 2)
- }
- }
- .run
- .finish
-
-}
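As with the HBase example, the job under test is not shown in this diff. A minimal sketch of what JdbcSourceExample plausibly looks like, reusing the JDBCSource definition from the deleted test; JobBase is a SpyGlass class, and the real job body may differ:

import com.twitter.scalding.{Args, Tsv}
import cascading.tuple.Fields
import parallelai.spyglass.base.JobBase
import parallelai.spyglass.jdbc.JDBCSource

class JdbcSourceExample(args: Args) extends JobBase(args) {

  val output = args("output")

  // Same source definition as the test above
  new JDBCSource(
    "db_name",
    "com.mysql.jdbc.Driver",
    "jdbc:mysql://<hostname>:<port>/<db_name>?zeroDateTimeBehavior=convertToNull",
    "user",
    "password",
    List("KEY_ID", "COL1", "COL2", "COL3"),
    List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
    List("key_id"),
    new Fields("key_id", "col1", "col2", "col3"))
    .read
    .write(Tsv(output.format("get_list")))
}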