diff options
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseTap.java')
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseTap.java | 45 |
1 files changed, 36 insertions, 9 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 0b1ee98..1224179 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -12,37 +12,36 @@ package parallelai.spyglass.hbase; -import parallelai.spyglass.hbase.HBaseConstants.SplitType; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; import cascading.flow.FlowProcess; import cascading.tap.SinkMode; import cascading.tap.Tap; import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; import cascading.tuple.TupleEntryCollector; import cascading.tuple.TupleEntryIterator; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import parallelai.spyglass.hbase.HBaseConstants.SplitType; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.UUID; - /** * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with * the {@HBaseFullScheme} to allow for the reading and writing @@ -125,6 +124,28 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { this.tableName = tableName; } + + private void obtainToken(JobConf conf) { + if (User.isHBaseSecurityEnabled(conf)) { + String user = conf.getUser(); + LOG.info("obtaining HBase token for: {}", user); + try { + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + user = currentUser.getUserName(); + Credentials credentials = conf.getCredentials(); + for (Token t : currentUser.getTokens()) { + LOG.debug("Token {} is available", t); + //there must be HBASE_AUTH_TOKEN exists, if not bad thing will happen, it's must be generated when job submission. + if ("HBASE_AUTH_TOKEN".equalsIgnoreCase(t.getKind().toString())) { + credentials.addToken(t.getKind(), t); + } + } + } catch (IOException e) { + throw new RuntimeException("Unable to obtain HBase auth token for " + user, e); + } + } + } + /** * Method getTableName returns the tableName of this HBaseTap object. * @@ -153,6 +174,8 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { conf.set("hbase.zookeeper.quorum", quorumNames); } + + LOG.debug("sinking to table: {}", tableName); if (isReplace() && conf.get("mapred.task.partition") == null) { @@ -179,7 +202,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { for( SinkConfig sc : sinkConfigList) { sc.configure(conf); } - + + obtainToken(conf); + super.sinkConfInit(process, conf); } @@ -277,7 +302,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { for( SourceConfig sc : sourceConfigList) { sc.configure(conf); } - + + obtainToken(conf); + super.sourceConfInit(process, conf); } |