aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
authorgalarragas <galarragas@gmail.com>2014-09-03 10:40:13 +0100
committergalarragas <galarragas@gmail.com>2014-09-03 10:40:13 +0100
commit60e2de212ee5a545e0c21d947841dc89d7199f49 (patch)
tree2b5be9427953b962e20ca929b9d5b42d9bb4ee3f /src/main/java
parent0c84fc54a6b8e20be20552f63c9787e7fb9f2c6e (diff)
downloadSpyGlass-60e2de212ee5a545e0c21d947841dc89d7199f49.tar.gz
SpyGlass-60e2de212ee5a545e0c21d947841dc89d7199f49.zip
Adding support for Kerberos Authentication
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java2
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java34
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTap.java45
3 files changed, 46 insertions, 35 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
index 6fa7fce..618bd51 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java
@@ -78,7 +78,7 @@ public abstract class HBaseInputFormatBase implements InputFormat<ImmutableBytes
sourceMode = HBaseConstants.SourceMode.valueOf(job.get(String.format(
HBaseConstants.SOURCE_MODE, getTableName(job))));
- LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String
+ LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally (%s)", String
.format(HBaseConstants.SOURCE_MODE, getTableName(job)), job
.get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))),
sourceMode));
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
index 7b61047..753b27e 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
@@ -1,15 +1,5 @@
package parallelai.spyglass.hbase;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import javax.naming.NamingException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
@@ -25,9 +15,17 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.net.DNS;
-
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
/**
* See HBaseInputFormatRegional first (!)
*
@@ -188,20 +186,6 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
byte[] rStart = cRegion.getRegionInfo().getStartKey();
byte[] rStop = cRegion.getRegionInfo().getEndKey();
-// HServerAddress regionServerAddress = cRegion
-// .getServerAddress();
-// InetAddress regionAddress = regionServerAddress
-// .getInetSocketAddress().getAddress();
-// String regionLocation;
-// try {
-// regionLocation = reverseDNS(regionAddress);
-// regionLocation = cRegion.
-// } catch (NamingException e) {
-// LOG.error("Cannot resolve the host name for "
-// + regionAddress + " because of " + e);
-// regionLocation = cRegion.getHostname();
-// }
-
String regionName = cRegion.getRegionInfo().getRegionNameAsString();
byte[] sStart = (startRow == HConstants.EMPTY_START_ROW
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index 0b1ee98..1224179 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -12,37 +12,36 @@
package parallelai.spyglass.hbase;
-import parallelai.spyglass.hbase.HBaseConstants.SplitType;
-
-import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
import cascading.flow.FlowProcess;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
+import parallelai.spyglass.hbase.HBaseConstants.SplitType;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.UUID;
-
/**
* The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with
* the {@HBaseFullScheme} to allow for the reading and writing
@@ -125,6 +124,28 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
this.tableName = tableName;
}
+
+ private void obtainToken(JobConf conf) {
+ if (User.isHBaseSecurityEnabled(conf)) {
+ String user = conf.getUser();
+ LOG.info("obtaining HBase token for: {}", user);
+ try {
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ user = currentUser.getUserName();
+ Credentials credentials = conf.getCredentials();
+ for (Token t : currentUser.getTokens()) {
+ LOG.debug("Token {} is available", t);
+ //there must be HBASE_AUTH_TOKEN exists, if not bad thing will happen, it's must be generated when job submission.
+ if ("HBASE_AUTH_TOKEN".equalsIgnoreCase(t.getKind().toString())) {
+ credentials.addToken(t.getKind(), t);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to obtain HBase auth token for " + user, e);
+ }
+ }
+ }
+
/**
* Method getTableName returns the tableName of this HBaseTap object.
*
@@ -153,6 +174,8 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
conf.set("hbase.zookeeper.quorum", quorumNames);
}
+
+
LOG.debug("sinking to table: {}", tableName);
if (isReplace() && conf.get("mapred.task.partition") == null) {
@@ -179,7 +202,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
for( SinkConfig sc : sinkConfigList) {
sc.configure(conf);
}
-
+
+ obtainToken(conf);
+
super.sinkConfInit(process, conf);
}
@@ -277,7 +302,9 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
for( SourceConfig sc : sourceConfigList) {
sc.configure(conf);
}
-
+
+ obtainToken(conf);
+
super.sourceConfInit(process, conf);
}