diff options
Diffstat (limited to 'skate')
-rw-r--r-- | skate/zipkey/batch.go | 14 |
1 files changed, 12 insertions, 2 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index 049c3b2..ac7d462 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -1,11 +1,15 @@ package zipkey import ( + "log" "runtime" "sync" ) -const defaultBatchSize = 10000 +const ( + defaultBatchSize = 2000 + maxGroupSize = 500 // short circuiting threshold +) // Batcher runs reducers in parallel on batches of groups. type Batcher struct { @@ -59,7 +63,13 @@ func (b *Batcher) GroupFunc(g *Group) error { panic("cannot call GroupFunc after Close") } b.batch = append(b.batch, g) - if len(b.batch) == b.Size { + // A few groups have an extended size, e.g. thousands of members. We short + // curcuit on those. + oversized := len(g.G0) > maxGroupSize || len(g.G1) > maxGroupSize + if len(b.batch) == b.Size || oversized { + if oversized { + log.Printf("short circuiting large group (%d/%d)", len(g.G0), len(g.G1)) + } g := make([]*Group, len(b.batch)) copy(g, b.batch) b.queue <- g |