aboutsummaryrefslogtreecommitdiffstats
path: root/skate
diff options
context:
space:
mode:
authorMartin Czygan <martin.czygan@gmail.com>2021-07-10 12:41:00 +0200
committerMartin Czygan <martin.czygan@gmail.com>2021-07-10 12:41:00 +0200
commitad8a18332a0bd32a3bf7dd1e045c31dcd99ba237 (patch)
tree311bed35e6cfd0ab656c70bbb94e10f801d7dab1 /skate
parent09fe6b8b63cb16e71a05b7b9480ef46d34009456 (diff)
downloadrefcat-ad8a18332a0bd32a3bf7dd1e045c31dcd99ba237.tar.gz
refcat-ad8a18332a0bd32a3bf7dd1e045c31dcd99ba237.zip
reduce: short circuit large groups
we saw a jump in memory usage, and it may be related to groups with thousands of elements; e.g. maybe some weird string, that appears too many times as key, e.g. 123/test; as a first measure, we sort circuit further batching; other mitigiation may to be limit groups size completely
Diffstat (limited to 'skate')
-rw-r--r--skate/zipkey/batch.go14
1 files changed, 12 insertions, 2 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
index 049c3b2..ac7d462 100644
--- a/skate/zipkey/batch.go
+++ b/skate/zipkey/batch.go
@@ -1,11 +1,15 @@
package zipkey
import (
+ "log"
"runtime"
"sync"
)
-const defaultBatchSize = 10000
+const (
+ defaultBatchSize = 2000
+ maxGroupSize = 500 // short circuiting threshold
+)
// Batcher runs reducers in parallel on batches of groups.
type Batcher struct {
@@ -59,7 +63,13 @@ func (b *Batcher) GroupFunc(g *Group) error {
panic("cannot call GroupFunc after Close")
}
b.batch = append(b.batch, g)
- if len(b.batch) == b.Size {
+ // A few groups have an extended size, e.g. thousands of members. We short
+ // curcuit on those.
+ oversized := len(g.G0) > maxGroupSize || len(g.G1) > maxGroupSize
+ if len(b.batch) == b.Size || oversized {
+ if oversized {
+ log.Printf("short circuiting large group (%d/%d)", len(g.G0), len(g.G1))
+ }
g := make([]*Group, len(b.batch))
copy(g, b.batch)
b.queue <- g