about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- README.md 362
1 files changed, 179 insertions, 183 deletions
diff --git a/README.md b/README.md
index e3fc2ef..f76bd91 100644
--- a/README.md
+++ b/README.md
@@ -25,58 +25,58 @@ To use SpyGlass as a dependency use the following repository
```xml
<repositories>
- <repository>
- <id>conjars.org</id>
- <url>http://conjars.org/repo</url>
- </repository>
+ <repository>
+ <id>conjars.org</id>
+ <url>http://conjars.org/repo</url>
+ </repository>
</repositories>
```
- For Scalding 0.10.0 use :
+For Scalding 0.10.0 use :
```xml
<dependencies>
- <dependency>
- <groupId>parallelai</groupId>
- <artifactId>parallelai.spyglass</artifactId>
- <version>2.10_0.10_4.3</version>
- </dependency>
+ <dependency>
+ <groupId>parallelai</groupId>
+ <artifactId>parallelai.spyglass</artifactId>
+ <version>2.10_0.10_4.3</version>
+ </dependency>
</dependencies>
```
- For Scalding 0.9.1 use :
+For Scalding 0.9.1 use :
```xml
<dependencies>
- <dependency>
- <groupId>parallelai</groupId>
- <artifactId>parallelai.spyglass</artifactId>
- <version>2.10_0.9_4.3</version>
- </dependency>
+ <dependency>
+ <groupId>parallelai</groupId>
+ <artifactId>parallelai.spyglass</artifactId>
+ <version>2.10_0.9_4.3</version>
+ </dependency>
</dependencies>
```
- or for earlier versions :
+or for earlier versions :
```xml
<dependencies>
- <dependency>
- <groupId>parallelai</groupId>
- <artifactId>parallelai.spyglass</artifactId>
- <version>2.9.3_4.1.0</version>
- </dependency>
+ <dependency>
+ <groupId>parallelai</groupId>
+ <artifactId>parallelai.spyglass</artifactId>
+ <version>2.9.3_4.1.0</version>
+ </dependency>
</dependencies>
```
- and
+and
```xml
<dependencies>
- <dependency>
- <groupId>parallelai</groupId>
- <artifactId>parallelai.spyglass</artifactId>
- <version>2.10.2_4.1.0</version>
- </dependency>
+ <dependency>
+ <groupId>parallelai</groupId>
+ <artifactId>parallelai.spyglass</artifactId>
+ <version>2.10.2_4.1.0</version>
+ </dependency>
</dependencies>
```
@@ -99,12 +99,12 @@ Requires the **_keyList_** parameter to be specified as well.
```scala
val hbs2 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- Array("column_family"),
- Array('column_name),
- sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
```
Additionally, the **_versions_** parameter can be used to retrieve more than one version of the row.
@@ -113,14 +113,14 @@ Additionally, the **_versions_** parameter can be used to retrieve more than one
```scala
val hbs2 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- Array("column_family"),
- Array('column_name),
- sourceMode = SourceMode.GET_LIST,
- versions = 5,
- keyList = List("5003914", "5000687", "5004897"))
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.GET_LIST,
+ versions = 5,
+ keyList = List("5003914", "5000687", "5004897"))
```
SCAN_RANGE
@@ -135,35 +135,35 @@ if:
(e.g.)
```scala
- val hbs4 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- Array("column_family"),
- Array('column_name),
- sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
- .read
- .write(Tsv(output.format("scan_range_to_end")))
-
- val hbs5 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- Array("column_family"),
- Array('column_name),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
- .read
- .write(Tsv(output.format("scan_range_from_start")))
-
- val hbs6 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- Array("column_family"),
- Array('column_name),
- sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
- .read
- .write(Tsv(output.format("scan_range_between")))
+val hbs4 = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
+ .read
+ .write(Tsv(output.format("scan_range_to_end")))
+
+val hbs5 = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
+ .read
+ .write(Tsv(output.format("scan_range_from_start")))
+
+val hbs6 = new HBaseSource(
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
+ .read
+ .write(Tsv(output.format("scan_range_between")))
```
SCAN_ALL
@@ -174,12 +174,12 @@ Returns all rows in the table
```scala
val hbs2 = new HBaseSource(
- "table_name",
- "quorum_name:2181",
- 'key,
- Array("column_family"),
- Array('column_name),
- sourceMode = SourceMode.SCAN_ALL)
+ "table_name",
+ "quorum_name:2181",
+ 'key,
+ Array("column_family"),
+ Array('column_name),
+ sourceMode = SourceMode.SCAN_ALL)
```
2. Write Mode Features
@@ -193,11 +193,11 @@ The time dimension can be added to the row by using the **_timestamp_** paramete
```scala
pipe.write(new HBaseSource( "table_name",
- "quorum_name:2181",
- 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
- timestamp = Platform.currentTime ))
+ "quorum_name:2181",
+ 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ timestamp = Platform.currentTime ))
```
@@ -225,13 +225,12 @@ Setting the **_useSalt_** parameter to **true** enables this functionality
```scala
val TABLE_SCHEMA = List('key, 'salted, 'unsalted)
-val hbase07 =
-new HBaseSource( "table_name",
- "quorum_name:2181", 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
- sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455",
- useSalt = true, prefixList = "0123456789" )
+val hbase07 = new HBaseSource( "table_name",
+ "quorum_name:2181", 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455",
+ useSalt = true, prefixList = "0123456789" )
.read
// Convert from ImmutableBytesWritable to String
@@ -243,10 +242,10 @@ new HBaseSource( "table_name",
.toBytesWritable( TABLE_SCHEMA )
.write(new HBaseSource( "table_name",
- "quorum_name:2181", 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
- useSalt = true ))
+ "quorum_name:2181", 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ useSalt = true ))
```
@@ -267,11 +266,11 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio
val hbase07 =
new HBaseSource( "table_name",
- "quorum_name:2181", 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
- sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455",
- useSalt = true, prefixList = "0123456789" )
+ "quorum_name:2181", 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ sourceMode = SourceMode.SCAN_RANGE, startKey = "11445", stopKey = "11455",
+ useSalt = true, prefixList = "0123456789" )
.read
// Convert from ImmutableBytesWritable to String
@@ -283,10 +282,10 @@ class HBaseSaltTester (args: Args) extends JobBase(args) with HBasePipeConversio
.toBytesWritable( TABLE_SCHEMA )
.write(new HBaseSource( "table_name",
- "quorum_name:2181", 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
- useSalt = true ))
+ "quorum_name:2181", 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data").toArray,
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)).toArray,
+ useSalt = true ))
}
```
@@ -324,13 +323,13 @@ e.g.
val hbaseOut = new HBaseRawSource("MY_RESULTS", "hbase-local", Array("out-family"), writeNulls=false, sinkMode = SinkMode.REPLACE)
hbaseSource.read
.mapTo(('rowkey, 'row) -> ('rowkey, "different_family:col1", 'col2)) {
- x: (ImmutableBytesWritable, Result) => {
- val (rowkey, row) = x
- val col1Times2 = Bytes.toInt(row.getValue(Bytes.toBytes("col-family"),
- Bytes.toBytes("col1"))) * 2;
- val col2 = row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col2"));
- (rowkey, col1Times2, col2)
- }
+ x: (ImmutableBytesWritable, Result) => {
+ val (rowkey, row) = x
+ val col1Times2 = Bytes.toInt(row.getValue(Bytes.toBytes("col-family"),
+ Bytes.toBytes("col1"))) * 2;
+ val col2 = row.getValue(Bytes.toBytes("col-family"), Bytes.toBytes("col2"));
+ (rowkey, col1Times2, col2)
+ }
}
.write(hbaseOut)
```
@@ -344,30 +343,28 @@ e.g.
```scala
val jdbcSourceRead = new JDBCSource(
- "TABLE_01",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://localhost:3306/db?zeroDateTimeBehavior=convertToNull",
- "root",
- "password",
- List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
- List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
- List("id"),
- new Fields("key", "column1", "column2", "column3"),
- null, null, null
- )
+ "TABLE_01",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
+ List("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null)
val jdbcSourceWrite = new JDBCSource(
- "TABLE_01",
- "com.mysql.jdbc.Driver",
- "jdbc:mysql://localhost:3306/db?zeroDateTimeBehavior=convertToNull",
- "root",
- "password",
- List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
- List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
- List("id"),
- new Fields("key", "column1", "column2", "column3"),
- null, null, null
- )
+ "TABLE_01",
+ "com.mysql.jdbc.Driver",
+ "jdbc:mysql://localhost:3306/db?zeroDateTimeBehavior=convertToNull",
+ "root",
+ "password",
+ List("ID", "TEST_COLUMN1", "TEST_COLUMN2", "TEST_COLUMN3"),
+ List("bigint(20)", "varchar(45)", "varchar(45)", "bigint(20)"),
+ List("id"),
+ new Fields("key", "column1", "column2", "column3"),
+ null, null, null)
```
7. HBase Delete Functionality
@@ -382,9 +379,9 @@ e.g.
```scala
val eraser = toIBW(input, TABLE_SCHEMA)
- .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
- TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
- TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))
+ .write(new HBaseSource( "_TEST.SALT.01", quorum, 'key,
+ TABLE_SCHEMA.tail.map((x: Symbol) => "data"),
+ TABLE_SCHEMA.tail.map((x: Symbol) => new Fields(x.name)), sinkMode = SinkMode.REPLACE ))
```
All rows with the key will be deleted. This includes all versions too.
@@ -422,54 +419,54 @@ If using Maven - create a pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>mygroup</groupId>
- <artifactId>myartifact</artifactId>
- <version>1.0-SNAPSHOT</version>
- <name>myname</name>
-
- <repositories>
- <repository>
- <id>conjars.org</id>
- <url>http://conjars.org/repo</url>
- </repository>
- </repositories>
-
- <dependencies>
- <dependency>
- <groupId>parallelai</groupId>
- <artifactId>parallelai.spyglass</artifactId>
- <!-- Scala: 2.10 | Scalding: 0.10.0 | SpyGlass: 4.3 -->
- <version>2.10_0.10_4.3</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <version>2.15.2</version>
- <executions>
- <execution>
- <id>scala-compile</id>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- <configuration>
- <args>
- <arg>-make:transitive</arg>
- <arg>-dependencyfile</arg>
- <arg>${project.build.directory}/.scala_dependencies</arg>
- </args>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>mygroup</groupId>
+ <artifactId>myartifact</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <name>myname</name>
+
+ <repositories>
+ <repository>
+ <id>conjars.org</id>
+ <url>http://conjars.org/repo</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>parallelai</groupId>
+ <artifactId>parallelai.spyglass</artifactId>
+ <!-- Scala: 2.10 | Scalding: 0.10.0 | SpyGlass: 4.3 -->
+ <version>2.10_0.10_4.3</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>scala-compile</id>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ <configuration>
+ <args>
+ <arg>-make:transitive</arg>
+ <arg>-dependencyfile</arg>
+ <arg>${project.build.directory}/.scala_dependencies</arg>
+ </args>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
```
@@ -513,6 +510,5 @@ class HBaseTest(args: Args) extends Job(args) with HBasePipeConversions {
.fromBytesWritable(SCHEMA)
.debug
.write(TextLine("test_hbase"))
-
}
```