diff options
Diffstat (limited to 'src/main')
5 files changed, 61 insertions, 37 deletions
| diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java index 6fa7fce..618bd51 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java @@ -78,7 +78,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes          sourceMode = HBaseConstants.SourceMode.valueOf(job.get(String.format(                  HBaseConstants.SOURCE_MODE, getTableName(job)))); -        LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String +        LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally (%s)", String                  .format(HBaseConstants.SOURCE_MODE, getTableName(job)), job                  .get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))),                  sourceMode)); diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java index 7b61047..753b27e 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java @@ -1,15 +1,5 @@  package parallelai.spyglass.hbase; -import java.io.IOException; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import javax.naming.NamingException; -  import org.apache.commons.logging.Log;  import org.apache.commons.logging.LogFactory;  import org.apache.hadoop.hbase.HConstants; @@ -25,9 +15,17 @@ import org.apache.hadoop.mapred.JobConf;  import org.apache.hadoop.mapred.RecordReader;  import org.apache.hadoop.mapred.Reporter;  import org.apache.hadoop.net.DNS; -  import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import javax.naming.NamingException; +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +  /**   * See HBaseInputFormatRegional first (!)   * @@ -188,20 +186,6 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {  						byte[] rStart = cRegion.getRegionInfo().getStartKey();  						byte[] rStop = cRegion.getRegionInfo().getEndKey(); -//						HServerAddress regionServerAddress = cRegion -//								.getServerAddress(); -//						InetAddress regionAddress = regionServerAddress -//								.getInetSocketAddress().getAddress(); -//						String regionLocation; -//						try { -//							regionLocation = reverseDNS(regionAddress); -//                            regionLocation = cRegion. -//						} catch (NamingException e) { -//							LOG.error("Cannot resolve the host name for " -//									+ regionAddress + " because of " + e); -//							regionLocation = cRegion.getHostname(); -//						} -                          String regionName = cRegion.getRegionInfo().getRegionNameAsString();                          byte[] sStart = (startRow == HConstants.EMPTY_START_ROW diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java index 0b1ee98..1224179 100644 --- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -12,37 +12,36 @@  package parallelai.spyglass.hbase; -import parallelai.spyglass.hbase.HBaseConstants.SplitType; - -import parallelai.spyglass.hbase.HBaseConstants.SourceMode;  import cascading.flow.FlowProcess;  import cascading.tap.SinkMode;  import cascading.tap.Tap;  import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;  import cascading.tuple.TupleEntryCollector;  import cascading.tuple.TupleEntryIterator; -  import org.apache.hadoop.conf.Configuration;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.hbase.HBaseConfiguration;  import org.apache.hadoop.hbase.HColumnDescriptor;  import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException;  import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.security.User;  import org.apache.hadoop.mapred.FileInputFormat;  import org.apache.hadoop.mapred.JobConf;  import org.apache.hadoop.mapred.OutputCollector;  import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory; +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import parallelai.spyglass.hbase.HBaseConstants.SplitType;  import java.io.IOException;  import java.io.Serializable;  import java.util.ArrayList;  import java.util.Arrays;  import java.util.UUID; -  /**   * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with   * the {@HBaseFullScheme} to allow for the reading and writing @@ -125,6 +124,28 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {      this.tableName = tableName;    } + +  private void obtainToken(JobConf conf) { +    if (User.isHBaseSecurityEnabled(conf)) { +        String user = conf.getUser(); +        LOG.info("obtaining HBase token for: {}", user); +        try { +            UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); +            user = currentUser.getUserName(); +            Credentials credentials = conf.getCredentials(); +            for (Token t : currentUser.getTokens()) { +                LOG.debug("Token {} is available", t); +                //there must be HBASE_AUTH_TOKEN exists, if not bad thing will happen, it's must be generated when job submission. +                if ("HBASE_AUTH_TOKEN".equalsIgnoreCase(t.getKind().toString())) { +                    credentials.addToken(t.getKind(), t); +                } +            } +        } catch (IOException e) { +            throw new RuntimeException("Unable to obtain HBase auth token for " + user, e); +        } +    } +  } +    /**     * Method getTableName returns the tableName of this HBaseTap object.     * @@ -153,6 +174,8 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {        conf.set("hbase.zookeeper.quorum", quorumNames);      } + +      LOG.debug("sinking to table: {}", tableName);      if (isReplace() && conf.get("mapred.task.partition") == null) { @@ -179,7 +202,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {      for( SinkConfig sc : sinkConfigList) {          sc.configure(conf);      } -     + +    obtainToken(conf); +      super.sinkConfInit(process, conf);    } @@ -277,7 +302,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {      for( SourceConfig sc : sourceConfigList) {        sc.configure(conf);      } -     + +    obtainToken(conf); +      super.sourceConfInit(process, conf);    } diff --git a/src/main/resources/pom.xml b/src/main/resources/pom.xml index 2557f79..404bc73 100644 --- a/src/main/resources/pom.xml +++ b/src/main/resources/pom.xml @@ -8,7 +8,7 @@      <description>Cascading and Scalding wrapper for HBase with advanced features</description>      <groupId>parallelai</groupId>      <artifactId>parallelai.spyglass</artifactId> -    <version>2.10_0.10_4.4</version> +    <version>2.10_0.10_CDH5_4.4</version>      <packaging>jar</packaging>      <organization> @@ -99,6 +99,11 @@              <artifactId>hbase-client</artifactId>              <version>0.98.1-cdh5.1.0</version>          </dependency> +        <dependency> +            <groupId>org.apache.hbase</groupId> +            <artifactId>hbase-protocol</artifactId> +            <version>0.98.1-cdh5.1.0</version> +        </dependency>          <dependency>              <groupId>org.slf4j</groupId> diff --git a/src/main/scala/parallelai/spyglass/base/JobRunner.scala b/src/main/scala/parallelai/spyglass/base/JobRunner.scala index b3a9af0..632a345 100644 --- a/src/main/scala/parallelai/spyglass/base/JobRunner.scala +++ b/src/main/scala/parallelai/spyglass/base/JobRunner.scala @@ -3,7 +3,10 @@ package parallelai.spyglass.base  import org.apache.hadoop.conf.Configuration  import com.twitter.scalding.Tool  import org.apache.hadoop -  +import org.apache.hadoop.hbase.security.token.TokenUtil +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.hbase.security.User +  object JobRunner {    def main(args : Array[String]) {      val conf: Configuration = new Configuration @@ -17,6 +20,11 @@ object JobRunner {      }      AppConfig.jobConfig = conf + +    if (User.isHBaseSecurityEnabled(conf)) { +      println("Obtaining token for HBase security."); +      TokenUtil.obtainAndCacheToken(conf, UserGroupInformation.getCurrentUser()); +    }      hadoop.util.ToolRunner.run(conf, new Tool, args);    } | 
