From 60e2de212ee5a545e0c21d947841dc89d7199f49 Mon Sep 17 00:00:00 2001 From: galarragas Date: Wed, 3 Sep 2014 10:40:13 +0100 Subject: Adding support for Kerberos Authentication --- pom.xml | 7 +++- .../spyglass/hbase/HBaseInputFormatBase.java | 2 +- .../spyglass/hbase/HBaseInputFormatGranular.java | 34 +++++----------- .../java/parallelai/spyglass/hbase/HBaseTap.java | 45 +++++++++++++++++----- src/main/resources/pom.xml | 7 +++- .../scala/parallelai/spyglass/base/JobRunner.scala | 10 ++++- 6 files changed, 67 insertions(+), 38 deletions(-) diff --git a/pom.xml b/pom.xml index b2df020..747c272 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ Cascading and Scalding wrapper for HBase with advanced features parallelai parallelai.spyglass - 2.10_0.10_4.4 + 2.10_0.10_CDH5_4.4 jar @@ -161,6 +161,11 @@ hbase-client ${hbase.version} + + org.apache.hbase + hbase-protocol + ${hbase.version} + org.slf4j diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java index 6fa7fce..618bd51 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java @@ -78,7 +78,7 @@ public abstract class HBaseInputFormatBase implements InputFormat { this.tableName = tableName; } + + private void obtainToken(JobConf conf) { + if (User.isHBaseSecurityEnabled(conf)) { + String user = conf.getUser(); + LOG.info("obtaining HBase token for: {}", user); + try { + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + user = currentUser.getUserName(); + Credentials credentials = conf.getCredentials(); + for (Token t : currentUser.getTokens()) { + LOG.debug("Token {} is available", t); + //there must be HBASE_AUTH_TOKEN exists, if not bad thing will happen, it's must be generated when job submission. + if ("HBASE_AUTH_TOKEN".equalsIgnoreCase(t.getKind().toString())) { + credentials.addToken(t.getKind(), t); + } + } + } catch (IOException e) { + throw new RuntimeException("Unable to obtain HBase auth token for " + user, e); + } + } + } + /** * Method getTableName returns the tableName of this HBaseTap object. * @@ -153,6 +174,8 @@ public class HBaseTap extends Tap { conf.set("hbase.zookeeper.quorum", quorumNames); } + + LOG.debug("sinking to table: {}", tableName); if (isReplace() && conf.get("mapred.task.partition") == null) { @@ -179,7 +202,9 @@ public class HBaseTap extends Tap { for( SinkConfig sc : sinkConfigList) { sc.configure(conf); } - + + obtainToken(conf); + super.sinkConfInit(process, conf); } @@ -277,7 +302,9 @@ public class HBaseTap extends Tap { for( SourceConfig sc : sourceConfigList) { sc.configure(conf); } - + + obtainToken(conf); + super.sourceConfInit(process, conf); } diff --git a/src/main/resources/pom.xml b/src/main/resources/pom.xml index 2557f79..404bc73 100644 --- a/src/main/resources/pom.xml +++ b/src/main/resources/pom.xml @@ -8,7 +8,7 @@ Cascading and Scalding wrapper for HBase with advanced features parallelai parallelai.spyglass - 2.10_0.10_4.4 + 2.10_0.10_CDH5_4.4 jar @@ -99,6 +99,11 @@ hbase-client 0.98.1-cdh5.1.0 + + org.apache.hbase + hbase-protocol + 0.98.1-cdh5.1.0 + org.slf4j diff --git a/src/main/scala/parallelai/spyglass/base/JobRunner.scala b/src/main/scala/parallelai/spyglass/base/JobRunner.scala index b3a9af0..632a345 100644 --- a/src/main/scala/parallelai/spyglass/base/JobRunner.scala +++ b/src/main/scala/parallelai/spyglass/base/JobRunner.scala @@ -3,7 +3,10 @@ package parallelai.spyglass.base import org.apache.hadoop.conf.Configuration import com.twitter.scalding.Tool import org.apache.hadoop - +import org.apache.hadoop.hbase.security.token.TokenUtil +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.hbase.security.User + object JobRunner { def main(args : Array[String]) { val conf: Configuration = new Configuration @@ -17,6 +20,11 @@ object JobRunner { } AppConfig.jobConfig = conf + + if (User.isHBaseSecurityEnabled(conf)) { + println("Obtaining token for HBase security."); + TokenUtil.obtainAndCacheToken(conf, UserGroupInformation.getCurrentUser()); + } hadoop.util.ToolRunner.run(conf, new Tool, args); } -- cgit v1.2.3