aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java432
1 files changed, 306 insertions, 126 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
index 97077c4..d22ed71 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseRecordReader.java
@@ -3,12 +3,17 @@ package parallelai.spyglass.hbase;
import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
import java.io.IOException;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.TreeSet;
+import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
@@ -18,36 +23,38 @@ import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.StringUtils;
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
-
-public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, Result> {
+public class HBaseRecordReader implements
+ RecordReader<ImmutableBytesWritable, Result> {
static final Log LOG = LogFactory.getLog(HBaseRecordReader.class);
- private byte [] startRow;
- private byte [] endRow;
- private byte [] lastSuccessfulRow;
+ private byte[] startRow;
+ private byte[] endRow;
+ private byte[] lastSuccessfulRow;
private TreeSet<String> keyList;
private SourceMode sourceMode;
private Filter trrRowFilter;
private ResultScanner scanner;
private HTable htable;
- private byte [][] trrInputColumns;
+ private byte[][] trrInputColumns;
private long timestamp;
private int rowcount;
private boolean logScannerActivity = false;
private int logPerRowCount = 100;
private boolean endRowInclusive = true;
+ private int versions = 1;
+ private boolean useSalt = false;
/**
* Restart from survivable exceptions by creating a new scanner.
- *
+ *
* @param firstRow
* @throws IOException
*/
@@ -55,26 +62,26 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
Scan currentScan;
if ((endRow != null) && (endRow.length > 0)) {
if (trrRowFilter != null) {
- Scan scan = new Scan(firstRow, (endRowInclusive ?
- Bytes.add(endRow, new byte[] {0}) : endRow ) );
-
+ Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
+ new byte[] { 0 }) : endRow));
+
TableInputFormat.addColumns(scan, trrInputColumns);
scan.setFilter(trrRowFilter);
scan.setCacheBlocks(false);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
} else {
- LOG.debug("TIFB.restart, firstRow: " +
- Bytes.toString(firstRow) + ", endRow: " +
- Bytes.toString(endRow));
- Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow, new byte[] {0}) : endRow ));
+ LOG.debug("TIFB.restart, firstRow: " + Bytes.toString(firstRow)
+ + ", endRow: " + Bytes.toString(endRow));
+ Scan scan = new Scan(firstRow, (endRowInclusive ? Bytes.add(endRow,
+ new byte[] { 0 }) : endRow));
TableInputFormat.addColumns(scan, trrInputColumns);
this.scanner = this.htable.getScanner(scan);
currentScan = scan;
}
} else {
- LOG.debug("TIFB.restart, firstRow: " +
- Bytes.toStringBinary(firstRow) + ", no endRow");
+ LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow)
+ + ", no endRow");
Scan scan = new Scan(firstRow);
TableInputFormat.addColumns(scan, trrInputColumns);
@@ -83,7 +90,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
currentScan = scan;
}
if (logScannerActivity) {
- LOG.info("Current scan=" + currentScan.toString());
+ LOG.debug("Current scan=" + currentScan.toString());
timestamp = System.currentTimeMillis();
rowcount = 0;
}
@@ -97,6 +104,14 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
this.keyList = keyList;
}
+ public void setVersions(int versions) {
+ this.versions = versions;
+ }
+
+ public void setUseSalt(boolean useSalt) {
+ this.useSalt = useSalt;
+ }
+
public SourceMode getSourceMode() {
return sourceMode;
}
@@ -108,76 +123,84 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
public byte[] getEndRow() {
return endRow;
}
-
+
public void setEndRowInclusive(boolean isInclusive) {
endRowInclusive = isInclusive;
}
-
+
public boolean getEndRowInclusive() {
return endRowInclusive;
}
-
- private byte [] nextKey = null;
+
+ private byte[] nextKey = null;
+ private Vector<List<KeyValue>> resultVector = null;
+ Map<Long, List<KeyValue>> keyValueMap = null;
/**
* Build the scanner. Not done in constructor to allow for extension.
- *
+ *
* @throws IOException
*/
public void init() throws IOException {
- switch( sourceMode ) {
- case SCAN_ALL:
- case SCAN_RANGE:
- restartRangeScan(startRow);
- break;
-
- case GET_LIST:
- nextKey = Bytes.toBytes(keyList.pollFirst());
- break;
-
- default:
- throw new IOException(" Unknown source mode : " + sourceMode );
+ switch (sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE:
+ restartRangeScan(startRow);
+ break;
+
+ case GET_LIST:
+ nextKey = Bytes.toBytes(keyList.pollFirst());
+ break;
+
+ default:
+ throw new IOException(" Unknown source mode : " + sourceMode);
}
}
byte[] getStartRow() {
return this.startRow;
}
+
/**
- * @param htable the {@link HTable} to scan.
+ * @param htable
+ * the {@link HTable} to scan.
*/
public void setHTable(HTable htable) {
Configuration conf = htable.getConfiguration();
- logScannerActivity = conf.getBoolean(
- ScannerCallable.LOG_SCANNER_ACTIVITY, false);
+ logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY,
+ false);
logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
this.htable = htable;
}
/**
- * @param inputColumns the columns to be placed in {@link Result}.
+ * @param inputColumns
+ * the columns to be placed in {@link Result}.
*/
- public void setInputColumns(final byte [][] inputColumns) {
+ public void setInputColumns(final byte[][] inputColumns) {
this.trrInputColumns = inputColumns;
}
/**
- * @param startRow the first row in the split
+ * @param startRow
+ * the first row in the split
*/
- public void setStartRow(final byte [] startRow) {
+ public void setStartRow(final byte[] startRow) {
this.startRow = startRow;
}
/**
- *
- * @param endRow the last row in the split
+ *
+ * @param endRow
+ * the last row in the split
*/
- public void setEndRow(final byte [] endRow) {
+ public void setEndRow(final byte[] endRow) {
this.endRow = endRow;
}
/**
- * @param rowFilter the {@link Filter} to be used.
+ * @param rowFilter
+ * the {@link Filter} to be used.
*/
public void setRowFilter(Filter rowFilter) {
this.trrRowFilter = rowFilter;
@@ -185,12 +208,13 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
@Override
public void close() {
- if (this.scanner != null) this.scanner.close();
+ if (this.scanner != null)
+ this.scanner.close();
}
/**
* @return ImmutableBytesWritable
- *
+ *
* @see org.apache.hadoop.mapred.RecordReader#createKey()
*/
@Override
@@ -200,7 +224,7 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
/**
* @return RowResult
- *
+ *
* @see org.apache.hadoop.mapred.RecordReader#createValue()
*/
@Override
@@ -222,104 +246,260 @@ public class HBaseRecordReader implements RecordReader<ImmutableBytesWritable, R
}
/**
- * @param key HStoreKey as input key.
- * @param value MapWritable as input value
+ * @param key
+ * HStoreKey as input key.
+ * @param value
+ * MapWritable as input value
* @return true if there was more data
* @throws IOException
*/
@Override
public boolean next(ImmutableBytesWritable key, Result value)
- throws IOException {
-
- switch(sourceMode) {
- case SCAN_ALL:
- case SCAN_RANGE:
- {
-
- Result result;
+ throws IOException {
+
+ switch (sourceMode) {
+ case SCAN_ALL:
+ case SCAN_RANGE: {
+
+ Result result;
+ try {
try {
- try {
- result = this.scanner.next();
- if (logScannerActivity) {
- rowcount ++;
- if (rowcount >= logPerRowCount) {
- long now = System.currentTimeMillis();
- LOG.info("Mapper took " + (now-timestamp)
- + "ms to process " + rowcount + " rows");
- timestamp = now;
- rowcount = 0;
- }
- }
- } catch (IOException e) {
- // try to handle all IOExceptions by restarting
- // the scanner, if the second call fails, it will be rethrown
- LOG.debug("recovered from " + StringUtils.stringifyException(e));
- if (lastSuccessfulRow == null) {
- LOG.warn("We are restarting the first next() invocation," +
- " if your mapper has restarted a few other times like this" +
- " then you should consider killing this job and investigate" +
- " why it's taking so long.");
- }
- if (lastSuccessfulRow == null) {
- restartRangeScan(startRow);
- } else {
- restartRangeScan(lastSuccessfulRow);
- this.scanner.next(); // skip presumed already mapped row
+ result = this.scanner.next();
+ if (logScannerActivity) {
+ rowcount++;
+ if (rowcount >= logPerRowCount) {
+ long now = System.currentTimeMillis();
+ LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
+ + rowcount + " rows");
+ timestamp = now;
+ rowcount = 0;
}
- result = this.scanner.next();
}
+ } catch (IOException e) {
+ // try to handle all IOExceptions by restarting
+ // the scanner, if the second call fails, it will be rethrown
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ if (lastSuccessfulRow == null) {
+ LOG.warn("We are restarting the first next() invocation,"
+ + " if your mapper has restarted a few other times like this"
+ + " then you should consider killing this job and investigate"
+ + " why it's taking so long.");
+ }
+ if (lastSuccessfulRow == null) {
+ restartRangeScan(startRow);
+ } else {
+ restartRangeScan(lastSuccessfulRow);
+ this.scanner.next(); // skip presumed already mapped row
+ }
+ result = this.scanner.next();
+ }
- if (result != null && result.size() > 0) {
+ if (result != null && result.size() > 0) {
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
key.set(result.getRow());
- lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
- return true;
- }
- return false;
- } catch (IOException ioe) {
- if (logScannerActivity) {
- long now = System.currentTimeMillis();
- LOG.info("Mapper took " + (now-timestamp)
- + "ms to process " + rowcount + " rows");
- LOG.info(ioe);
- String lastRow = lastSuccessfulRow == null ?
- "null" : Bytes.toStringBinary(lastSuccessfulRow);
- LOG.info("lastSuccessfulRow=" + lastRow);
}
- throw ioe;
+
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+ return true;
+ }
+ return false;
+ } catch (IOException ioe) {
+ if (logScannerActivity) {
+ long now = System.currentTimeMillis();
+ LOG.debug("Mapper took " + (now - timestamp) + "ms to process "
+ + rowcount + " rows");
+ LOG.debug(ioe);
+ String lastRow = lastSuccessfulRow == null ? "null" : Bytes
+ .toStringBinary(lastSuccessfulRow);
+ LOG.debug("lastSuccessfulRow=" + lastRow);
}
+ throw ioe;
}
+ }
+
+ case GET_LIST: {
+ LOG.debug(String.format("INTO next with GET LIST and Key (%s)", Bytes.toString(nextKey)));
- case GET_LIST:
- {
- Result result;
- if( nextKey != null ) {
- result = this.htable.get(new Get(nextKey));
+ if (versions == 1) {
+ if (nextKey != null) {
+ LOG.debug(String.format("Processing Key (%s)", Bytes.toString(nextKey)));
- if (result != null && result.size() > 0) {
- System.out.println("KeyList => " + keyList);
- System.out.println("Result => " + result);
- if (keyList != null || !keyList.isEmpty()) {
-
- String newKey = keyList.pollFirst();
- System.out.println("New Key => " + newKey);
- nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes.toBytes(newKey);
- } else {
- nextKey = null;
- }
- key.set(result.getRow());
+ Get theGet = new Get(nextKey);
+ theGet.setMaxVersions(versions);
+
+ Result result = this.htable.get(theGet);
+
+ if (result != null && (! result.isEmpty()) ) {
+ LOG.debug(String.format("Key (%s), Version (%s), Got Result (%s)", Bytes.toString(nextKey), versions, result ) );
+
+ if (keyList != null || !keyList.isEmpty()) {
+ String newKey = keyList.pollFirst();
+ LOG.debug("New Key => " + newKey);
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+ } else {
+ nextKey = null;
+ }
+
+ LOG.debug(String.format("=> Picked a new Key (%s)", Bytes.toString(nextKey)));
+
+ // Write the result
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
lastSuccessfulRow = key.get();
Writables.copyWritable(result, value);
+
return true;
+ } else {
+ LOG.debug(" Key ("+ Bytes.toString(nextKey)+ ") return an EMPTY result. Get ("+theGet.getId()+")" ); //alg0
+
+ String newKey;
+ while((newKey = keyList.pollFirst()) != null) {
+ LOG.debug("WHILE NEXT Key => " + newKey);
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ if( nextKey == null ) {
+ LOG.error("BOMB! BOMB! BOMB!");
+ continue;
+ }
+
+ if( ! this.htable.exists( new Get(nextKey) ) ) {
+ LOG.debug(String.format("Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) ));
+ continue;
+ } else { break; }
+ }
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ LOG.debug("Final New Key => " + Bytes.toString(nextKey));
+
+ return next(key, value);
}
- return false;
} else {
+ // Nothig left. return false
return false;
}
+ } else {
+ if (resultVector != null && resultVector.size() != 0) {
+ LOG.debug(String.format("+ Version (%s), Result VECTOR <%s>", versions, resultVector ) );
+
+ List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1);
+ Result result = new Result(resultKeyValue);
+
+ LOG.debug(String.format("+ Version (%s), Got Result <%s>", versions, result ) );
+
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+
+ return true;
+ } else {
+ if (nextKey != null) {
+ LOG.debug(String.format("+ Processing Key (%s)", Bytes.toString(nextKey)));
+
+ Get theGet = new Get(nextKey);
+ theGet.setMaxVersions(versions);
+
+ Result resultAll = this.htable.get(theGet);
+
+ if( resultAll != null && (! resultAll.isEmpty())) {
+ List<KeyValue> keyValeList = resultAll.list();
+
+ keyValueMap = new HashMap<Long, List<KeyValue>>();
+
+ LOG.debug(String.format("+ Key (%s) Versions (%s) Val;ute map <%s>", Bytes.toString(nextKey), versions, keyValueMap));
+
+ for (KeyValue keyValue : keyValeList) {
+ long version = keyValue.getTimestamp();
+
+ if (keyValueMap.containsKey(new Long(version))) {
+ List<KeyValue> keyValueTempList = keyValueMap.get(new Long(
+ version));
+ if (keyValueTempList == null) {
+ keyValueTempList = new ArrayList<KeyValue>();
+ }
+ keyValueTempList.add(keyValue);
+ } else {
+ List<KeyValue> keyValueTempList = new ArrayList<KeyValue>();
+ keyValueMap.put(new Long(version), keyValueTempList);
+ keyValueTempList.add(keyValue);
+ }
+ }
+
+ resultVector = new Vector<List<KeyValue>>();
+ resultVector.addAll(keyValueMap.values());
+
+ List<KeyValue> resultKeyValue = resultVector.remove(resultVector.size() - 1);
+
+ Result result = new Result(resultKeyValue);
+
+ LOG.debug(String.format("+ Version (%s), Got Result (%s)", versions, result ) );
+
+ String newKey = keyList.pollFirst(); // Bytes.toString(resultKeyValue.getKey());//
+
+ System.out.println("+ New Key => " + newKey);
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ if( useSalt) {
+ key.set( HBaseSalter.delSaltPrefix(result.getRow()));
+ } else {
+ key.set(result.getRow());
+ }
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+ return true;
+ } else {
+ LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0
+
+ String newKey;
+
+ while( (newKey = keyList.pollFirst()) != null ) {
+ LOG.debug("+ WHILE NEXT Key => " + newKey);
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ if( nextKey == null ) {
+ LOG.error("+ BOMB! BOMB! BOMB!");
+ continue;
+ }
+
+ if( ! this.htable.exists( new Get(nextKey) ) ) {
+ LOG.debug(String.format("+ Key (%s) Does not exist in Table (%s)", Bytes.toString(nextKey), Bytes.toString(this.htable.getTableName()) ));
+ continue;
+ } else { break; }
+ }
+
+ nextKey = (newKey == null || newKey.length() == 0) ? null : Bytes
+ .toBytes(newKey);
+
+ LOG.debug("+ Final New Key => " + Bytes.toString(nextKey));
+
+ return next(key, value);
+ }
+
+ } else {
+ return false;
+ }
+ }
}
-
- default:
- throw new IOException("Unknown source mode : " + sourceMode );
- }
+ }
+ default:
+ throw new IOException("Unknown source mode : " + sourceMode);
+ }
}
}