aboutsummaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java2
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java34
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTap.java45
-rw-r--r--src/main/resources/pom.xml7
-rw-r--r--src/main/scala/parallelai/spyglass/base/JobRunner.scala10
5 files changed, 61 insertions, 37 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
index 6fa7fce..618bd51 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
@@ -78,7 +78,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
sourceMode = HBaseConstants.SourceMode.valueOf(job.get(String.format(
HBaseConstants.SOURCE_MODE, getTableName(job))));
- LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String
+ LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally (%s)", String
.format(HBaseConstants.SOURCE_MODE, getTableName(job)), job
.get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))),
sourceMode));
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
index 7b61047..753b27e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
@@ -1,15 +1,5 @@
package parallelai.spyglass.hbase;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import javax.naming.NamingException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
@@ -25,9 +15,17 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.net.DNS;
-
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
/**
* See HBaseInputFormatRegional first (!)
*
@@ -188,20 +186,6 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
byte[] rStart = cRegion.getRegionInfo().getStartKey();
byte[] rStop = cRegion.getRegionInfo().getEndKey();
-// HServerAddress regionServerAddress = cRegion
-// .getServerAddress();
-// InetAddress regionAddress = regionServerAddress
-// .getInetSocketAddress().getAddress();
-// String regionLocation;
-// try {
-// regionLocation = reverseDNS(regionAddress);
-// regionLocation = cRegion.
-// } catch (NamingException e) {
-// LOG.error("Cannot resolve the host name for "
-// + regionAddress + " because of " + e);
-// regionLocation = cRegion.getHostname();
-// }
-
String regionName = cRegion.getRegionInfo().getRegionNameAsString();
byte[] sStart = (startRow == HConstants.EMPTY_START_ROW
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 0b1ee98..1224179 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -12,37 +12,36 @@
package parallelai.spyglass.hbase;
-import parallelai.spyglass.hbase.HBaseConstants.SplitType;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+import parallelai.spyglass.hbase.HBaseConstants.SplitType;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.UUID;
-
/**
* The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with
* the {@HBaseFullScheme} to allow for the reading and writing
@@ -125,6 +124,28 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
this.tableName = tableName;
}
+
+ private void obtainToken(JobConf conf) {
+ if (User.isHBaseSecurityEnabled(conf)) {
+ String user = conf.getUser();
+ LOG.info("obtaining HBase token for: {}", user);
+ try {
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ user = currentUser.getUserName();
+ Credentials credentials = conf.getCredentials();
+ for (Token t : currentUser.getTokens()) {
+ LOG.debug("Token {} is available", t);
+ // An HBASE_AUTH_TOKEN must already exist (it is generated at job submission); otherwise authentication will fail.
+ if ("HBASE_AUTH_TOKEN".equalsIgnoreCase(t.getKind().toString())) {
+ credentials.addToken(t.getKind(), t);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to obtain HBase auth token for " + user, e);
+ }
+ }
+ }
+
/**
* Method getTableName returns the tableName of this HBaseTap object.
*
@@ -153,6 +174,8 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
conf.set("hbase.zookeeper.quorum", quorumNames);
}
+
+
LOG.debug("sinking to table: {}", tableName);
if (isReplace() && conf.get("mapred.task.partition") == null) {
@@ -179,7 +202,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
for( SinkConfig sc : sinkConfigList) {
sc.configure(conf);
}
-
+
+ obtainToken(conf);
+
super.sinkConfInit(process, conf);
}
@@ -277,7 +302,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
for( SourceConfig sc : sourceConfigList) {
sc.configure(conf);
}
-
+
+ obtainToken(conf);
+
super.sourceConfInit(process, conf);
}
diff --git a/src/main/resources/pom.xml b/src/main/resources/pom.xml
index 2557f79..404bc73 100644
--- a/src/main/resources/pom.xml
+++ b/src/main/resources/pom.xml
@@ -8,7 +8,7 @@
<description>Cascading and Scalding wrapper for HBase with advanced features</description>
<groupId>parallelai</groupId>
<artifactId>parallelai.spyglass</artifactId>
- <version>2.10_0.10_4.4</version>
+ <version>2.10_0.10_CDH5_4.4</version>
<packaging>jar</packaging>
<organization>
@@ -99,6 +99,11 @@
<artifactId>hbase-client</artifactId>
<version>0.98.1-cdh5.1.0</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-protocol</artifactId>
+ <version>0.98.1-cdh5.1.0</version>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/src/main/scala/parallelai/spyglass/base/JobRunner.scala b/src/main/scala/parallelai/spyglass/base/JobRunner.scala
index b3a9af0..632a345 100644
--- a/src/main/scala/parallelai/spyglass/base/JobRunner.scala
+++ b/src/main/scala/parallelai/spyglass/base/JobRunner.scala
@@ -3,7 +3,10 @@ package parallelai.spyglass.base
import org.apache.hadoop.conf.Configuration
import com.twitter.scalding.Tool
import org.apache.hadoop
-
+import org.apache.hadoop.hbase.security.token.TokenUtil
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.hbase.security.User
+
object JobRunner {
def main(args : Array[String]) {
val conf: Configuration = new Configuration
@@ -17,6 +20,11 @@ object JobRunner {
}
AppConfig.jobConfig = conf
+
+ if (User.isHBaseSecurityEnabled(conf)) {
+ println("Obtaining token for HBase security.");
+ TokenUtil.obtainAndCacheToken(conf, UserGroupInformation.getCurrentUser());
+ }
hadoop.util.ToolRunner.run(conf, new Tool, args);
}