aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTap.java28
1 files changed, 27 insertions, 1 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index bfe6670..3576e96 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -12,6 +12,11 @@
package parallelai.spyglass.hbase;
+import cascading.tap.TapException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import parallelai.spyglass.hbase.HBaseConstants.SplitType;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
@@ -179,10 +184,30 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
for( SinkConfig sc : sinkConfigList) {
sc.configure(conf);
}
-
+ obtainToken(conf);
+
super.sinkConfInit(process, conf);
}
+ private void obtainToken(JobConf conf) {
+ if (User.isHBaseSecurityEnabled(conf)) {
+ String user = conf.getUser();
+ LOG.info("obtaining HBase token for: {}", user);
+ try {
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ user = currentUser.getUserName();
+ Credentials credentials = conf.getCredentials();
+ for (Token t : currentUser.getTokens()) {
+ LOG.debug("Token {} is available", t);
+ if ("HBASE_AUTH_TOKEN".equalsIgnoreCase(t.getKind().toString()))
+ credentials.addToken(t.getKind(), t);
+ }
+ } catch (IOException e) {
+ throw new TapException("Unable to obtain HBase auth token for " + user, e);
+ }
+ }
+ }
+
@Override
public String getIdentifier() {
return id;
@@ -277,6 +302,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
for( SourceConfig sc : sourceConfigList) {
sc.configure(conf);
}
+ obtainToken(conf);
super.sourceConfInit(process, conf);
}