aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--skate/reduce.go7
-rw-r--r--skate/zipkey/batch.go6
2 files changed, 7 insertions, 6 deletions
diff --git a/skate/reduce.go b/skate/reduce.go
index e362ca4..9986152 100644
--- a/skate/reduce.go
+++ b/skate/reduce.go
@@ -432,10 +432,11 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
batcher = zipkey.NewBatcher(grouper)
)
defer batcher.Close()
+ defer func() {
+ log.Println(stats)
+ }()
zipper := zipkey.New(bref, raw, keyer, batcher.GroupFunc)
- err := zipper.Run()
- log.Println(stats)
- return err
+ return zipper.Run()
}
// removeSelfLinks removes self-referential links. TODO: Those should be caught
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
index f8b945c..6ab7eee 100644
--- a/skate/zipkey/batch.go
+++ b/skate/zipkey/batch.go
@@ -55,7 +55,7 @@ func (b *Batcher) Close() error {
return b.err
}
-// GroupFunc is a drop for a groupFunc. Use this function, where you used
+// GroupFunc is a drop-in for a groupFunc. Use this function, where you used
// grouper before. Not thread safe. Panics if called after Close.
func (b *Batcher) GroupFunc(g *Group) error {
if b.closing {
@@ -63,7 +63,7 @@ func (b *Batcher) GroupFunc(g *Group) error {
}
b.batch = append(b.batch, g)
// A few groups have an extended size, e.g. thousands of members. We short
- // curcuit on those.
+ // curcuit on those to save memory.
oversized := len(g.G0) > maxGroupSize || len(g.G1) > maxGroupSize
if len(b.batch) == b.Size || oversized {
g := make([]*Group, len(b.batch))
@@ -71,7 +71,7 @@ func (b *Batcher) GroupFunc(g *Group) error {
b.queue <- g
b.batch = nil
}
- return nil
+ return b.err
}
// worker will wind down after any error has been encountered. Multiple threads