diff options
-rw-r--r-- | README.md | 362 |
1 files changed, 179 insertions, 183 deletions
@@ -25,58 +25,58 @@ To use SpyGlass as a dependency use the following repository ```xml <repositories> - <repository> - <id>conjars.org</id> - <url>http://conjars.org/repo</url> - </repository> + <repository> + <id>conjars.org</id> + <url>http://conjars.org/repo</url> + </repository> </repositories> ``` - For Scalding 0.10.0 use : +For Scalding 0.10.0 use : ```xml <dependencies> - <dependency> - <groupId>parallelai</groupId> - <artifactId>parallelai.spyglass</artifactId> - <version>2.10_0.10_4.3</version> - </dependency> + <dependency> + <groupId>parallelai</groupId> + <artifactId>parallelai.spyglass</artifactId> + <version>2.10_0.10_4.3</version> + </dependency> </dependencies> ``` - For Scalding 0.9.1 use : +For Scalding 0.9.1 use : ```xml <dependencies> - <dependency> - <groupId>parallelai</groupId> - <artifactId>parallelai.spyglass</artifactId> - <version>2.10_0.9_4.3</version> - </dependency> + <dependency> + <groupId>parallelai</groupId> + <artifactId>parallelai.spyglass</artifactId> + <version>2.10_0.9_4.3</version> + </dependency> </dependencies> ``` - or for earlier versions : +or for earlier versions : ```xml <dependencies> - <dependency> - <groupId>parallelai</groupId> - <artifactId>parallelai.spyglass</artifactId> - <version>2.9.3_4.1.0</version> - </dependency> + <dependency> + <groupId>parallelai</groupId> + <artifactId>parallelai.spyglass</artifactId> + <version>2.9.3_4.1.0</version> + </dependency> </dependencies> ``` - and +and ```xml <dependencies> - <dependency> - <groupId>parallelai</groupId> - <artifactId>parallelai.spyglass</artifactId> - <version>2.10.2_4.1.0</version> - </dependency> + <dependency> + <groupId>parallelai</groupId> + <artifactId>parallelai.spyglass</artifactId> + <version>2.10.2_4.1.0</version> + </dependency> </dependencies> ``` @@ -99,12 +99,12 @@ Requires the **_keyList_** parameter to be specified as well. ```scala val hbs2 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - Array("column_family"), - Array('column_name), - sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897")) + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897")) ``` Additionally, the **_versions_** parameter can be used to retrieve more than one version of the row. @@ -113,14 +113,14 @@ Additionally, the **_versions_** parameter can be used to retrieve more than one ```scala val hbs2 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - Array("column_family"), - Array('column_name), - sourceMode = SourceMode.GET_LIST, - versions = 5, - keyList = List("5003914", "5000687", "5004897")) + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.GET_LIST, + versions = 5, + keyList = List("5003914", "5000687", "5004897")) ``` SCAN_RANGE @@ -135,35 +135,35 @@ if: (e.g.) ```scala - val hbs4 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - Array("column_family"), - Array('column_name), - sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") - .read - .write(Tsv(output.format("scan_range_to_end"))) - - val hbs5 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - Array("column_family"), - Array('column_name), - sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") - .read - .write(Tsv(output.format("scan_range_from_start"))) - - val hbs6 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - Array("column_family"), - Array('column_name), - sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") - .read - .write(Tsv(output.format("scan_range_between"))) +val hbs4 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") + .read + .write(Tsv(output.format("scan_range_to_end"))) + +val hbs5 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") + .read + .write(Tsv(output.format("scan_range_from_start"))) + +val hbs6 = new HBaseSource( + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") + .read + .write(Tsv(output.format("scan_range_between"))) ``` SCAN_ALL @@ -174,12 +174,12 @@ Returns all rows in the table ```scala val hbs2 = new HBaseSource( - "table_name", - "quorum_name:2181", - 'key, - Array("column_family"), - Array('column_name), - sourceMode = SourceMode.SCAN_ALL) + "table_name", + "quorum_name:2181", + 'key, + Array("column_family"), + Array('column_name), + sourceMode = SourceMode.SCAN_ALL) ``` 2. Write Mode Features @@ -193,11 +193,11 @@ The time dimension can be added to the row by using the **_timestamp_** paramete ```scala pipe.write(new HBaseSource( "table_name", - "quorum_name:2181", - 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, - timestamp = Platform.currentTime )) + "quorum_name:2181", + 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + timestamp = Platform.currentTime )) ``` @@ -225,13 +225,12 @@ Setting the **_useSalt_** parameter to **true** enables this functionality ```scala val TABLE_SCHEMA = List('key, 'salted, 'unsalted) -val hbase07 = -new HBaseSource( "table_name", - "quorum_name:2181", 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, - sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", - useSalt = true, prefixList = "0123456789" ) +val hbase07 = new HBaseSource( "table_name", + "quorum_name:2181", 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", + useSalt = true, prefixList = "0123456789" ) .read // Convert from ImmutableBytesWritable to String @@ -243,10 +242,10 @@ new HBaseSource( "table_name", .toBytesWritable( TABLE_SCHEMA ) .write(new HBaseSource( "table_name", - "quorum_name:2181", 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, - useSalt = true )) + "quorum_name:2181", 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + useSalt = true )) ``` @@ -267,11 +266,11 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio val hbase07 = new HBaseSource( "table_name", - "quorum_name:2181", 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, - sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", - useSalt = true, prefixList = "0123456789" ) + "quorum_name:2181", 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455", + useSalt = true, prefixList = "0123456789" ) .read // Convert from ImmutableBytesWritable to String @@ -283,10 +282,10 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio .toBytesWritable( TABLE_SCHEMA ) .write(new HBaseSource( "table_name", - "quorum_name:2181", 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, - useSalt = true )) + "quorum_name:2181", 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray, + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray, + useSalt = true )) } ``` @@ -324,13 +323,13 @@ e.g. val hbaseOut = new HBaseRawSource("MY_RESULTS", "hbase-local", Array("out-family"), writeNulls=false, sinkMode = SinkMode.REPLACE) hbaseSource.read .mapTo(('rowkey, 'row) -> ('rowkey, "different_family:col1", 'col2)) { - x: (ImmutableBytesWritable, Result) => { - val (rowkey, row) = x - val col1Times2 = Bytes.toInt(row.getValue(Bytes.toBytes("col-family"), - Bytes.toBytes("col1"))) * 2; - val col2 = row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col2")); - (rowkey, col1Times2, col2) - } + x: (ImmutableBytesWritable, Result) => { + val (rowkey, row) = x + val col1Times2 = Bytes.toInt(row.getValue(Bytes.toBytes("col-family"), + Bytes.toBytes("col1"))) * 2; + val col2 = row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col2")); + (rowkey, col1Times2, col2) + } } .write(hbaseOut) ``` @@ -344,30 +343,28 @@ e.g. ```scala val jdbcSourceRead = new JDBCSource( - "TABLE_01", - "com.mysql.jdbc.Driver", - "jdbc:mysql://localhost:3306/db?zeroDateTimeBehavior=convertToNull", - "root", - "password", - List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), - List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), - List("id"), - new Fields("key", "column1", "column2", "column3"), - null, null, null - ) + "TABLE_01", + "com.mysql.jdbc.Driver", + "jdbc:mysql://localhost:3306/db?zeroDateTimeBehavior=convertToNull", + "root", + "password", + List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), + List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), + List("id"), + new Fields("key", "column1", "column2", "column3"), + null, null, null) val jdbcSourceWrite = new JDBCSource( - "TABLE_01", - "com.mysql.jdbc.Driver", - "jdbc:mysql://localhost:3306/db?zeroDateTimeBehavior=convertToNull", - "root", - "password", - List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), - List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), - List("id"), - new Fields("key", "column1", "column2", "column3"), - null, null, null - ) + "TABLE_01", + "com.mysql.jdbc.Driver", + "jdbc:mysql://localhost:3306/db?zeroDateTimeBehavior=convertToNull", + "root", + "password", + List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"), + List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"), + List("id"), + new Fields("key", "column1", "column2", "column3"), + null, null, null) ``` 7. HBase Delete Functionality @@ -382,9 +379,9 @@ e.g. ```scala val eraser = toIBW(input, TABLE_SCHEMA) - .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, - TABLE_SCHEMA.tail.map((x: Symbol) => "data"), - TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE )) + .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key, + TABLE_SCHEMA.tail.map((x: Symbol) => "data"), + TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE )) ``` All rows with the key will be deleted. This includes all versions too. @@ -422,54 +419,54 @@ If using Maven - create a pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <groupId>mygroup</groupId> - <artifactId>myartifact</artifactId> - <version>1.0-SNAPSHOT</version> - <name>myname</name> - - <repositories> - <repository> - <id>conjars.org</id> - <url>http://conjars.org/repo</url> - </repository> - </repositories> - - <dependencies> - <dependency> - <groupId>parallelai</groupId> - <artifactId>parallelai.spyglass</artifactId> - <!-- Scala: 2.10 | Scalding: 0.10.0 | SpyGlass: 4.3 --> - <version>2.10_0.10_4.3</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <version>2.15.2</version> - <executions> - <execution> - <id>scala-compile</id> - <goals> - <goal>compile</goal> - <goal>testCompile</goal> - </goals> - <configuration> - <args> - <arg>-make:transitive</arg> - <arg>-dependencyfile</arg> - <arg>${project.build.directory}/.scala_dependencies</arg> - </args> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> + <modelVersion>4.0.0</modelVersion> + + <groupId>mygroup</groupId> + <artifactId>myartifact</artifactId> + <version>1.0-SNAPSHOT</version> + <name>myname</name> + + <repositories> + <repository> + <id>conjars.org</id> + <url>http://conjars.org/repo</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>parallelai</groupId> + <artifactId>parallelai.spyglass</artifactId> + <!-- Scala: 2.10 | Scalding: 0.10.0 | SpyGlass: 4.3 --> + <version>2.10_0.10_4.3</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + <id>scala-compile</id> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + <configuration> + <args> + <arg>-make:transitive</arg> + <arg>-dependencyfile</arg> + <arg>${project.build.directory}/.scala_dependencies</arg> + </args> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> ``` @@ -513,6 +510,5 @@ class HBaseTest(args: Args) extends Job(args) with HBasePipeConversions { .fromBytesWritable(SCHEMA) .debug .write(TextLine("test_hbase")) - } ``` |