aboutsummaryrefslogtreecommitdiffstats
path: root/skate/zipkey/batch.go
diff options
context:
space:
mode:
Diffstat (limited to 'skate/zipkey/batch.go')
-rw-r--r-- skate/zipkey/batch.go 14
1 files changed, 12 insertions, 2 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
index 049c3b2..ac7d462 100644
--- a/skate/zipkey/batch.go
+++ b/skate/zipkey/batch.go
@@ -1,11 +1,15 @@
package zipkey
import (
+ "log"
"runtime"
"sync"
)
-const defaultBatchSize = 10000
+const (
+ defaultBatchSize = 2000
+ maxGroupSize = 500 // short circuiting threshold
+)
// Batcher runs reducers in parallel on batches of groups.
type Batcher struct {
@@ -59,7 +63,13 @@ func (b *Batcher) GroupFunc(g *Group) error {
panic("cannot call GroupFunc after Close")
}
b.batch = append(b.batch, g)
- if len(b.batch) == b.Size {
+ // A few groups have an extended size, e.g. thousands of members. We short
+ // circuit on those, i.e. flush the current batch right away.
+ oversized := len(g.G0) > maxGroupSize || len(g.G1) > maxGroupSize
+ if len(b.batch) == b.Size || oversized {
+ if oversized {
+ log.Printf("short circuiting large group (%d/%d)", len(g.G0), len(g.G1))
+ }
g := make([]*Group, len(b.batch))
copy(g, b.batch)
b.queue <- g