| field | value | |
|---|---|---|
| author | cra14 <chandan.rajah2@bskyb.com> | 2013-04-26 12:47:12 +0100 |
| committer | cra14 <chandan.rajah2@bskyb.com> | 2013-04-26 12:47:12 +0100 |
| commit | cbf6c2903bfd0a5fe528c54382ea791c45637ded (patch) | |
| tree | 2ca67f31c4d0c1779c163cb48234e821616ec6e1 | |
| parent | d6d712287b2bcd74f0c5bbc3ecbb106741443d7c (diff) | |
| download | SpyGlass-cbf6c2903bfd0a5fe528c54382ea791c45637ded.tar.gz, SpyGlass-cbf6c2903bfd0a5fe528c54382ea791c45637ded.zip | |
First public release of Spy Glass code base
16 files changed, 2793 insertions, 0 deletions
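The README hunk below documents a complete Scalding job, HBaseExample, that reads one HBase table through SpyGlass in GET_LIST, SCAN_ALL and SCAN_RANGE modes. For context, a job like that is normally submitted through Hadoop's ToolRunner with Scalding's Tool as the entry point; the sketch below is illustrative only, and the output path, the --debug value and the packaging into a runnable jar are all assumptions rather than part of this commit.

```scala
// Hypothetical driver for the HBaseExample job defined in the README below.
// Scalding 0.8.x jobs are usually launched this way: ToolRunner delegates to
// com.twitter.scalding.Tool, whose first argument is the job class to run.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.ToolRunner

object RunHBaseExample {
  def main(cmdArgs: Array[String]): Unit = {
    val exitCode = ToolRunner.run(new Configuration(), new com.twitter.scalding.Tool,
      Array("parallelai.spyglass.hbase.example.HBaseExample",
            "--hdfs",                      // run on the cluster rather than --local
            "--debug", "false",            // read by args("debug") inside the job
            "--output", "/tmp/spyglass"))  // read by args("output") inside the job
    sys.exit(exitCode)
  }
}
```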
@@ -1,6 +1,16 @@
 *.class
 *.log
+.classpath
+.project
+.settings
+.cache
+/target
+/outputs
+/bin
+/alternateLocation
+*.iml
+
 # sbt specific
 dist/*
 target/
@@ -2,3 +2,108 @@ SpyGlass
 ========
 Cascading and Scalding wrapper for HBase with advanced read features
+
+
+Example
+=======
+
+package parallelai.spyglass.hbase.example
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.client.HConnectionManager
+import org.apache.hadoop.hbase.client.HTable
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.log4j.Level
+import org.apache.log4j.Logger
+
+import com.twitter.scalding._
+import com.twitter.scalding.Args
+
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseSource
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class HBaseExample(args: Args) extends JobBase(args) {
+
+  val isDebug: Boolean = args("debug").toBoolean
+
+  if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG)
+
+  val output = args("output")
+
+  println(output)
+
+  val jobConf = getJobConf
+
+  val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181"
+
+  case class HBaseTableStore(
+      conf: Configuration,
+      quorum: String,
+      tableName: String) {
+
+    val tableBytes = Bytes.toBytes(tableName)
+    val connection = HConnectionManager.getConnection(conf)
+    val maxThreads = conf.getInt("hbase.htable.threads.max", 1)
+
+    conf.set("hbase.zookeeper.quorum", quorumNames);
+
+    val htable = new HTable(HBaseConfiguration.create(conf), tableName)
+
+  }
+
+  val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet")
+
+  val hbs2 = new HBaseSource(
+    "table_name",
+    "quorum_name:2181",
+    'key,
+    Array("column_family"),
+    Array('column_name),
+    sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897"))
+    .read
+    .write(Tsv(output.format("get_list")))
+
+  val hbs3 = new HBaseSource(
+    "table_name",
+    "quorum_name:2181",
+    'key,
+    Array("column_family"),
+    Array('column_name),
+    sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693")
+    .read
+    .write(Tsv(output.format("scan_all")))
+
+  val hbs4 = new HBaseSource(
+    "table_name",
+    "quorum_name:2181",
+    'key,
+    Array("column_family"),
+    Array('column_name),
+    sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914")
+    .read
+    .write(Tsv(output.format("scan_range_to_end")))
+
+  val hbs5 = new HBaseSource(
+    "table_name",
+    "quorum_name:2181",
+    'key,
+    Array("column_family"),
+    Array('column_name),
+    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914")
+    .read
+    .write(Tsv(output.format("scan_range_from_start")))
+
+  val hbs6 = new HBaseSource(
+    "table_name",
+    "quorum_name:2181",
+    'key,
+    Array("column_family"),
+    Array('column_name),
+    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897")
+    .read
+    .write(Tsv(output.format("scan_range_between")))
+
+}
+
@@ -0,0 +1,417 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <organization>
+        <name>Parallel AI</name>
+        <url>http://www.parallelai.com</url>
+    </organization>
+
+
+    <name>Cascading and Scalding wrapper for HBase 
with advanced features</name> +    <groupId>parallelai</groupId> +    <artifactId>parallelai.spyglass</artifactId> +    <version>1.0.2</version> +    <packaging>jar</packaging> + +	<properties> +		<!-- Java compilation level --> +		<maven.compiler.source>1.6</maven.compiler.source> +		<maven.compiler.target>1.6</maven.compiler.target> +		<maven-compiler-plugin.version>3.0</maven-compiler-plugin.version> +		<maven-scala-plugin.version>2.15.2</maven-scala-plugin.version> +		<maven-war-plugin.version>2.3</maven-war-plugin.version> + +		<!-- UTF-8 Encoding settings --> +		<encoding>UTF-8</encoding> +		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> +		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + +		<!-- Cloudera's Distribution Including Apache Hadoop version 4.2.0 --> +		<datafu.version>0.0.4-cdh4.2.0</datafu.version> +		<flume.version>1.3.0-cdh4.2.0</flume.version> +		<hadoop.version>2.0.0-cdh4.2.0</hadoop.version> +		<hbase.version>0.94.2-cdh4.2.0</hbase.version> +		<hive.version>0.10.0-cdh4.2.0</hive.version> +		<mahout.version>0.7-cdh4.2.0</mahout.version> +		<mapreduce.version>2.0.0-mr1-cdh4.2.0</mapreduce.version> +		<oozie.version>3.3.0-cdh4.2.0</oozie.version> +		<oozie-hadoop.version>2.0.0-cdh4.2.0.oozie-3.3.0-cdh4.2.0</oozie-hadoop.version> +		<oozie-sharelib.version>3.3.0-cdh4.2.0</oozie-sharelib.version> +		<pig.version>0.10.0-cdh4.2.0</pig.version> +		<sqoop.version>1.4.2-cdh4.2.0</sqoop.version> +		<whirr.version>0.8.0-cdh4.2.0</whirr.version> +		<zookeeper.version>3.4.5-cdh4.2.0</zookeeper.version> +		 +		<!-- Scala/Scalding/Cascading properties --> +		<scala.version>2.9.2</scala.version> +		<scalding.version>0.8.3</scalding.version> +		<cascading.version>2.1.0</cascading.version> +		<scalding-commons.version>0.1.1</scalding-commons.version> +		<scalatest.version>1.7.1</scalatest.version> + +		<trove4j.version>3.0.3</trove4j.version> +		<maple.version>0.2.8</maple.version> +		<specs2.version>1.12.4.1</specs2.version> +		<typesafe.config.version>1.0.0</typesafe.config.version> +		 +		<!-- Other libraries properties --> +		<junit.version>4.10</junit.version> +		<slf4j.version>1.7.2</slf4j.version> +		<trove4j.version>3.0.3</trove4j.version> +		<javax.servlet.version>2.5</javax.servlet.version> +		<uncommons-maths.version>1.2.2a</uncommons-maths.version> +		<maven.surefire.plugin.version>2.12.3</maven.surefire.plugin.version> + +	</properties> + +	<!-- Repositories --> +	<repositories> +		<repository> +			<id>scala-tools.org</id> +			<name>Scala Tools</name> +			<url>https://oss.sonatype.org/content/repositories/snapshots/</url> +		</repository> +		<repository> +			<id>cloudera-cdh4</id> +			<name>Cloudera CDH4</name> +			<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> +		</repository> +		<repository> +			<id>conjars</id> +			<name>Con Jars</name> +			<url>http://conjars.org/repo</url> +		</repository> +	</repositories> + +	<!-- Profiles --> +	<profiles> +		<profile> +			<id>windows_profile</id> +			<activation> +				<activeByDefault>false</activeByDefault> +				<os> +					<family>Windows</family> +				</os> +			</activation> +			<properties> +				<toolsjar>${JAVA_HOME}\\lib\\tools.jar</toolsjar> +			</properties> +			<dependencies> +				<dependency> +					<groupId>jdk.tools</groupId> +					<artifactId>jdk.tools</artifactId> +					<version>1.6</version> +					<scope>system</scope> +					<systemPath>${toolsjar}</systemPath> +				</dependency> +			</dependencies> +		</profile> +		<profile> +			
<id>osx_profile</id> +			<activation> +				<activeByDefault>false</activeByDefault> +				<os> +					<family>mac</family> +				</os> +			</activation> +			<properties> +				<toolsjar>${java.home}/../Classes/classes.jar</toolsjar> +			</properties> +			<dependencies> +				<dependency> +					<groupId>jdk.tools</groupId> +					<artifactId>jdk.tools</artifactId> +					<version>1.6</version> +					<scope>system</scope> +					<systemPath>${toolsjar}</systemPath> +				</dependency> +			</dependencies> +		</profile> +	</profiles> + + + +	<dependencies> +	     +		<!-- Hadoop --> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-mapreduce-client-core</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-annotations</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-archives</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-assemblies</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-auth</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-client</artifactId> +			<version>${mapreduce.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-common</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-datajoin</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-dist</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-distcp</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-extras</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-gridmix</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-hdfs</artifactId> +			<version>${hadoop.version}</version> +		</dependency> +		<!-- Hadoop MRv1 --> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-core</artifactId> +			<version>${mapreduce.version}</version> +		</dependency> +		<dependency> +			<groupId>org.apache.hadoop</groupId> +			<artifactId>hadoop-tools</artifactId> +			<version>${mapreduce.version}</version> +		</dependency> +		<!-- Mahout dependencies --> +		<dependency> +			<groupId>org.apache.mahout</groupId> +			<artifactId>mahout-core</artifactId> +			<version>${mahout.version}</version> +			<exclusions> +				<exclusion> +					<groupId>jfree</groupId> +					<artifactId>jfreechart</artifactId> +				</exclusion> +			</exclusions> +		</dependency> +		<dependency> +			<groupId>org.slf4j</groupId> +			<artifactId>slf4j-api</artifactId> +			<version>${slf4j.version}</version> +		</dependency> +		<dependency> +			<groupId>org.slf4j</groupId> +			
<artifactId>slf4j-jcl</artifactId> +			<version>${slf4j.version}</version> +		</dependency> +		<!-- Gnue Trove Java lib --> +		<dependency> +			<groupId>net.sf.trove4j</groupId> +			<artifactId>trove4j</artifactId> +			<version>${trove4j.version}</version> +		</dependency> +		<dependency> +			<groupId>com.twitter</groupId> +			<artifactId>scalding_${scala.version}</artifactId> +			<version>${scalding.version}</version> +	      	<exclusions> +	        	<exclusion>  <!-- Declare exclusion, in order to use custom maple build --> +	          		<groupId>com.twitter</groupId> +	          		<artifactId>maple</artifactId> +	        	</exclusion> +	      	</exclusions> +		</dependency> + +		<!--  scalding-commons Contains Checkpoints --> + 		<dependency> +			<groupId>com.twitter</groupId> +			<artifactId>scalding-commons_${scala.version}</artifactId> +			<version>${scalding-commons.version}</version> +		</dependency> +		<dependency> +			<groupId>org.specs2</groupId> +			<artifactId>specs2_${scala.version}</artifactId> +			<version>${specs2.version}</version> +			<scope>test</scope> +		</dependency> +		<dependency> +			<groupId>org.apache.hbase</groupId> +			<artifactId>hbase</artifactId> +			<version>${hbase.version}</version> +		</dependency> +		<!-- Three dependencies for ScalaSpec --> +		<dependency> +			<groupId>org.scala-lang</groupId> +			<artifactId>scala-library</artifactId> +			<version>${scala.version}</version> +		</dependency> +		<dependency> +			<groupId>org.scalatest</groupId> +			<artifactId>scalatest_${scala.version}</artifactId> +			<version>${scalatest.version}</version> +			<!--  <scope>test</scope>  --> +		</dependency> +		<dependency> +			<groupId>junit</groupId> +			<artifactId>junit</artifactId> +			<version>${junit.version}</version> +			<!--  <scope>test</scope>  --> +		</dependency> +		<dependency> +			<groupId>com.typesafe</groupId> +			<artifactId>config</artifactId> +			<version>${typesafe.config.version}</version> +		</dependency> +		<dependency> +			<groupId>org.uncommons.maths</groupId> +			<artifactId>uncommons-maths</artifactId> +			<version>${uncommons-maths.version}</version> +			<exclusions> +				<exclusion> +					<groupId>jfree</groupId> +					<artifactId>jfreechart</artifactId> +				</exclusion> +			</exclusions> +		</dependency> + +	</dependencies> +	<!-- From https://wiki.scala-lang.org/display/SIW/ScalaEclipseMaven --> + + +	<build> +		<sourceDirectory>src/main/java</sourceDirectory> +	    <testSourceDirectory>src/test/scala</testSourceDirectory>  +		<plugins> +      <plugin> +        <groupId>org.apache.maven.plugins</groupId> +        <artifactId>maven-jar-plugin</artifactId> +        <version>2.4</version> +        <configuration> +          <includes> +            <include>**/*</include> +          </includes> +        </configuration> +      </plugin> +		<!-- This plugin is not supported by Eclipse, so maybe we shouldn't be using it --> +		<plugin> +			<groupId>org.scala-tools</groupId> +			<artifactId>maven-scala-plugin</artifactId> +			<executions> + +				<execution> +					<id>compile</id> +					<goals> +						<goal>compile</goal> +					</goals> +					<phase>compile</phase> +				</execution> +				<execution> +					<id>test-compile</id> +					<goals> +						<goal>testCompile</goal> +					</goals> +					<phase>test-compile</phase> +				</execution> +                <execution> +                   <phase>process-resources</phase> +                   <goals> +                     <goal>compile</goal> +                   </goals> +                
</execution> +			</executions> +		</plugin> +		<plugin> +			<artifactId>maven-compiler-plugin</artifactId> +			<version>${maven-compiler-plugin.version}</version> +			<configuration> +				<source>${maven.compiler.source}</source> +				<target>${maven.compiler.target}</target> +			</configuration> +		</plugin> +			<plugin> +				<groupId>org.apache.maven.plugins</groupId> +				<artifactId>maven-dependency-plugin</artifactId> +				<executions> +					<execution> +						<id>copy-dependencies</id> +						<phase>install</phase> +						<goals> +							<goal>copy-dependencies</goal> +						</goals> +						<configuration> +							<outputDirectory>./alternateLocation</outputDirectory> <!-- ${project.build.directory} --> +							<overWriteReleases>false</overWriteReleases> +							<overWriteSnapshots>false</overWriteSnapshots> +							<overWriteIfNewer>true</overWriteIfNewer> +						</configuration> +					</execution> +				</executions> +			</plugin> +		</plugins> +		<pluginManagement> +			<plugins> +				<!--Run with : mvn dependency:copy-dependencies --> +				<plugin> +					<groupId>org.eclipse.m2e</groupId> +					<artifactId>lifecycle-mapping</artifactId> +					<version>1.0.0</version> +					<configuration> +						<lifecycleMappingMetadata> +							<pluginExecutions> +								<pluginExecution> +									<pluginExecutionFilter> +										<groupId> +											org.apache.maven.plugins +										</groupId> +										<artifactId> +											maven-dependency-plugin +										</artifactId> +										<versionRange>[2.1,)</versionRange> +										<goals> +											<goal>copy-dependencies</goal> +										</goals> +									</pluginExecutionFilter> +									<action> +										<ignore></ignore> +									</action> +								</pluginExecution> +							</pluginExecutions> +						</lifecycleMappingMetadata> +					</configuration> +				</plugin> +			</plugins> +		</pluginManagement> +	</build> + +</project>
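The POM above builds the wrapper as groupId parallelai, artifactId parallelai.spyglass, version 1.0.2, compiled against Scala 2.9.2, Scalding 0.8.3 and the CDH 4.2.0 stack. A rough sketch of consuming those coordinates from an sbt build follows; it assumes the artifact has actually been published to a resolver you can reach (Conjars is one of the repositories the POM itself lists, but availability there is an assumption).

```scala
// build.sbt sketch -- coordinates are taken from the POM above; whether the
// artifact is resolvable from Conjars (or any public repository) is assumed.
resolvers += "conjars" at "http://conjars.org/repo"

libraryDependencies ++= Seq(
  "parallelai"  % "parallelai.spyglass" % "1.0.2",
  "com.twitter" % "scalding_2.9.2"      % "0.8.3"
)
```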
\ No newline at end of file
diff --git a/src/main/java/parallelai/spyglass/base/JobLibLoader.java b/src/main/java/parallelai/spyglass/base/JobLibLoader.java
new file mode 100644
index 0000000..af5bdf4
--- /dev/null
+++ b/src/main/java/parallelai/spyglass/base/JobLibLoader.java
@@ -0,0 +1,70 @@
+package parallelai.spyglass.base;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.LogManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+public class JobLibLoader {
+
+    private static Logger logger = LogManager.getLogger(JobLibLoader.class);
+
+    public static void loadJars(String libPathStr, Configuration config) {
+
+        try {
+            Path libPath = new Path(libPathStr);
+
+            FileSystem fs = FileSystem.get(config);
+
+            RemoteIterator<LocatedFileStatus> itr = fs.listFiles(libPath, true);
+
+            while (itr.hasNext()) {
+                LocatedFileStatus f = itr.next();
+
+                if (!f.isDirectory() && f.getPath().getName().endsWith("jar")) {
+                    logger.info("Loading Jar : " + f.getPath().getName());
+                    DistributedCache.addFileToClassPath(f.getPath(), config);
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            logger.error(e.toString());
+        }
+    }
+
+    public static void addFiletoCache(String libPathStr, Configuration config) {
+
+        try {
+            Path filePath = new Path(libPathStr);
+            DistributedCache.addCacheFile(filePath.toUri(), config);
+            // DistributedCache.createSymlink(config);
+
+            // config.set("mapred.cache.files", libPathStr);
+            // config.set("mapred.create.symlink", "yes");
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static Path[] getFileFromCache(String libPathStr,
+            Configuration config) {
+        Path[] localFiles = null;
+        try {
+            logger.info("Local Cache => " + DistributedCache.getLocalCacheFiles(config));
+            logger.info("Hadoop Cache => " + DistributedCache.getCacheFiles(config));
+            if (DistributedCache.getLocalCacheFiles(config) != null) {
+                localFiles = DistributedCache.getLocalCacheFiles(config);
+            }
+            logger.info("LocalFiles => " + localFiles);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return localFiles;
+    }
+}
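JobLibLoader, added above, walks an HDFS directory and registers every jar it finds with the DistributedCache (loadJars); addFiletoCache and getFileFromCache push and pull individual cache files. A minimal sketch of calling it from a driver follows; the /user/hadoop/spyglass-libs path is purely illustrative. Note that loadJars swallows exceptions and only logs them, so a missing lib directory will not by itself fail job submission.

```scala
// Sketch only: registers every *.jar under a (hypothetical) HDFS lib directory
// on the task classpath before the flow is planned, using the class added above.
import org.apache.hadoop.conf.Configuration
import parallelai.spyglass.base.JobLibLoader

object LoadSpyGlassLibs {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    JobLibLoader.loadJars("/user/hadoop/spyglass-libs", conf)
    // conf now carries the DistributedCache entries and can be used to build the JobConf
  }
}
```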
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java new file mode 100644 index 0000000..b546107 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseConstants.java @@ -0,0 +1,19 @@ +package parallelai.spyglass.hbase; + +import org.apache.hadoop.conf.Configuration; + +public class HBaseConstants { +   +  public enum SourceMode { +    EMPTY, +    SCAN_ALL, +    SCAN_RANGE, +    GET_LIST; +  } + +  public static final String START_KEY = "hbase.%s.startkey"; +  public static final String STOP_KEY = "hbase.%s.stopkey";  +  public static final String SOURCE_MODE = "hbase.%s.source.mode"; +  public static final String KEY_LIST = "hbase.%s.key.list"; + +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java new file mode 100644 index 0000000..f1f4fb7 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseInputFormat.java @@ -0,0 +1,531 @@ +package parallelai.spyglass.hbase; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; + +import javax.naming.NamingException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + + +public class HBaseInputFormat +  implements InputFormat<ImmutableBytesWritable, Result>, JobConfigurable { +   +  private final Log LOG = LogFactory.getLog(HBaseInputFormat.class); + +  private final String id = UUID.randomUUID().toString(); + +  private byte [][] inputColumns; +  private HTable table; +  private HBaseRecordReader tableRecordReader; +  private Filter rowFilter; +  private String tableName = ""; + +  private HashMap<InetAddress, String> reverseDNSCacheMap = +    new HashMap<InetAddress, String>(); +   +  private String nameServer = null; +   +//  private Scan scan = null; + +   +  @SuppressWarnings("deprecation") +  @Override +  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { +    if (this.table == null) { +      throw new IOException("No table was provided"); +    } +     +    if (this.inputColumns == null || this.inputColumns.length == 0) { +      throw new IOException("Expecting at least one column"); +    } +     +    Pair<byte[][], byte[][]> keys = table.getStartEndKeys(); + +    if (keys == 
null || keys.getFirst() == null || keys.getFirst().length == 0) { +      HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + +      if (null == regLoc) { +        throw new IOException("Expecting at least one region."); +      } + +      List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(1); +      HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), +          HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc +              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], SourceMode.EMPTY); +      splits.add(split); +       +      return splits.toArray(new HBaseTableSplit[splits.size()]); +    } +     +    if( keys.getSecond() == null || keys.getSecond().length == 0) { +      throw new IOException("Expecting at least one region."); +    } +     +    if( keys.getFirst().length != keys.getSecond().length ) { +      throw new IOException("Regions for start and end key do not match"); +    } +     +    byte[] minKey = keys.getFirst()[keys.getFirst().length - 1]; +    byte[] maxKey = keys.getSecond()[0]; +     +    LOG.info( "".format("SETTING min key (%s) and max key (%s)", Bytes.toString(minKey), Bytes.toString(maxKey))); + +    byte [][] regStartKeys = keys.getFirst(); +    byte [][] regStopKeys = keys.getSecond(); +    String [] regions = new String[regStartKeys.length]; +     +    for( int i = 0; i < regStartKeys.length; i++ ) { +      minKey = (regStartKeys[i] != null && regStartKeys[i].length != 0 ) && (Bytes.compareTo(regStartKeys[i], minKey) < 0 ) ? regStartKeys[i] : minKey; +      maxKey = (regStopKeys[i] != null && regStopKeys[i].length != 0) && (Bytes.compareTo(regStopKeys[i], maxKey) > 0 ) ? regStopKeys[i] : maxKey; +       +      HServerAddress regionServerAddress =  +          table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); +        InetAddress regionAddress = +          regionServerAddress.getInetSocketAddress().getAddress(); +        String regionLocation; +        try { +          regionLocation = reverseDNS(regionAddress); +        } catch (NamingException e) { +          LOG.error("Cannot resolve the host name for " + regionAddress + +              " because of " + e); +          regionLocation = regionServerAddress.getHostname(); +        } + +//       HServerAddress regionServerAddress = table.getRegionLocation(keys.getFirst()[i]).getServerAddress(); +//      InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress(); +// +//      String regionLocation; +// +//      try { +//        regionLocation = reverseDNS(regionAddress); +//      } catch (NamingException e) { +//        LOG.error("Cannot resolve the host name for " + regionAddress + " because of " + e); +//        regionLocation = regionServerAddress.getHostname(); +//      } + +//      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).getHostname(); +       +      LOG.debug( "***** " + regionLocation ); +       +      if( regionLocation == null || regionLocation.length() == 0 ) +        throw new IOException( "The region info for regiosn " + i + " is null or empty"); + +      regions[i] = regionLocation; +       +      LOG.info("".format("Region (%s) has start key (%s) and stop key (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]) )); +    } +     +    byte[] startRow = HConstants.EMPTY_START_ROW; +    byte[] stopRow = HConstants.EMPTY_END_ROW; +     +    LOG.info( "".format("Found min key (%s) and max key (%s)", Bytes.toString(minKey), 
Bytes.toString(maxKey))); +     +    LOG.info("SOURCE MODE is : " + sourceMode);     + +    switch( sourceMode ) { +    case SCAN_ALL: +      startRow = HConstants.EMPTY_START_ROW; +      stopRow = HConstants.EMPTY_END_ROW; + +      LOG.info( "".format("SCAN ALL: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); +      break; +       +    case SCAN_RANGE: +      startRow = (startKey != null && startKey.length() != 0) ? Bytes.toBytes(startKey) : HConstants.EMPTY_START_ROW ; +      stopRow = (stopKey != null && stopKey.length() != 0) ? Bytes.toBytes(stopKey) : HConstants.EMPTY_END_ROW ; + +      LOG.info( "".format("SCAN RANGE: Found start key (%s) and stop key (%s)", Bytes.toString(startRow), Bytes.toString(stopRow))); +      break; +    } +     +    switch( sourceMode ) { +      case EMPTY: +      case SCAN_ALL: +      case SCAN_RANGE: +      { +//        startRow = (Bytes.compareTo(startRow, minKey) < 0) ? minKey : startRow; +//        stopRow = (Bytes.compareTo(stopRow, maxKey) > 0) ? maxKey : stopRow; +         +        List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); +         +        List<HRegionLocation> validRegions = table.getRegionsInRange(startRow, stopRow); +         +        int maxRegions = validRegions.size(); +        int currentRegion = 1; +         +        for( HRegionLocation cRegion : validRegions ) { +          byte [] rStart = cRegion.getRegionInfo().getStartKey(); +          byte [] rStop = cRegion.getRegionInfo().getEndKey(); +           +          HServerAddress regionServerAddress = cRegion.getServerAddress(); +            InetAddress regionAddress = +              regionServerAddress.getInetSocketAddress().getAddress(); +            String regionLocation; +            try { +              regionLocation = reverseDNS(regionAddress); +            } catch (NamingException e) { +              LOG.error("Cannot resolve the host name for " + regionAddress + +                  " because of " + e); +              regionLocation = regionServerAddress.getHostname(); +            } +             +            byte [] sStart = (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 ) ? rStart : startRow); +            byte [] sStop = (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 && rStop.length != 0) ? 
rStop : stopRow);  +             +            LOG.info("".format("BOOL start (%s) stop (%s) length (%d)",  +                (startRow == HConstants.EMPTY_START_ROW || (Bytes.compareTo(startRow, rStart) <= 0 )), +                    (stopRow == HConstants.EMPTY_END_ROW || (Bytes.compareTo(stopRow, rStop) >= 0 )), +                    rStop.length +                    )); +           +          HBaseTableSplit split = new HBaseTableSplit( +              table.getTableName(), +              sStart, +              sStop, +              regionLocation, +              SourceMode.SCAN_RANGE +           ); +           +          split.setEndRowInclusive( currentRegion == maxRegions ); + +          currentRegion ++; +                   +           LOG.info("".format("START KEY (%s) STOP KEY (%s) rSTART (%s) rSTOP (%s) sSTART (%s) sSTOP (%s) REGION [%s] SPLIT [%s]",  +               Bytes.toString(startRow), Bytes.toString(stopRow), +               Bytes.toString(rStart), Bytes.toString(rStop), +               Bytes.toString(sStart),  +               Bytes.toString(sStop),  +               cRegion.getHostnamePort(), split) ); +           +           splits.add(split); +        } +         +// +//        for (int i = 0; i < keys.getFirst().length; i++) { +// +//          if ( ! includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { +//            LOG.info("NOT including regions : " + regions[i]); +//            continue; +//          } +//           +//          // determine if the given start an stop key fall into the region +//          if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || +//               Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && +//              (stopRow.length == 0 || +//               Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { +//             +//            byte[] splitStart = startRow.length == 0 || +//              Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? +//                keys.getFirst()[i] : startRow; +//            byte[] splitStop = (stopRow.length == 0 || +//              Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && +//              keys.getSecond()[i].length > 0 ? +//                keys.getSecond()[i] : stopRow; +//                HBaseTableSplit split = new HBaseTableSplit(table.getTableName(), +//              splitStart, splitStop, regions[i], SourceMode.SCAN_RANGE); +//            splits.add(split); +//             +//            LOG.info("getSplits: split -> " + i + " -> " + split); +//          } +//        } +         +        LOG.info("RETURNED SPLITS: split -> " + splits); + +        return splits.toArray(new HBaseTableSplit[splits.size()]); +      }  +         +      case GET_LIST: +      { +        if( keyList == null || keyList.size() == 0 ) { +          throw new IOException("Source Mode is GET_LIST but key list is EMPTY"); +        }  +         +        List<HBaseTableSplit> splits = new ArrayList<HBaseTableSplit>(); +         +        for (int i = 0; i < keys.getFirst().length; i++) { + +          if ( ! 
includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { +            continue; +          } +           +          LOG.info("".format("Getting region (%s) subset (%s) to (%s)", regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStartKeys[i] ))); + +          Set<String> regionsSubSet = null; +           +          if( (regStartKeys[i] == null || regStartKeys[i].length == 0) && (regStopKeys[i] == null || regStopKeys[i].length == 0) ) { +            LOG.info("REGION start is empty"); +            LOG.info("REGION stop is empty"); +            regionsSubSet = keyList; +          } else if( regStartKeys[i] == null || regStartKeys[i].length == 0 ) { +            LOG.info("REGION start is empty"); +            regionsSubSet = keyList.headSet(Bytes.toString(regStopKeys[i]), true); +          } else if( regStopKeys[i] == null || regStopKeys[i].length == 0 ) { +            LOG.info("REGION stop is empty"); +            regionsSubSet = keyList.tailSet(Bytes.toString(regStartKeys[i]), true); +          } else if( Bytes.compareTo(regStartKeys[i], regStopKeys[i]) <= 0 ) { +            regionsSubSet = keyList.subSet(Bytes.toString(regStartKeys[i]), true, Bytes.toString(regStopKeys[i]), true); +          } else { +            throw new IOException("".format("For REGION (%s) Start Key (%s) > Stop Key(%s)",  +                regions[i], Bytes.toString(regStartKeys[i]), Bytes.toString(regStopKeys[i]))); +          } +           +          if( regionsSubSet == null || regionsSubSet.size() == 0) { +            LOG.info( "EMPTY: Key is for region " + regions[i] + " is null"); +             +            continue; +          } + +          TreeSet<String> regionKeyList = new TreeSet<String>(regionsSubSet); + +          LOG.info("".format("Regions [%s] has key list <%s>", regions[i], regionKeyList )); +             +          HBaseTableSplit split = new HBaseTableSplit( +              table.getTableName(), regionKeyList, +              regions[i],  +              SourceMode.GET_LIST); +          splits.add(split); +        } +           +        return splits.toArray(new HBaseTableSplit[splits.size()]); +      }  + +      default: +        throw new IOException("Unknown source Mode : " + sourceMode ); +    } +  } +   +  private String reverseDNS(InetAddress ipAddress) throws NamingException { +    String hostName = this.reverseDNSCacheMap.get(ipAddress); +    if (hostName == null) { +      hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); +      this.reverseDNSCacheMap.put(ipAddress, hostName); +    } +    return hostName; +  } + + +  @Override +  public RecordReader<ImmutableBytesWritable, Result> getRecordReader( +      InputSplit split, JobConf job, Reporter reporter) throws IOException { +     +    if( ! 
(split instanceof HBaseTableSplit ) ) +      throw new IOException("Table Split is not type HBaseTableSplit"); + +    HBaseTableSplit tSplit = (HBaseTableSplit) split; +     +    HBaseRecordReader trr = new HBaseRecordReader(); + +    switch( tSplit.getSourceMode() ) { +      case SCAN_ALL: +      case SCAN_RANGE: +      { +        LOG.info("".format("For split [%s] we have start key (%s) and stop key (%s)", tSplit, tSplit.getStartRow(), tSplit.getEndRow() )); +         +        trr.setStartRow(tSplit.getStartRow()); +        trr.setEndRow(tSplit.getEndRow()); +        trr.setEndRowInclusive(tSplit.getEndRowInclusive()); +      } +       +      break; +       +      case GET_LIST: +      { +        LOG.info("".format("For split [%s] we have key list (%s)", tSplit, tSplit.getKeyList() )); +         +        trr.setKeyList(tSplit.getKeyList()); +      } +       +      break; +       +      default: +        throw new IOException( "Unknown source mode : " + tSplit.getSourceMode() ); +    } +     +    trr.setSourceMode(tSplit.getSourceMode()); +    trr.setHTable(this.table); +    trr.setInputColumns(this.inputColumns); +    trr.setRowFilter(this.rowFilter); + +    trr.init(); + +    return trr; +  } +   +   +   +  /* Configuration Section */ + +  /** +   * space delimited list of columns +   */ +  public static final String COLUMN_LIST = "hbase.tablecolumns"; + +  /** +   * Use this jobconf param to specify the input table +   */ +  private static final String INPUT_TABLE = "hbase.inputtable"; + +  private String startKey = null; +  private String stopKey = null; +   +  private SourceMode sourceMode = SourceMode.EMPTY; +  private TreeSet<String> keyList = null; +   +  public void configure(JobConf job) { +    String tableName = getTableName(job); +    String colArg = job.get(COLUMN_LIST); +    String[] colNames = colArg.split(" "); +    byte [][] m_cols = new byte[colNames.length][]; +    for (int i = 0; i < m_cols.length; i++) { +      m_cols[i] = Bytes.toBytes(colNames[i]); +    } +    setInputColumns(m_cols); +     +    try { +      setHTable(new HTable(HBaseConfiguration.create(job), tableName)); +    } catch (Exception e) { +      LOG.error( "************* Table could not be created" ); +      LOG.error(StringUtils.stringifyException(e)); +    } +     +    LOG.debug("Entered : " + this.getClass() + " : configure()" ); +     +    sourceMode = SourceMode.valueOf( job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ) ) ) ; +     +    LOG.info( "".format("GOT SOURCE MODE (%s) as (%s) and finally",  +        String.format(HBaseConstants.SOURCE_MODE, getTableName(job) ), job.get( String.format(HBaseConstants.SOURCE_MODE, getTableName(job) )), sourceMode )); + +    switch( sourceMode ) { +      case SCAN_RANGE: +        LOG.info("HIT SCAN_RANGE"); +         +        startKey = getJobProp(job, String.format(HBaseConstants.START_KEY, getTableName(job) ) ); +        stopKey = getJobProp(job, String.format(HBaseConstants.STOP_KEY, getTableName(job) ) ); + +        LOG.info(String.format("Setting start key (%s) and stop key (%s)", startKey, stopKey) ); +        break; +         +      case GET_LIST: +        LOG.info("HIT GET_LIST"); +         +        Collection<String> keys = job.getStringCollection(String.format(HBaseConstants.KEY_LIST, getTableName(job))); +        keyList = new TreeSet<String> (keys); +         +        LOG.info( "GOT KEY LIST : " + keys ); +        LOG.info(String.format("SETTING key list (%s)", keyList) ); + +        break; +         +      case EMPTY: +       
 LOG.info("HIT EMPTY"); +         +        sourceMode = sourceMode.SCAN_ALL; +        break; +       +      default: +        LOG.info("HIT DEFAULT"); +         +        break; +    } +  } + +  public void validateInput(JobConf job) throws IOException { +    // expecting exactly one path +    String tableName = getTableName(job); +     +    if (tableName == null) { +      throw new IOException("expecting one table name"); +    } +    LOG.debug("".format("Found Table name [%s]", tableName)); +     + +    // connected to table? +    if (getHTable() == null) { +      throw new IOException("could not connect to table '" + +        tableName + "'"); +    } +    LOG.debug("".format("Found Table [%s]", getHTable().getTableName())); + +    // expecting at least one column +    String colArg = job.get(COLUMN_LIST); +    if (colArg == null || colArg.length() == 0) { +      throw new IOException("expecting at least one column"); +    } +    LOG.debug("".format("Found Columns [%s]", colArg)); + +    LOG.debug("".format("Found Start & STop Key [%s][%s]", startKey, stopKey)); +     +    if( sourceMode == SourceMode.EMPTY ) { +      throw new IOException("SourceMode should not be EMPTY"); +    } +     +    if( sourceMode == SourceMode.GET_LIST && (keyList == null || keyList.size() == 0) ) { +      throw new IOException( "Source mode is GET_LIST bu key list is empty"); +    } +  } +   +   +  /* Getters & Setters */ +  private HTable getHTable() { return this.table; } +  private void setHTable(HTable ht) { this.table = ht; } +  private void setInputColumns( byte [][] ic ) { this.inputColumns = ic; } + +   +  private void setJobProp( JobConf job, String key, String value) { +    if( job.get(key) != null ) throw new RuntimeException("".format("Job Conf already has key [%s] with value [%s]", key, job.get(key))); +    job.set(key,  value); +  } +   +  private String getJobProp( JobConf job, String key ) { return job.get(key); } +   +  public static void setTableName(JobConf job, String tableName) { +    // Make sure that table has not been set before +    String oldTableName = getTableName(job); +    if(oldTableName != null) { +      throw new RuntimeException("table name already set to: '" +        + oldTableName + "'"); +    } +     +    job.set(INPUT_TABLE, tableName); +  } +   +  public static String getTableName(JobConf job) { +    return job.get(INPUT_TABLE); +  } + +  protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) { +    return true; +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java new file mode 100644 index 0000000..97077c4 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java @@ -0,0 +1,325 @@ +package parallelai.spyglass.hbase; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +import java.io.IOException; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import 
org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.StringUtils; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + + +public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, Result> { + +  static final Log LOG = LogFactory.getLog(HBaseRecordReader.class); + +  private byte [] startRow; +  private byte [] endRow; +  private byte [] lastSuccessfulRow; +  private TreeSet<String> keyList; +  private SourceMode sourceMode; +  private Filter trrRowFilter; +  private ResultScanner scanner; +  private HTable htable; +  private byte [][] trrInputColumns; +  private long timestamp; +  private int rowcount; +  private boolean logScannerActivity = false; +  private int logPerRowCount = 100; +  private boolean endRowInclusive = true; + +  /** +   * Restart from survivable exceptions by creating a new scanner. +   * +   * @param firstRow +   * @throws IOException +   */ +  public void restartRangeScan(byte[] firstRow) throws IOException { +    Scan currentScan; +    if ((endRow != null) && (endRow.length > 0)) { +      if (trrRowFilter != null) { +        Scan scan = new Scan(firstRow, (endRowInclusive ?   +            Bytes.add(endRow, new byte[] {0}) : endRow ) ); +         +        TableInputFormat.addColumns(scan, trrInputColumns); +        scan.setFilter(trrRowFilter); +        scan.setCacheBlocks(false); +        this.scanner = this.htable.getScanner(scan); +        currentScan = scan; +      } else { +        LOG.debug("TIFB.restart, firstRow: " + +            Bytes.toString(firstRow) + ", endRow: " + +            Bytes.toString(endRow)); +        Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] {0}) : endRow )); +        TableInputFormat.addColumns(scan, trrInputColumns); +        this.scanner = this.htable.getScanner(scan); +        currentScan = scan; +      } +    } else { +      LOG.debug("TIFB.restart, firstRow: " + +          Bytes.toStringBinary(firstRow) + ", no endRow"); + +      Scan scan = new Scan(firstRow); +      TableInputFormat.addColumns(scan, trrInputColumns); +      scan.setFilter(trrRowFilter); +      this.scanner = this.htable.getScanner(scan); +      currentScan = scan; +    } +    if (logScannerActivity) { +      LOG.info("Current scan=" + currentScan.toString()); +      timestamp = System.currentTimeMillis(); +      rowcount = 0; +    } +  } + +  public TreeSet<String> getKeyList() { +    return keyList; +  } + +  public void setKeyList(TreeSet<String> keyList) { +    this.keyList = keyList; +  } + +  public SourceMode getSourceMode() { +    return sourceMode; +  } + +  public void setSourceMode(SourceMode sourceMode) { +    this.sourceMode = sourceMode; +  } + +  public byte[] getEndRow() { +    return endRow; +  } +   +  public void setEndRowInclusive(boolean isInclusive) { +    endRowInclusive = isInclusive; +  } +   +  public boolean getEndRowInclusive() { +    return endRowInclusive; +  } +   +  private byte [] nextKey = null; + +  /** +   * Build the scanner. Not done in constructor to allow for extension. 
+   * +   * @throws IOException +   */ +  public void init() throws IOException { +    switch( sourceMode ) { +      case SCAN_ALL: +      case SCAN_RANGE: +        restartRangeScan(startRow); +        break; +         +      case GET_LIST: +        nextKey = Bytes.toBytes(keyList.pollFirst()); +        break; +         +      default: +        throw new IOException(" Unknown source mode : " + sourceMode ); +    } +  } + +  byte[] getStartRow() { +    return this.startRow; +  } +  /** +   * @param htable the {@link HTable} to scan. +   */ +  public void setHTable(HTable htable) { +    Configuration conf = htable.getConfiguration(); +    logScannerActivity = conf.getBoolean( +      ScannerCallable.LOG_SCANNER_ACTIVITY, false); +    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); +    this.htable = htable; +  } + +  /** +   * @param inputColumns the columns to be placed in {@link Result}. +   */ +  public void setInputColumns(final byte [][] inputColumns) { +    this.trrInputColumns = inputColumns; +  } + +  /** +   * @param startRow the first row in the split +   */ +  public void setStartRow(final byte [] startRow) { +    this.startRow = startRow; +  } + +  /** +   * +   * @param endRow the last row in the split +   */ +  public void setEndRow(final byte [] endRow) { +    this.endRow = endRow; +  } + +  /** +   * @param rowFilter the {@link Filter} to be used. +   */ +  public void setRowFilter(Filter rowFilter) { +    this.trrRowFilter = rowFilter; +  } + +  @Override +  public void close() { +	  if (this.scanner != null) this.scanner.close(); +  } + +  /** +   * @return ImmutableBytesWritable +   * +   * @see org.apache.hadoop.mapred.RecordReader#createKey() +   */ +  @Override +  public ImmutableBytesWritable createKey() { +    return new ImmutableBytesWritable(); +  } + +  /** +   * @return RowResult +   * +   * @see org.apache.hadoop.mapred.RecordReader#createValue() +   */ +  @Override +  public Result createValue() { +    return new Result(); +  } + +  @Override +  public long getPos() { +    // This should be the ordinal tuple in the range; +    // not clear how to calculate... +    return 0; +  } + +  @Override +  public float getProgress() { +    // Depends on the total number of tuples and getPos +    return 0; +  } + +  /** +   * @param key HStoreKey as input key. 
+   * @param value MapWritable as input value +   * @return true if there was more data +   * @throws IOException +   */ +  @Override +  public boolean next(ImmutableBytesWritable key, Result value) +  throws IOException { +     +    switch(sourceMode) { +      case SCAN_ALL: +      case SCAN_RANGE: +      { +         +        Result result; +        try { +          try { +            result = this.scanner.next(); +            if (logScannerActivity) { +              rowcount ++; +              if (rowcount >= logPerRowCount) { +                long now = System.currentTimeMillis(); +                LOG.info("Mapper took " + (now-timestamp) +                  + "ms to process " + rowcount + " rows"); +                timestamp = now; +                rowcount = 0; +              } +            } +          } catch (IOException e) { +            // try to handle all IOExceptions by restarting +            // the scanner, if the second call fails, it will be rethrown +            LOG.debug("recovered from " + StringUtils.stringifyException(e)); +            if (lastSuccessfulRow == null) { +              LOG.warn("We are restarting the first next() invocation," + +                  " if your mapper has restarted a few other times like this" + +                  " then you should consider killing this job and investigate" + +                  " why it's taking so long."); +            } +            if (lastSuccessfulRow == null) { +              restartRangeScan(startRow); +            } else { +              restartRangeScan(lastSuccessfulRow); +              this.scanner.next();    // skip presumed already mapped row +            } +            result = this.scanner.next(); +          } + +          if (result != null && result.size() > 0) { +            key.set(result.getRow()); +            lastSuccessfulRow = key.get(); +            Writables.copyWritable(result, value); +            return true; +          } +          return false; +        } catch (IOException ioe) { +          if (logScannerActivity) { +            long now = System.currentTimeMillis(); +            LOG.info("Mapper took " + (now-timestamp) +              + "ms to process " + rowcount + " rows"); +            LOG.info(ioe); +            String lastRow = lastSuccessfulRow == null ? +              "null" : Bytes.toStringBinary(lastSuccessfulRow); +            LOG.info("lastSuccessfulRow=" + lastRow); +          } +          throw ioe; +        } +      } +       +      case GET_LIST: +      { +        Result result; +        if( nextKey != null ) { +          result = this.htable.get(new Get(nextKey)); +           +          if (result != null && result.size() > 0) { +        	System.out.println("KeyList => " + keyList); +        	System.out.println("Result => " + result); +        	if (keyList != null || !keyList.isEmpty()) { +        		 +        		String newKey = keyList.pollFirst(); +        		System.out.println("New Key => " + newKey); +        		nextKey = (newKey == null || newKey.length() == 0) ? 
null : Bytes.toBytes(newKey); +        	} else { +        		nextKey = null; +        	} +            key.set(result.getRow()); +            lastSuccessfulRow = key.get(); +            Writables.copyWritable(result, value); +            return true; +          } +          return false; +        } else { +          return false; +        } +      } +       +      default: +        throw new IOException("Unknown source mode : " + sourceMode ); +    }  +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java new file mode 100644 index 0000000..e5acc30 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java @@ -0,0 +1,336 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.hbase; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import cascading.util.Util; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +//import org.mortbay.log.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; + +/** + * The HBaseScheme class is a {@link Scheme} subclass. It is used in conjunction with the {@HBaseTap} to + * allow for the reading and writing of data to and from a HBase cluster. + * + * @see HBaseTap + */ +public class HBaseScheme +//    extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { +      extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> { +  /** Field LOG */ +  private static final Logger LOG = LoggerFactory.getLogger(HBaseScheme.class); + +  /** Field keyFields */ +  private Fields keyField; +   +  /** Long timestamp */ +  private long timeStamp; +  +  /** String familyNames */ +  private String[] familyNames; +  /** Field valueFields */ +  private Fields[] valueFields; + +  /** String columns */ +  private transient String[] columns; +  /** Field fields */ +  private transient byte[][] fields; +  + +  /** +   * Constructor HBaseScheme creates a new HBaseScheme instance. +   * +   * @param keyFields   of type Fields +   * @param familyName  of type String +   * @param valueFields of type Fields +   */ +  public HBaseScheme(Fields keyFields, String familyName, Fields valueFields) { +    this(keyFields, new String[]{familyName}, Fields.fields(valueFields)); +  } + +  /** +   * Constructor HBaseScheme creates a new HBaseScheme instance. 
+   * +   * @param keyFields   of type Fields +   * @param familyNames of type String[] +   * @param valueFields of type Fields[] +   */ +  public HBaseScheme(Fields keyFields, String[] familyNames, Fields[] valueFields) { +    this.keyField = keyFields; +    this.familyNames = familyNames; +    this.valueFields = valueFields; +    this.timeStamp = System.currentTimeMillis(); +     +    setSourceSink(this.keyField, this.valueFields); + +    validate(); +  } + +  /** +   * Constructor HBaseScheme creates a new HBaseScheme instance. +   * +   * @param keyFields   of type Fields +   * @param timeStamp   of type Long +   * @param familyNames of type String[] +   * @param valueFields of type Fields[] +   */ +  public HBaseScheme(Fields keyFields, long timeStamp, String[] familyNames, Fields[] valueFields) { +    this.keyField = keyFields; +    this.timeStamp = timeStamp; +    this.familyNames = familyNames; +    this.valueFields = valueFields; + +    setSourceSink(this.keyField, this.valueFields); + +    validate(); +  }   +     +  /** +   * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names +   * +   * @param keyField    of type String +   * @param valueFields of type Fields +   */ +  public HBaseScheme(Fields keyField, Fields valueFields) { +    this(keyField, Fields.fields(valueFields)); +  } + +  /** +   * Constructor HBaseScheme creates a new HBaseScheme instance using fully qualified column names +   * +   * @param keyField    of type Field +   * @param valueFields of type Field[] +   */ +  public HBaseScheme(Fields keyField, Fields[] valueFields) { +    this.keyField = keyField; +    this.valueFields = valueFields; +    this.timeStamp = System.currentTimeMillis(); +     +    validate(); + +    setSourceSink(this.keyField, this.valueFields); +  } + +  private void validate() { +    if (keyField.size() != 1) { +      throw new IllegalArgumentException("may only have one key field, found: " + keyField.print()); +    } +  } + +  private void setSourceSink(Fields keyFields, Fields[] columnFields) { +    Fields allFields = keyFields; + +    if (columnFields.length != 0) { +      allFields = Fields.join(keyFields, Fields.join(columnFields)); // prepend +    } + +    setSourceFields(allFields); +    setSinkFields(allFields); +  } + +  /** +   * Method getFamilyNames returns the set of familyNames of this HBaseScheme object. +   * +   * @return the familyNames (type String[]) of this HBaseScheme object. +   */ +  public String[] getFamilyNames() { +    HashSet<String> familyNameSet = new HashSet<String>(); + +    if (familyNames == null) { +      for (String columnName : columns(null, this.valueFields)) { +        int pos = columnName.indexOf(":"); +        familyNameSet.add(hbaseColumn(pos > 0 ? 
columnName.substring(0, pos) : columnName)); +      } +    } else { +      for (String familyName : familyNames) { +        familyNameSet.add(familyName); +      } +    } +    return familyNameSet.toArray(new String[0]); +  } + +  @Override +  public void sourcePrepare(FlowProcess<JobConf> flowProcess, +      SourceCall<Object[], RecordReader> sourceCall) { +    Object[] pair = +        new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()}; + +    sourceCall.setContext(pair); +  } + +  @Override +  public void sourceCleanup(FlowProcess<JobConf> flowProcess, +      SourceCall<Object[], RecordReader> sourceCall) { +    sourceCall.setContext(null); +  } + +  @Override +  public boolean source(FlowProcess<JobConf> flowProcess, +      SourceCall<Object[], RecordReader> sourceCall) throws IOException { +    Tuple result = new Tuple(); + +    Object key = sourceCall.getContext()[0]; +    Object value = sourceCall.getContext()[1]; +    boolean hasNext = sourceCall.getInput().next(key, value); +    if (!hasNext) { return false; } + +    // Skip nulls +    if (key == null || value == null) { return true; } + +    ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key; +    Result row = (Result) value; +    result.add(keyWritable); + +    for (int i = 0; i < this.familyNames.length; i++) { +      String familyName = this.familyNames[i]; +      byte[] familyNameBytes = Bytes.toBytes(familyName); +      Fields fields = this.valueFields[i]; +      for (int k = 0; k < fields.size(); k++) { +        String fieldName = (String) fields.get(k); +        byte[] fieldNameBytes = Bytes.toBytes(fieldName); +        byte[] cellValue = row.getValue(familyNameBytes, fieldNameBytes); +        if (cellValue == null) { +            cellValue = new byte[0]; +        } +        result.add(new ImmutableBytesWritable(cellValue)); +      } +    } + +    sourceCall.getIncomingEntry().setTuple(result); + +    return true; +  } + +  @Override +  public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) +      throws IOException { +    TupleEntry tupleEntry = sinkCall.getOutgoingEntry(); +    OutputCollector outputCollector = sinkCall.getOutput(); +    Tuple key = tupleEntry.selectTuple(keyField); +    ImmutableBytesWritable keyBytes = (ImmutableBytesWritable) key.getObject(0); +    Put put = new Put(keyBytes.get(), this.timeStamp); + +    for (int i = 0; i < valueFields.length; i++) { +      Fields fieldSelector = valueFields[i]; +      TupleEntry values = tupleEntry.selectEntry(fieldSelector); + +      for (int j = 0; j < values.getFields().size(); j++) { +        Fields fields = values.getFields(); +        Tuple tuple = values.getTuple(); + +        ImmutableBytesWritable valueBytes = (ImmutableBytesWritable) tuple.getObject(j); +        put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get()); +      } +    } + +    outputCollector.collect(null, put); +  } + +  @Override +  public void sinkConfInit(FlowProcess<JobConf> process, +      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { +    conf.setOutputFormat(TableOutputFormat.class); + +    conf.setOutputKeyClass(ImmutableBytesWritable.class); +    conf.setOutputValueClass(Put.class); +  } + +  @Override +  public void sourceConfInit(FlowProcess<JobConf> process, +      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) { +    conf.setInputFormat(HBaseInputFormat.class); + +    String columns = getColumns(); +    
LOG.debug("sourcing from columns: {}", columns); +    conf.set(HBaseInputFormat.COLUMN_LIST, columns); +  } + +  private String getColumns() { +    return Util.join(columns(this.familyNames, this.valueFields), " "); +  } + +  private String[] columns(String[] familyNames, Fields[] fieldsArray) { +    if (columns != null) { return columns; } + +    int size = 0; + +    for (Fields fields : fieldsArray) { size += fields.size(); } + +    columns = new String[size]; + +    int count = 0; + +    for (int i = 0; i < fieldsArray.length; i++) { +      Fields fields = fieldsArray[i]; + +      for (int j = 0; j < fields.size(); j++) { +        if (familyNames == null) { columns[count++] = hbaseColumn((String) fields.get(j)); } else { +          columns[count++] = hbaseColumn(familyNames[i]) + (String) fields.get(j); +        } +      } +    } + +    return columns; +  } + +  private String hbaseColumn(String column) { +    if (column.indexOf(":") < 0) { return column + ":"; } + +    return column; +  } + +  @Override +  public boolean equals(Object object) { +    if (this == object) { return true; } +    if (object == null || getClass() != object.getClass()) { return false; } +    if (!super.equals(object)) { return false; } + +    HBaseScheme that = (HBaseScheme) object; + +    if (!Arrays.equals(familyNames, that.familyNames)) { return false; } +    if (keyField != null ? !keyField.equals(that.keyField) : that.keyField != null) { +      return false; +    } +    if (!Arrays.equals(valueFields, that.valueFields)) { return false; } + +    return true; +  } + +  @Override +  public int hashCode() { +    int result = super.hashCode(); +    result = 31 * result + (keyField != null ? keyField.hashCode() : 0); +    result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0); +    result = 31 * result + (valueFields != null ? 
Arrays.hashCode(valueFields) : 0); +    return result; +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java new file mode 100644 index 0000000..1d48e1d --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTableSplit.java @@ -0,0 +1,190 @@ +package parallelai.spyglass.hbase; + +import java.awt.event.KeyListener; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; + + +public class HBaseTableSplit implements InputSplit, Comparable<HBaseTableSplit>, Serializable { + +  private final Log LOG = LogFactory.getLog(HBaseTableSplit.class); +   +  private byte [] m_tableName = null; +  private byte [] m_startRow = null; +  private byte [] m_endRow = null; +  private String m_regionLocation = null; +  private TreeSet<String> m_keyList = null; +  private SourceMode m_sourceMode = SourceMode.EMPTY; +  private boolean m_endRowInclusive = true; + +  /** default constructor */ +  public HBaseTableSplit() { +    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, +      HConstants.EMPTY_BYTE_ARRAY, "", SourceMode.EMPTY); +  } + +  /** +   * Constructor +   * @param tableName +   * @param startRow +   * @param endRow +   * @param location +   */ +  public HBaseTableSplit(final byte [] tableName, final byte [] startRow, final byte [] endRow, +      final String location, final SourceMode sourceMode) { +    this.m_tableName = tableName; +    this.m_startRow = startRow; +    this.m_endRow = endRow; +    this.m_regionLocation = location; +    this.m_sourceMode = sourceMode; +  } +   +  public HBaseTableSplit( final byte [] tableName, final TreeSet<String> keyList, final String location, final SourceMode sourceMode ) { +    this.m_tableName = tableName; +    this.m_keyList = keyList; +    this.m_sourceMode = sourceMode; +    this.m_regionLocation = location; +  } + +  /** @return table name */ +  public byte [] getTableName() { +    return this.m_tableName; +  } + +  /** @return starting row key */ +  public byte [] getStartRow() { +    return this.m_startRow; +  } + +  /** @return end row key */ +  public byte [] getEndRow() { +    return this.m_endRow; +  } +   +  public boolean getEndRowInclusive() { +    return m_endRowInclusive; +  } +   +  public void setEndRowInclusive(boolean isInclusive) { +    m_endRowInclusive = isInclusive; +  } +   +  /** @return list of keys to get */ +  public TreeSet<String> getKeyList() { +    return m_keyList; +  } +   +  /** @return get the source mode */ +  public SourceMode getSourceMode() { +    return m_sourceMode; +  } + +  /** @return the region's hostname */ +  public String getRegionLocation() { +    LOG.info("REGION GETTER : " + m_regionLocation); +     +    return this.m_regionLocation; +  } + +  public String[] getLocations() { +    LOG.info("REGION ARRAY : " + m_regionLocation); + +    return new String[] {this.m_regionLocation}; +  } + +  @Override +  public long getLength() { +    // Not clear how to obtain this... 
seems to be used only for sorting splits +    return 0; +  } + +  @Override +  public void readFields(DataInput in) throws IOException { +    LOG.info("READ ME : " + in.toString()); + +    this.m_tableName = Bytes.readByteArray(in); +    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); +    this.m_sourceMode = SourceMode.valueOf(Bytes.toString(Bytes.readByteArray(in))); +     +    switch(this.m_sourceMode) { +      case SCAN_RANGE: +        this.m_startRow = Bytes.readByteArray(in); +        this.m_endRow = Bytes.readByteArray(in); +        this.m_endRowInclusive = Bytes.toBoolean(Bytes.readByteArray(in)); +        break; +         +      case GET_LIST: +        this.m_keyList = new TreeSet<String>(); +         +        int m = Bytes.toInt(Bytes.readByteArray(in)); +         +        for( int i = 0; i < m; i++) { +          this.m_keyList.add(Bytes.toString(Bytes.readByteArray(in))); +        } +        break; +    } +     +    LOG.info("READ and CREATED : " + this); +  } + +  @Override +  public void write(DataOutput out) throws IOException { +    LOG.info("WRITE : " + this); + +    Bytes.writeByteArray(out, this.m_tableName); +    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); +    Bytes.writeByteArray(out, Bytes.toBytes(this.m_sourceMode.name())); +     +    switch( this.m_sourceMode ) { +      case SCAN_RANGE: +        Bytes.writeByteArray(out, this.m_startRow); +        Bytes.writeByteArray(out, this.m_endRow); +        Bytes.writeByteArray(out, Bytes.toBytes(this.m_endRowInclusive)); +        break; +         +      case GET_LIST: +        Bytes.writeByteArray(out, Bytes.toBytes(this.m_keyList.size())); +         +        for( String k: this.m_keyList ) { +          Bytes.writeByteArray(out, Bytes.toBytes(k)); +        } +        break; +    } + +    LOG.info("WROTE : " + out.toString()); +  } + +  @Override +  public String toString() { +    return "".format("Table Name (%s) Region (%s) Source Mode (%s) Start Key (%s) Stop Key (%s) Key List (%s)",  +        Bytes.toString(m_tableName), m_regionLocation, m_sourceMode, Bytes.toString(m_startRow), Bytes.toString(m_endRow), m_keyList); +  } + +  @Override +  public int compareTo(HBaseTableSplit o) { +    switch(m_sourceMode) { +      case SCAN_ALL: +      case SCAN_RANGE: +        return Bytes.compareTo(getStartRow(), o.getStartRow()); +         +      case GET_LIST: +        return m_keyList.equals( o.getKeyList() ) ? 0 : -1;  +         +      default: +        return -1; +    } +     +  } +}
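A minimal sketch, not part of this commit, of how an HBaseTableSplit survives the write/readFields round trip that MapReduce uses to ship splits to tasks; the table name, region host and row keys below are placeholders.

import java.util.TreeSet

import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.{DataInputBuffer, DataOutputBuffer}

import parallelai.spyglass.hbase.HBaseTableSplit
import parallelai.spyglass.hbase.HBaseConstants.SourceMode

object HBaseTableSplitRoundTrip extends App {
  val keys = new TreeSet[String]()
  keys.add("5003914")
  keys.add("5004897")

  // Build a GET_LIST split and serialise it the way the framework would.
  val split = new HBaseTableSplit(Bytes.toBytes("table_name"), keys, "region-host", SourceMode.GET_LIST)
  val out = new DataOutputBuffer()
  split.write(out)

  // Read it back into a fresh instance and check the key list survived the trip.
  val in = new DataInputBuffer()
  in.reset(out.getData, out.getLength)
  val copy = new HBaseTableSplit()
  copy.readFields(in)
  assert(copy.getSourceMode == SourceMode.GET_LIST && copy.getKeyList == keys)
}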
\ No newline at end of file diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTap.java b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java new file mode 100644 index 0000000..9a0ed0e --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTap.java @@ -0,0 +1,360 @@ +/* + * Copyright (c) 2009 Concurrent, Inc. + * + * This work has been released into the public domain + * by the copyright holder. This applies worldwide. + * + * In case this is not legally possible: + * The copyright holder grants any entity the right + * to use this work for any purpose, without any + * conditions, unless such conditions are required by law. + */ + +package parallelai.spyglass.hbase; + +import cascading.flow.FlowProcess; +import cascading.tap.SinkMode; +import cascading.tap.Tap; +import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator; +import cascading.tuple.TupleEntryCollector; +import cascading.tuple.TupleEntryIterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import parallelai.spyglass.hbase.HBaseConstants.SourceMode; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Map.Entry; +import java.util.UUID; + +/** + * The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with + * the {@HBaseFullScheme} to allow for the reading and writing + * of data to and from a HBase cluster. + */ +public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> { +  /** Field LOG */ +  private static final Logger LOG = LoggerFactory.getLogger(HBaseTap.class); + +  private final String id = UUID.randomUUID().toString(); + +  /** Field SCHEME */ +  public static final String SCHEME = "hbase"; + +  /** Field hBaseAdmin */ +  private transient HBaseAdmin hBaseAdmin; +  +  /** Field hostName */ +  private String quorumNames; +  /** Field tableName */ +  private String tableName; + +  /** +   * Constructor HBaseTap creates a new HBaseTap instance. +   * +   * @param tableName +   *          of type String +   * @param HBaseFullScheme +   *          of type HBaseFullScheme +   */ +  public HBaseTap(String tableName, HBaseScheme HBaseFullScheme) { +    super(HBaseFullScheme, SinkMode.UPDATE); +    this.tableName = tableName; +  } + +  /** +   * Constructor HBaseTap creates a new HBaseTap instance. +   * +   * @param tableName +   *          of type String +   * @param HBaseFullScheme +   *          of type HBaseFullScheme +   * @param sinkMode +   *          of type SinkMode +   */ +  public HBaseTap(String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) { +    super(HBaseFullScheme, sinkMode); +    this.tableName = tableName; +  } + +  /** +   * Constructor HBaseTap creates a new HBaseTap instance. 
+   * +   * @param tableName +   *          of type String +   * @param HBaseFullScheme +   *          of type HBaseFullScheme +   */ +  public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme) { +    super(HBaseFullScheme, SinkMode.UPDATE); +    this.quorumNames = quorumNames; +    this.tableName = tableName; +  } + +  /** +   * Constructor HBaseTap creates a new HBaseTap instance. +   * +   * @param tableName +   *          of type String +   * @param HBaseFullScheme +   *          of type HBaseFullScheme +   * @param sinkMode +   *          of type SinkMode +   */ +  public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode) { +    super(HBaseFullScheme, sinkMode); +    this.quorumNames = quorumNames; +    this.tableName = tableName; +  } + +  /** +   * Method getTableName returns the tableName of this HBaseTap object. +   * +   * @return the tableName (type String) of this HBaseTap object. +   */ +  public String getTableName() { +    return tableName; +  } + +  public Path getPath() { +    return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_")); +  } + +  protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException { +    if (hBaseAdmin == null) { +      Configuration hbaseConf = HBaseConfiguration.create(conf); +      hBaseAdmin = new HBaseAdmin(hbaseConf); +    } + +    return hBaseAdmin; +  } + +  @Override +  public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) { +    if(quorumNames != null) { +      conf.set("hbase.zookeeper.quorum", quorumNames); +    } + +    LOG.debug("sinking to table: {}", tableName); + +    if (isReplace() && conf.get("mapred.task.partition") == null) { +      try { +        deleteResource(conf); + +      } catch (IOException e) { +        throw new RuntimeException("could not delete resource: " + e); +      } +    } + +    else if (isUpdate()) { +      try { +          createResource(conf); +      } catch (IOException e) { +          throw new RuntimeException(tableName + " does not exist !", e); +      } + +    } + +    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); +    super.sinkConfInit(process, conf); +  } + +  @Override +  public String getIdentifier() { +    return id; +  } + +  @Override +  public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) throws IOException { +    return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader); +  } + +  @Override +  public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) throws IOException { +    HBaseTapCollector hBaseCollector = new HBaseTapCollector( jobConfFlowProcess, this ); +    hBaseCollector.prepare(); +    return hBaseCollector; +  } + +  @Override +  public boolean createResource(JobConf jobConf) throws IOException { +    HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf); + +    if (hBaseAdmin.tableExists(tableName)) { +      return true; +    } + +    LOG.info("creating hbase table: {}", tableName); + +    HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); + +    String[] familyNames = ((HBaseScheme) getScheme()).getFamilyNames(); + +    for (String familyName : familyNames) { +      tableDescriptor.addFamily(new HColumnDescriptor(familyName)); +    } + +    hBaseAdmin.createTable(tableDescriptor); + +    return true; +  } + +  @Override +  public boolean deleteResource(JobConf jobConf) throws IOException { +   
 // TODO: for now we don't do anything just to be safe +    return true; +  } + +  @Override +  public boolean resourceExists(JobConf jobConf) throws IOException { +    return getHBaseAdmin(jobConf).tableExists(tableName); +  } + +  @Override +  public long getModifiedTime(JobConf jobConf) throws IOException { +    return System.currentTimeMillis(); // currently unable to find last mod time +                                       // on a table +  } + +  @Override +  public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) { +    // a hack for MultiInputFormat to see that there is a child format +    FileInputFormat.setInputPaths( conf, getPath() ); + +    if(quorumNames != null) { +      conf.set("hbase.zookeeper.quorum", quorumNames); +    } + +    LOG.debug("sourcing from table: {}", tableName); + +    // TODO: Make this a bit smarter to store table name per flow. +//    process.getID(); +//     +//    super.getFullIdentifier(conf); +     +    HBaseInputFormat.setTableName(conf, tableName); +     +    for( SourceConfig sc : sourceConfigList) { +      sc.configure(conf); +    } +     +    super.sourceConfInit(process, conf); +  } + +  @Override +  public boolean equals(Object object) { +    if (this == object) { +      return true; +    } +    if (object == null || getClass() != object.getClass()) { +      return false; +    } +    if (!super.equals(object)) { +      return false; +    } + +    HBaseTap hBaseTap = (HBaseTap) object; + +    if (tableName != null ? !tableName.equals(hBaseTap.tableName) : hBaseTap.tableName != null) { +      return false; +    } + +    return true; +  } + +  @Override +  public int hashCode() { +    int result = super.hashCode(); +    result = 31 * result + (tableName != null ? tableName.hashCode() : 0); +    return result; +  } +   +  private static class SourceConfig implements Serializable { +    public String tableName = null; +    public SourceMode sourceMode = SourceMode.SCAN_ALL; +    public String startKey = null; +    public String stopKey = null; +    public String [] keyList = null; +     +    public void configure(Configuration jobConf) { +      switch( sourceMode ) { +        case SCAN_RANGE: +          jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); +           +          if( startKey != null && startKey.length() > 0 ) +            jobConf.set( String.format(HBaseConstants.START_KEY, tableName), startKey); +           +          if( stopKey != null && stopKey.length() > 0 ) +            jobConf.set( String.format(HBaseConstants.STOP_KEY, tableName), stopKey); +           +          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); +          LOG.info("".format("Setting START KEY (%s) to (%s)", String.format(HBaseConstants.START_KEY, tableName), startKey)); +          LOG.info("".format("Setting STOP KEY (%s) to (%s)", String.format(HBaseConstants.STOP_KEY, tableName), stopKey)); +          break; +           +        case GET_LIST: +          jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); +          jobConf.setStrings( String.format(HBaseConstants.KEY_LIST, tableName), keyList); + +          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); +          LOG.info("".format("Setting KEY LIST (%s) to (%s)", String.format(HBaseConstants.KEY_LIST, tableName), keyList)); +          break; 
+           +        default: +          jobConf.set( String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString()); + +          LOG.info("".format("Setting SOURCE MODE (%s) to (%s)", String.format(HBaseConstants.SOURCE_MODE, tableName), sourceMode.toString())); +          break; +      } +    } +  } +   +  private ArrayList<SourceConfig> sourceConfigList = new ArrayList<SourceConfig>(); + +  public void setHBaseRangeParms(String startKey, String stopKey ) { +    SourceConfig sc = new SourceConfig(); +     +    sc.sourceMode = SourceMode.SCAN_RANGE; +    sc.tableName = tableName; +    sc.startKey = startKey; +    sc.stopKey = stopKey; +     +    sourceConfigList.add(sc); +  } + +  public void setHBaseListParms(String [] keyList ) { +    SourceConfig sc = new SourceConfig(); +     +    sc.sourceMode = SourceMode.GET_LIST; +    sc.tableName = tableName; +    sc.keyList = keyList; +     +    sourceConfigList.add(sc); +  } +   +  public void setHBaseScanAllParms() { +    SourceConfig sc = new SourceConfig(); +     +    sc.sourceMode = SourceMode.SCAN_ALL; +    sc.tableName = tableName; + +    sourceConfigList.add(sc); +  } +} diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java b/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java new file mode 100644 index 0000000..098f957 --- /dev/null +++ b/src/main/java/parallelai/spyglass/hbase/HBaseTapCollector.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parallelai.spyglass.hbase; + +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.HadoopFlowProcess; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tuple.TupleEntrySchemeCollector; +import org.apache.hadoop.mapred.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Class HBaseTapCollector is a kind of + * {@link cascading.tuple.TupleEntrySchemeCollector} that writes tuples to the + * resource managed by a particular {@link HBaseTap} instance. + */ +public class HBaseTapCollector extends TupleEntrySchemeCollector implements OutputCollector { +  /** Field LOG */ +  private static final Logger LOG = LoggerFactory.getLogger(HBaseTapCollector.class); +  /** Field conf */ +  private final JobConf conf; +  /** Field writer */ +  private RecordWriter writer; +  /** Field flowProcess */ +  private final FlowProcess<JobConf> hadoopFlowProcess; +  /** Field tap */ +  private final Tap<JobConf, RecordReader, OutputCollector> tap; +  /** Field reporter */ +  private final Reporter reporter = Reporter.NULL; + +  /** +   * Constructor TapCollector creates a new TapCollector instance. 
+   *  +   * @param flowProcess +   * @param tap +   *          of type Tap +   * @throws IOException +   *           when fails to initialize +   */ +  public HBaseTapCollector(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap) throws IOException { +    super(flowProcess, tap.getScheme()); +    this.hadoopFlowProcess = flowProcess; +    this.tap = tap; +    this.conf = new JobConf(flowProcess.getConfigCopy()); +    this.setOutput(this); +  } + +  @Override +  public void prepare() { +    try { +      initialize(); +    } catch (IOException e) { +      throw new RuntimeException(e); +    } + +    super.prepare(); +  } + +  private void initialize() throws IOException { +    tap.sinkConfInit(hadoopFlowProcess, conf); +    OutputFormat outputFormat = conf.getOutputFormat(); +    LOG.info("Output format class is: " + outputFormat.getClass().toString()); +    writer = outputFormat.getRecordWriter(null, conf, tap.getIdentifier(), Reporter.NULL); +    sinkCall.setOutput(this); +  } + +  @Override +  public void close() { +    try { +      LOG.info("closing tap collector for: {}", tap); +      writer.close(reporter); +    } catch (IOException exception) { +      LOG.warn("exception closing: {}", exception); +      throw new TapException("exception closing HBaseTapCollector", exception); +    } finally { +      super.close(); +    } +  } + +  /** +   * Method collect writes the given values to the {@link Tap} this instance +   * encapsulates. +   *  +   * @param writableComparable +   *          of type WritableComparable +   * @param writable +   *          of type Writable +   * @throws IOException +   *           when +   */ +  public void collect(Object writableComparable, Object writable) throws IOException { +    if (hadoopFlowProcess instanceof HadoopFlowProcess) +      ((HadoopFlowProcess) hadoopFlowProcess).getReporter().progress(); + +    writer.write(writableComparable, writable); +  } +} diff --git a/src/main/scala/parallelai/spyglass/base/JobBase.scala b/src/main/scala/parallelai/spyglass/base/JobBase.scala new file mode 100644 index 0000000..7040fa7 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/base/JobBase.scala @@ -0,0 +1,75 @@ +package parallelai.spyglass.base + +import com.twitter.scalding.Job +import com.twitter.scalding.Args +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.filecache.DistributedCache +import com.twitter.scalding.Mode +import com.twitter.scalding.HadoopMode +import com.typesafe.config.ConfigFactory +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import com.twitter.scalding.NullSource + +class JobBase(args: Args) extends Job(args) { +  def getOrElseString(key: String, default: String): String = { +    args.m.getOrElse[List[String]](key, List(default)).head +  } +    +  def getOrElseList(key: String, default: List[String]): List[String] = { +    args.m.getOrElse[List[String]](key, default) +  }    + +  def getString(key: String): String = { +    args.m.get(key) match { +      case Some(v) => v.head +      case None => sys.error(String.format("Argument [%s] - NOT FOUND", key)) +    } +  } +   +  def getList(key: String): List[String] = { +    args.m.get(key) match { +      case Some(v) => v +      case None => sys.error(String.format("Argument [%s] - NOT FOUND", key)) +    } +  } +   +  def getJobConf(): Configuration = { +	AppConfig.jobConfig +  }   + +   +  val appConfig = ConfigFactory.parseFile(new 
java.io.File(getString("app.conf.path"))) +   +  val log = LoggerFactory.getLogger(getOrElseString("app.log.name", this.getClass().getName())) +   +  def modeString(): String = { +      Mode.mode match { +        case x:HadoopMode  => "--hdfs" +        case _ => "--local" +    } +  } +   +  // Execute at instantiation +  Mode.mode match { +    case x:HadoopMode => { +      log.info("In Hadoop Mode") +      JobLibLoader.loadJars(getString("job.lib.path"), AppConfig.jobConfig); +    } +    case _ => { +      log.info("In Local Mode") +    } +  } +   +  def registerNullSourceSinkTaps(): Unit = { +    val expectedSampleEndToEndOutput = List(("", ""),("", ""),("", "")) +	val sourceTap = NullSource +			.writeFrom(expectedSampleEndToEndOutput) +  } +} + +object AppConfig { +  implicit var jobConfig : Configuration = new Configuration() +}
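A minimal sketch, not in this commit, of a job built on JobBase showing the argument helpers; JobBase itself expects an --app.conf.path argument pointing at a readable config file, and the job name, input and output paths here are placeholders.

package parallelai.spyglass.base.example

import com.twitter.scalding.{Args, Tsv}

import parallelai.spyglass.base.JobBase

class CopyJob(args: Args) extends JobBase(args) {
  // getOrElseString falls back to a default when a flag is absent;
  // getString fails fast with a descriptive error when a flag is required.
  val input = getOrElseString("input", "input.tsv")
  val output = getString("output")

  Tsv(input, ('key, 'value))
    .read
    .write(Tsv(output))
}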
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/base/JobRunner.scala b/src/main/scala/parallelai/spyglass/base/JobRunner.scala new file mode 100644 index 0000000..b3a9af0 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/base/JobRunner.scala @@ -0,0 +1,23 @@ +package parallelai.spyglass.base + +import org.apache.hadoop.conf.Configuration +import com.twitter.scalding.Tool +import org.apache.hadoop +  +object JobRunner { +  def main(args : Array[String]) { +    val conf: Configuration = new Configuration +     +    // TODO replace println with logging +    if (args.contains("--heapInc")) { +	    println("Setting JVM Memory/Heap Size for every child mapper and reducer."); +	    val jvmOpts = "-Xmx4096m -XX:+PrintGCDetails -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=50" +	    println("**** JVM Options : " + jvmOpts ) +	    conf.set("mapred.child.java.opts", jvmOpts); +    } +      +    AppConfig.jobConfig = conf +      +    hadoop.util.ToolRunner.run(conf, new Tool, args); +  } +}
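Illustrative only: launching a job through JobRunner rather than through scalding's Tool directly; the job class is the hypothetical CopyJob sketched above, and the paths and flags are placeholders.

import parallelai.spyglass.base.JobRunner

object LaunchSketch extends App {
  JobRunner.main(Array(
    "parallelai.spyglass.base.example.CopyJob",    // job class, resolved by the scalding Tool
    "--hdfs",                                      // run mode
    "--app.conf.path", "/etc/bigdata/app.conf",    // read by JobBase at construction
    "--job.lib.path", "hdfs:///user/bigdata/lib",  // jars shipped by JobLibLoader in Hadoop mode
    "--input", "input.tsv",
    "--output", "output.tsv"))
}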
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala new file mode 100644 index 0000000..e46ef50 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/HBaseSource.scala @@ -0,0 +1,82 @@ +package parallelai.spyglass.hbase + +import java.io.IOException +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import com.twitter.scalding.AccessMode +import com.twitter.scalding.Hdfs +import com.twitter.scalding.Mode +import com.twitter.scalding.Read +import com.twitter.scalding.Source +import com.twitter.scalding.Write +import cascading.scheme.Scheme +import cascading.tap.SinkMode +import cascading.tap.Tap +import cascading.tuple.Fields +import org.apache.hadoop.mapred.RecordReader +import scala.compat.Platform +import org.apache.hadoop.mapred.OutputCollector +import org.apache.hadoop.mapred.JobConf +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +object Conversions { +  implicit def bytesToString(bytes: Array[Byte]): String = Bytes.toString(bytes) +  implicit def bytesToLong(bytes: Array[Byte]): Long = augmentString(bytesToString(bytes)).toLong +  implicit def ibwToString(ibw: ImmutableBytesWritable): String = bytesToString(ibw.get()) +  implicit def stringToibw(s: String):ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(s)) +} + +class HBaseSource( +    tableName: String = null, +    quorumNames: String = "localhost", +    keyFields: Fields = null, +    familyNames: Array[String] = null, +    valueFields: Array[Fields] = null, +    timestamp: Long = Platform.currentTime, +    sourceMode: SourceMode = SourceMode.SCAN_ALL, +    startKey: String = null, +    stopKey: String = null, +    keyList: List[String] = null +  ) extends Source { +     +  override val hdfsScheme = new HBaseScheme(keyFields, timestamp, familyNames, valueFields) +    .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] + +  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = { +    val hBaseScheme = hdfsScheme match { +      case hbase: HBaseScheme => hbase +      case _ => throw new ClassCastException("Failed casting from Scheme to HBaseScheme") +    }  +    mode match {  +      case hdfsMode @ Hdfs(_, _) => readOrWrite match { +        case Read => {  +          val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.KEEP) +           +          sourceMode match { +            case SourceMode.SCAN_RANGE => { +              hbt.setHBaseRangeParms(startKey, stopKey) +            } +            case SourceMode.SCAN_ALL => { +              hbt.setHBaseScanAllParms() +            } +            case SourceMode.GET_LIST => { +              if( keyList == null )  +                throw new IOException("Key list cannot be null when Source Mode is " + sourceMode) +               +              hbt.setHBaseListParms(keyList.toArray[String]) +            } +            case _ => throw new IOException("Unknown Source Mode (%)".format(sourceMode)) +          } +           +          hbt.asInstanceOf[Tap[_,_,_]] +        } +        case Write => { +          val hbt = new HBaseTap(quorumNames, tableName, hBaseScheme, SinkMode.UPDATE) +           +          hbt.asInstanceOf[Tap[_,_,_]] +        } +      } +      case _ => super.createTap(readOrWrite)(mode) +    } +  } +} diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala 
b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala new file mode 100644 index 0000000..4c86b07 --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExample.scala @@ -0,0 +1,99 @@ +package parallelai.spyglass.hbase.example + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.hbase.client.HConnectionManager +import org.apache.hadoop.hbase.client.HTable +import org.apache.hadoop.hbase.util.Bytes +import org.apache.log4j.Level +import org.apache.log4j.Logger + +import com.twitter.scalding._ +import com.twitter.scalding.Args + +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseSource +import parallelai.spyglass.hbase.HBaseConstants.SourceMode + +class HBaseExample(args: Args) extends JobBase(args) { + +  val isDebug: Boolean = args("debug").toBoolean + +  if (isDebug) Logger.getRootLogger().setLevel(Level.DEBUG) + +  val output = args("output") + +  println(output) + +  val jobConf = getJobConf + +  val quorumNames = "cldmgr.prod.bigdata.bskyb.com:2181" + +  case class HBaseTableStore( +      conf: Configuration, +      quorum: String, +      tableName: String) { + +    val tableBytes = Bytes.toBytes(tableName) +    val connection = HConnectionManager.getConnection(conf) +    val maxThreads = conf.getInt("hbase.htable.threads.max", 1) + +    conf.set("hbase.zookeeper.quorum", quorumNames); + +    val htable = new HTable(HBaseConfiguration.create(conf), tableName) + +  } + +  val hTableStore = HBaseTableStore(getJobConf, quorumNames, "skybet.test.tbet") + +  val hbs2 = new HBaseSource( +    "table_name", +    "quorum_name:2181", +    'key, +    Array("column_family"), +    Array('column_name), +    sourceMode = SourceMode.GET_LIST, keyList = List("5003914", "5000687", "5004897")) +    .read +    .write(Tsv(output.format("get_list"))) + +  val hbs3 = new HBaseSource( +    "table_name", +    "quorum_name:2181", +    'key, +    Array("column_family"), +    Array('column_name), +    sourceMode = SourceMode.SCAN_ALL) //, stopKey = "99460693") +    .read +    .write(Tsv(output.format("scan_all"))) + +  val hbs4 = new HBaseSource( +    "table_name", +    "quorum_name:2181", +    'key, +    Array("column_family"), +    Array('column_name), +    sourceMode = SourceMode.SCAN_RANGE, stopKey = "5003914") +    .read +    .write(Tsv(output.format("scan_range_to_end"))) + +  val hbs5 = new HBaseSource( +    "table_name", +    "quorum_name:2181", +    'key, +    Array("column_family"), +    Array('column_name), +    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914") +    .read +    .write(Tsv(output.format("scan_range_from_start"))) + +  val hbs6 = new HBaseSource( +    "table_name", +    "quorum_name:2181", +    'key, +    Array("column_family"), +    Array('column_name), +    sourceMode = SourceMode.SCAN_RANGE, startKey = "5003914", stopKey = "5004897") +    .read +    .write(Tsv(output.format("scan_range_between"))) + +} 
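The pipes above only read from HBase; a hedged sketch, not in this commit, of the write path using the same placeholder table, quorum and field names, plus an assumed --input flag. HBaseScheme casts every sunk value to ImmutableBytesWritable, so the tuple is converted before the write, and HBaseTap will create the table with the scheme's column families if it does not already exist.

package parallelai.spyglass.hbase.example

import com.twitter.scalding.{Args, Tsv}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

import parallelai.spyglass.base.JobBase
import parallelai.spyglass.hbase.HBaseSource

class HBaseWriteExample(args: Args) extends JobBase(args) {

  Tsv(args("input"), ('key, 'column_name))
    .read
    // The sink expects ImmutableBytesWritable for both the row key and the cell value.
    .mapTo(('key, 'column_name) -> ('key, 'column_name)) { kv: (String, String) =>
      (new ImmutableBytesWritable(Bytes.toBytes(kv._1)),
       new ImmutableBytesWritable(Bytes.toBytes(kv._2)))
    }
    .write(new HBaseSource(
      "table_name",
      "quorum_name:2181",
      'key,
      Array("column_family"),
      Array('column_name)))
}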
\ No newline at end of file diff --git a/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala new file mode 100644 index 0000000..d6b762e --- /dev/null +++ b/src/main/scala/parallelai/spyglass/hbase/example/HBaseExampleRunner.scala @@ -0,0 +1,32 @@ +package parallelai.spyglass.hbase.example + +import com.twitter.scalding.Tool + +object HBaseExampleRunner extends App { +  val appPath = System.getenv("BIGDATA_APPCONF_PATH") +  assert(appPath != null, {"Environment Variable BIGDATA_APPCONF_PATH is undefined or null"}) +  println("Application Path is [%s]".format(appPath)) +   +  val modeString = if (args.length == 0) { "--hdfs" } else { args(0) match { +    case "hdfs" => "--hdfs" +    case _ => "--local" +  }} +   +  println(modeString) +   +  val jobLibPath = modeString match { +    case "--hdfs" => { +      val jobLibPath = System.getenv("BIGDATA_JOB_LIB_PATH") +      assert(jobLibPath != null, {"Environment Variable BIGDATA_JOB_LIB_PATH is undefined or null"}) +      println("Job Library Path is [%s]".format(jobLibPath)) +      jobLibPath +    } +    case _ => "" +  } + +  val output = "HBaseTest.%s.tsv" + +  Tool.main(Array(classOf[HBaseExample].getName, modeString, "--app.conf.path", appPath, +    "--output", output, "--debug", "true", "--job.lib.path", jobLibPath)) + +}
\ No newline at end of file
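For reference, the --output value passed to HBaseExample is a format template; each pipe substitutes its own suffix:

val output = "HBaseTest.%s.tsv"
output.format("get_list")   // "HBaseTest.get_list.tsv"
output.format("scan_all")   // "HBaseTest.scan_all.tsv"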
