aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml20
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java27
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java2
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java11
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseTap.java2
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala1
6 files changed, 43 insertions, 20 deletions
diff --git a/pom.xml b/pom.xml
index 861a61d..d22d4c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
<name>Cascading and Scalding wrapper for HBase with advanced features</name>
<groupId>parallelai</groupId>
<artifactId>parallelai.spyglass</artifactId>
- <version>2.10_0.12.0_5.3.0</version>
+ <version>2.10_0.12.1_5.3.0</version>
<packaging>jar</packaging>
<properties>
@@ -27,6 +27,7 @@
<!-- Maven -->
<maven-compiler-plugin.version>3.0</maven-compiler-plugin.version>
<maven-scala-plugin.version>2.15.2</maven-scala-plugin.version>
+ <maven-source-plugin.version>2.4</maven-source-plugin.version>
<maven.surefire.plugin.version>2.12.3</maven.surefire.plugin.version>
<cdh.version>cdh5.3.0</cdh.version>
@@ -35,6 +36,8 @@
<hadoop.core.version>2.5.0-mr1-${cdh.version}</hadoop.core.version>
<hbase.version>0.98.6-${cdh.version}</hbase.version>
+ <joda-time.version>2.8.2</joda-time.version>
+
<!-- Scala/Scalding/Cascading properties -->
<!-- can be 2.9.3 and 2.10.2 -->
<scala.version>2.10.3</scala.version>
@@ -153,7 +156,12 @@
<!-- HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
@@ -170,6 +178,12 @@
</dependency>
<dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>${joda-time.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.twitter.elephantbird</groupId>
<artifactId>elephant-bird-core</artifactId>
<version>4.1</version>
@@ -220,6 +234,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
+ <version>${maven-source-plugin.version}</version>
<executions>
<execution>
<id>attach-sources</id>
@@ -233,6 +248,7 @@
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
+ <version>${maven-scala-plugin.version}</version>
<executions>
<execution>
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
index 332bbd7..ae9db71 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java
@@ -2,6 +2,8 @@ package parallelai.spyglass.hbase;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -14,7 +16,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Addressing;
@@ -110,15 +111,15 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
: maxKey;
HRegionLocation regionLoc = table.getRegionLocation(keys.getFirst()[i]);
- HServerAddress regionServerAddress = regionLoc.getServerAddress();
- InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
+ String regionServerHostnamePort = regionLoc.getHostnamePort();
+ InetAddress regionAddress = toInetAddress(regionServerHostnamePort);
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
} catch (NamingException e) {
LOG.error("Cannot resolve the host name for " + regionAddress
+ " because of " + e);
- regionLocation = regionServerAddress.getHostname();
+ regionLocation = toHostname(regionServerHostnamePort);
}
regionNames[i] = regionLoc.getRegionInfo().getRegionNameAsString();
@@ -189,17 +190,17 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
byte[] rStart = cRegion.getRegionInfo().getStartKey();
byte[] rStop = cRegion.getRegionInfo().getEndKey();
- HServerAddress regionServerAddress = cRegion
- .getServerAddress();
- InetAddress regionAddress = regionServerAddress
- .getInetSocketAddress().getAddress();
+ String regionServerHostnamePort = cRegion
+ .getHostnamePort();
+ InetAddress regionAddress =
+ toInetAddress(regionServerHostnamePort);
String regionLocation;
try {
regionLocation = reverseDNS(regionAddress);
} catch (NamingException e) {
LOG.error("Cannot resolve the host name for "
+ regionAddress + " because of " + e);
- regionLocation = regionServerAddress.getHostname();
+ regionLocation = toHostname(regionServerHostnamePort);
}
String regionName = cRegion.getRegionInfo().getRegionNameAsString();
@@ -361,6 +362,14 @@ public class HBaseInputFormatGranular extends HBaseInputFormatBase {
}
}
+ private String toHostname(String regionServerHostnamePort) {
+ return Addressing.parseHostname(regionServerHostnamePort);
+ }
+
+ private InetAddress toInetAddress(String regionServerHostnamePort) throws UnknownHostException {
+ return InetAddress.getByName(toHostname(regionServerHostnamePort));
+ }
+
private String reverseDNS(InetAddress ipAddress) throws NamingException {
String hostName = this.reverseDNSCacheMap.get(ipAddress);
if (hostName == null) {
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
index efe548d..ece7e61 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
@@ -156,7 +156,7 @@ public class HBaseRawTap extends Tap<JobConf, RecordReader, OutputCollector> {
return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
}
- protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
+ protected HBaseAdmin getHBaseAdmin(JobConf conf) throws IOException {
if (hBaseAdmin == null) {
Configuration hbaseConf = HBaseConfiguration.create(conf);
hBaseAdmin = new HBaseAdmin(hbaseConf);
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
index b34ed3f..8a9559f 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReaderGranular.java
@@ -10,7 +10,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
@@ -251,7 +250,7 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
}
lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
+ value.copyFrom(result);
return true;
}
return false;
@@ -302,8 +301,8 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
key.set(result.getRow());
}
lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
-
+ value.copyFrom(result);
+
return true;
} else {
LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0
@@ -352,7 +351,7 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
key.set(result.getRow());
}
lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
+ value.copyFrom(result);
return true;
} else {
@@ -408,7 +407,7 @@ public class HBaseRecordReaderGranular extends HBaseRecordReaderBase {
key.set(result.getRow());
}
lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
+ value.copyFrom(result);
return true;
} else {
LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
index e4a3f78..e1897ef 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -143,7 +143,7 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
}
- protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
+ protected HBaseAdmin getHBaseAdmin(JobConf conf) throws IOException {
if (hBaseAdmin == null) {
Configuration hbaseConf = HBaseConfiguration.create(conf);
hBaseAdmin = new HBaseAdmin(hbaseConf);
diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
index c503247..949b86e 100644
--- a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala
@@ -7,7 +7,6 @@ import parallelai.spyglass.base.JobRunner
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Put, HTable, HConnectionManager, HBaseAdmin}
-import org.apache.hadoop.hbase.io.hfile.Compression
import org.apache.hadoop.hbase.regionserver.StoreFile
import org.apache.hadoop.hbase.util.Bytes
import parallelai.spyglass.hbase.HBaseSalter