aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseTap.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTap.java45
1 files changed, 36 insertions, 9 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 0b1ee98..1224179 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -12,37 +12,36 @@
package parallelai.spyglass.hbase;
-import parallelai.spyglass.hbase.HBaseConstants.SplitType;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+import parallelai.spyglass.hbase.HBaseConstants.SplitType;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.UUID;
-
/**
* The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with
* the {@HBaseFullScheme} to allow for the reading and writing
@@ -125,6 +124,28 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
this.tableName = tableName;
}
+
+ private void obtainToken(JobConf conf) {
+ if (User.isHBaseSecurityEnabled(conf)) {
+ String user = conf.getUser();
+ LOG.info("obtaining HBase token for: {}", user);
+ try {
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ user = currentUser.getUserName();
+ Credentials credentials = conf.getCredentials();
+ for (Token t : currentUser.getTokens()) {
+ LOG.debug("Token {} is available", t);
+ //there must be HBASE_AUTH_TOKEN exists, if not bad thing will happen, it's must be generated when job submission.
+ if ("HBASE_AUTH_TOKEN".equalsIgnoreCase(t.getKind().toString())) {
+ credentials.addToken(t.getKind(), t);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to obtain HBase auth token for " + user, e);
+ }
+ }
+ }
+
/**
* Method getTableName returns the tableName of this HBaseTap object.
*
@@ -153,6 +174,8 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
conf.set("hbase.zookeeper.quorum", quorumNames);
}
+
+
LOG.debug("sinking to table: {}", tableName);
if (isReplace() && conf.get("mapred.task.partition") == null) {
@@ -179,7 +202,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
for( SinkConfig sc : sinkConfigList) {
sc.configure(conf);
}
-
+
+ obtainToken(conf);
+
super.sinkConfInit(process, conf);
}
@@ -277,7 +302,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
for( SourceConfig sc : sourceConfigList) {
sc.configure(conf);
}
-
+
+ obtainToken(conf);
+
super.sourceConfInit(process, conf);
}