From ad8a18332a0bd32a3bf7dd1e045c31dcd99ba237 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Sat, 10 Jul 2021 12:41:00 +0200 Subject: reduce: short circuit large groups we saw a jump in memory usage, and it may be related to groups with thousands of elements; e.g. maybe some weird string, that appears too many times as key, e.g. 123/test; as a first measure, we short circuit further batching; another mitigation may be to limit group size completely --- skate/zipkey/batch.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index 049c3b2..ac7d462 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -1,11 +1,15 @@ package zipkey import ( + "log" "runtime" "sync" ) -const defaultBatchSize = 10000 +const ( + defaultBatchSize = 2000 + maxGroupSize = 500 // short circuiting threshold +) // Batcher runs reducers in parallel on batches of groups. type Batcher struct { @@ -59,7 +63,13 @@ func (b *Batcher) GroupFunc(g *Group) error { panic("cannot call GroupFunc after Close") } b.batch = append(b.batch, g) - if len(b.batch) == b.Size { + // A few groups have an extended size, e.g. thousands of members. We short + // circuit on those. + oversized := len(g.G0) > maxGroupSize || len(g.G1) > maxGroupSize + if len(b.batch) == b.Size || oversized { + if oversized { + log.Printf("short circuiting large group (%d/%d)", len(g.G0), len(g.G1)) + } g := make([]*Group, len(b.batch)) copy(g, b.batch) b.queue <- g -- cgit v1.2.3