diff options
author | Martin Czygan <martin.czygan@gmail.com> | 2021-07-10 12:41:00 +0200 |
---|---|---|
committer | Martin Czygan <martin.czygan@gmail.com> | 2021-07-10 12:41:00 +0200 |
commit | ad8a18332a0bd32a3bf7dd1e045c31dcd99ba237 (patch) | |
tree | 311bed35e6cfd0ab656c70bbb94e10f801d7dab1 | |
parent | 09fe6b8b63cb16e71a05b7b9480ef46d34009456 (diff) | |
download | refcat-ad8a18332a0bd32a3bf7dd1e045c31dcd99ba237.tar.gz refcat-ad8a18332a0bd32a3bf7dd1e045c31dcd99ba237.zip |
reduce: short circuit large groups
we saw a jump in memory usage, and it may be related to groups with
thousands of elements; e.g. maybe some weird string, that appears too
many times as key, e.g. 123/test; as a first measure, we short circuit
further batching; another mitigation may be to limit group size
completely
-rw-r--r-- | skate/zipkey/batch.go | 14 |
1 files changed, 12 insertions, 2 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index 049c3b2..ac7d462 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -1,11 +1,15 @@ package zipkey import ( + "log" "runtime" "sync" ) -const defaultBatchSize = 10000 +const ( + defaultBatchSize = 2000 + maxGroupSize = 500 // short circuiting threshold +) // Batcher runs reducers in parallel on batches of groups. type Batcher struct { @@ -59,7 +63,13 @@ func (b *Batcher) GroupFunc(g *Group) error { panic("cannot call GroupFunc after Close") } b.batch = append(b.batch, g) - if len(b.batch) == b.Size { + // A few groups have an extended size, e.g. thousands of members. We short + // curcuit on those. + oversized := len(g.G0) > maxGroupSize || len(g.G1) > maxGroupSize + if len(b.batch) == b.Size || oversized { + if oversized { + log.Printf("short circuiting large group (%d/%d)", len(g.G0), len(g.G1)) + } g := make([]*Group, len(b.batch)) copy(g, b.batch) b.queue <- g |