diff options
-rw-r--r-- | pom.xml | 7 | ||||
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java | 2 | ||||
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java | 34 | ||||
-rw-r--r-- | src/main/java/parallelai/spyglass/hbase/HBaseTap.java | 45 | ||||
-rw-r--r-- | src/main/resources/pom.xml | 7 | ||||
-rw-r--r-- | src/main/scala/parallelai/spyglass/base/JobRunner.scala | 10 |
6 files changed, 67 insertions, 38 deletions
@@ -11,7 +11,7 @@ <name>Cascading and Scalding wrapper for HBase with advanced features</name> <groupId>parallelai</groupId> <artifactId>parallelai.spyglass</artifactId> - <version>2.10_0.10_4.4</version> + <version>2.10_0.10_CDH5_4.4</version> <packaging>jar</packaging> <properties> @@ -161,6 +161,11 @@ <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-protocol</artifactId> + <version>${hbase.version}</version> + </dependency> <dependency> <groupId>org.slf4j</groupId> diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java index 6fa7fce..618bd51 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java @@ -78,7 +78,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes sourceMode = HBaseConstants.SourceMode.valueOf(job.get(String.format( HBaseConstants.SOURCE_MODE, getTableName(job)))); - LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String + LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally (%s)", String .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))), sourceMode)); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java index 7b61047..753b27e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java @@ -1,15 +1,5 @@ package parallelai.spyglass.hbase; -import java.io.IOException; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import javax.naming.NamingException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; @@ -25,9 +15,17 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.net.DNS; - import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import javax.naming.NamingException; +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + /** * See HBaseInputFormatRegional first (!) * @@ -188,20 +186,6 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase { byte[] rStart = cRegion.getRegionInfo().getStartKey(); byte[] rStop = cRegion.getRegionInfo().getEndKey(); -// HServerAddress regionServerAddress = cRegion -// .getServerAddress(); -// InetAddress regionAddress = regionServerAddress -// .getInetSocketAddress().getAddress(); -// String regionLocation; -// try { -// regionLocation = reverseDNS(regionAddress); -// regionLocation = cRegion. -// } catch (NamingException e) { -// LOG.error("Cannot resolve the host name for " -// + regionAddress + " because of " + e); -// regionLocation = cRegion.getHostname(); -// } - String regionName = cRegion.getRegionInfo().getRegionNameAsString(); byte[] sStart = (startRow == HConstants.EMPTY_START_ROW diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 0b1ee98..1224179 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -12,37 +12,36 @@ package parallelai.spyglass.hbase; -import parallelai.spyglass.hbase.HBaseConstants.SplitType; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode; import cascading.flow.FlowProcess; import cascading.tap.SinkMode; import cascading.tap.Tap; import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; import cascading.tuple.TupleEntryCollector; import cascading.tuple.TupleEntryIterator; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import parallelai.spyglass.hbase.HBaseConstants.SplitType; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.UUID; - /** * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with * the {@HBaseFullScheme} to allow for the reading and writing @@ -125,6 +124,28 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { this.tableName = tableName; } + + private void obtainToken(JobConf conf) { + if (User.isHBaseSecurityEnabled(conf)) { + String user = conf.getUser(); + LOG.info("obtaining HBase token for: {}", user); + try { + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + user = currentUser.getUserName(); + Credentials credentials = conf.getCredentials(); + for (Token t : currentUser.getTokens()) { + LOG.debug("Token {} is available", t); + //there must be HBASE_AUTH_TOKEN exists, if not bad thing will happen, it's must be generated when job submission. + if ("HBASE_AUTH_TOKEN".equalsIgnoreCase(t.getKind().toString())) { + credentials.addToken(t.getKind(), t); + } + } + } catch (IOException e) { + throw new RuntimeException("Unable to obtain HBase auth token for " + user, e); + } + } + } + /** * Method getTableName returns the tableName of this HBaseTap object. * @@ -153,6 +174,8 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { conf.set("hbase.zookeeper.quorum", quorumNames); } + + LOG.debug("sinking to table: {}", tableName); if (isReplace() && conf.get("mapred.task.partition") == null) { @@ -179,7 +202,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { for( SinkConfig sc : sinkConfigList) { sc.configure(conf); } - + + obtainToken(conf); + super.sinkConfInit(process, conf); } @@ -277,7 +302,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { for( SourceConfig sc : sourceConfigList) { sc.configure(conf); } - + + obtainToken(conf); + super.sourceConfInit(process, conf); } diff --git a/src/main/resources/pom.xml b/src/main/resources/pom.xml index 2557f79..404bc73 100644 --- a/src/main/resources/pom.xml +++ b/src/main/resources/pom.xml @@ -8,7 +8,7 @@ <description>Cascading and Scalding wrapper for HBase with advanced features</description> <groupId>parallelai</groupId> <artifactId>parallelai.spyglass</artifactId> - <version>2.10_0.10_4.4</version> + <version>2.10_0.10_CDH5_4.4</version> <packaging>jar</packaging> <organization> @@ -99,6 +99,11 @@ <artifactId>hbase-client</artifactId> <version>0.98.1-cdh5.1.0</version> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-protocol</artifactId> + <version>0.98.1-cdh5.1.0</version> + </dependency> <dependency> <groupId>org.slf4j</groupId> diff --git a/src/main/scala/parallelai/spyglass/base/JobRunner.scala b/src/main/scala/parallelai/spyglass/base/JobRunner.scala index b3a9af0..632a345 100644 --- a/src/main/scala/parallelai/spyglass/base/JobRunner.scala +++ b/src/main/scala/parallelai/spyglass/base/JobRunner.scala @@ -3,7 +3,10 @@ package parallelai.spyglass.base import org.apache.hadoop.conf.Configuration import com.twitter.scalding.Tool import org.apache.hadoop - +import org.apache.hadoop.hbase.security.token.TokenUtil +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.hbase.security.User + object JobRunner { def main(args : Array[String]) { val conf: Configuration = new Configuration @@ -17,6 +20,11 @@ object JobRunner { } AppConfig.jobConfig = conf + + if (User.isHBaseSecurityEnabled(conf)) { + println("Obtaining token for HBase security."); + TokenUtil.obtainAndCacheToken(conf, UserGroupInformation.getCurrentUser()); + } hadoop.util.ToolRunner.run(conf, new Tool, args); } |