aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorKoert Kuipers <koert@tresata.com>2013-08-06 13:40:57 -0400
committerKoert Kuipers <koert@tresata.com>2013-08-06 13:40:57 -0400
commitde38789a032b586b0e278a81dedb5c8fb6d43e02 (patch)
tree89c583e5305d25e56545c83fb4046c6cd7e9fe2d /src
parent40074e8d3030d869357f17e2bc7d8948fbbf14cc (diff)
downloadSpyGlass-de38789a032b586b0e278a81dedb5c8fb6d43e02.tar.gz
SpyGlass-de38789a032b586b0e278a81dedb5c8fb6d43e02.zip
do not store nulls in hbase. see issue 5
Diffstat (limited to 'src')
-rw-r--r--src/main/java/parallelai/spyglass/hbase/HBaseScheme.java8
-rw-r--r--src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala28
2 files changed, 16 insertions, 20 deletions
diff --git a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
index 3c64e52..aa446c1 100644
--- a/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
+++ b/src/main/java/parallelai/spyglass/hbase/HBaseScheme.java
@@ -219,10 +219,7 @@ public class HBaseScheme
String fieldName = (String) fields.get(k);
byte[] fieldNameBytes = Bytes.toBytes(fieldName);
byte[] cellValue = row.getValue(familyNameBytes, fieldNameBytes);
- if (cellValue == null) {
- cellValue = new byte[0];
- }
- result.add(new ImmutableBytesWritable(cellValue));
+ result.add(cellValue != null ? new ImmutableBytesWritable(cellValue) : null);
}
}
@@ -259,7 +256,8 @@ public class HBaseScheme
Tuple tuple = values.getTuple();
ImmutableBytesWritable valueBytes = (ImmutableBytesWritable) tuple.getObject(j);
- put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get());
+ if (valueBytes != null)
+ put.add(Bytes.toBytes(familyNames[i]), Bytes.toBytes((String) fields.get(j)), valueBytes.get());
}
}
diff --git a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
index b6d5742..31ed3ea 100644
--- a/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
+++ b/src/main/scala/parallelai/spyglass/hbase/HBaseConversions.scala
@@ -10,15 +10,14 @@ import org.apache.hadoop.hbase.util.Bytes
import cascading.tuple.TupleEntry
class HBasePipeWrapper (pipe: Pipe) {
- def toBytesWritable(f: Fields): Pipe = {
+ def toBytesWritable(f: Fields): Pipe = {
asList(f)
- .foldLeft(pipe){ (p, f) => {
- p.map(f.toString -> f.toString){ from: String => {
- new ImmutableBytesWritable(Bytes.toBytes(
- if (from == null) "" else from))
- }}
- }}
- }
+ .foldLeft(pipe){ (p, f) => {
+ p.map(f.toString -> f.toString){ from: String =>
+ Option(from).map(x => new ImmutableBytesWritable(Bytes.toBytes(x))).getOrElse(null)
+ }}
+ }
+ }
// def toBytesWritable : Pipe = {
// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe){ (p, f) => {
@@ -30,13 +29,12 @@ class HBasePipeWrapper (pipe: Pipe) {
def fromBytesWritable(f: Fields): Pipe = {
asList(f)
- .foldLeft(pipe) { (p, fld) =>
- p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable => {
- Bytes.toString(from.get)
- }
- }
- }
- }
+ .foldLeft(pipe) { (p, fld) => {
+ p.map(fld.toString -> fld.toString) { from: ImmutableBytesWritable =>
+ Option(from).map(x => Bytes.toString(x.get)).getOrElse(null)
+ }
+ }}
+ }
// def fromBytesWritable : Pipe = {
// asList(Fields.ALL.asInstanceOf[TupleEntry].getFields()).foldLeft(pipe) { (p, fld) =>