package zipkey

import (
	"runtime"
	"sync"
)

const (
	defaultBatchSize = 2000
	maxGroupSize     = 500 // short circuiting threshold
)

// Batcher runs reducers (over groups) in parallel on batches. This can have a
// significant impact on runtime, e.g. for certain datasets and operations we
// saw a reduction of 80% in processing time.
//
// The producer side (GroupFunc, Close) is not safe for concurrent use; only
// the workers started by the constructor run concurrently.
type Batcher struct {
	// Size is the number of groups collected before a batch is handed to the
	// workers.
	Size int
	// NumWorkers is the number of worker goroutines started by the
	// constructor. Changing it afterwards has no effect.
	NumWorkers int
	gf         groupFunc
	batch      []*Group
	queue      chan []*Group
	wg         sync.WaitGroup
	mu         sync.Mutex // guards err
	err        error
	closing    bool // https://stackoverflow.com/q/16105325/89391
}

// NewBatcher sets up a new Batcher with a default batch size.
func NewBatcher(gf groupFunc) *Batcher {
	return NewBatcherSize(gf, defaultBatchSize)
}

// NewBatcherSize initializes a batcher with a given function to apply and a
// batch size. It starts runtime.NumCPU() worker goroutines, which run until
// Close is called.
func NewBatcherSize(gf groupFunc, size int) *Batcher {
	b := &Batcher{
		gf:         gf,
		Size:       size,
		NumWorkers: runtime.NumCPU(),
		queue:      make(chan []*Group),
	}
	b.wg.Add(b.NumWorkers)
	for i := 0; i < b.NumWorkers; i++ {
		go b.worker()
	}
	return b
}

// Close tears down the batcher. If this is not called, you get goroutine leaks
// and will miss the data from the last uncommitted batch. It returns the first
// error reported by the group function, if any. Calling Close again is a no-op
// that returns the same error.
func (b *Batcher) Close() error {
	if b.closing {
		// The queue is already closed; closing it again would panic.
		return b.firstErr()
	}
	b.closing = true
	b.flush()
	close(b.queue)
	b.wg.Wait()
	return b.firstErr()
}

// GroupFunc is a drop-in for a groupFunc. Use this function, where you used
// groupFunc before. Not thread safe. Panics if called after Close.
//
// The returned error is the first error recorded by any worker so far, which
// need not stem from the group passed in this call.
func (b *Batcher) GroupFunc(g *Group) error {
	if b.closing {
		panic("batcher: cannot call GroupFunc after Close")
	}
	b.batch = append(b.batch, g)
	// A few groups have an extended size, e.g. thousands of members. We short
	// circuit on those to save memory.
	oversized := len(g.G0) > maxGroupSize || len(g.G1) > maxGroupSize
	// Use >= rather than ==, so a non-positive Size or a Size lowered below
	// the current batch length still triggers a flush instead of letting the
	// batch grow without bound.
	if len(b.batch) >= b.Size || oversized {
		b.flush()
	}
	return b.firstErr()
}

// flush hands the pending batch over to the workers, if there is one. The
// slice is passed on as is, since the batcher drops its own reference.
func (b *Batcher) flush() {
	if len(b.batch) == 0 {
		return
	}
	pending := b.batch
	b.batch = nil
	b.queue <- pending
}

// setErr records err, unless an earlier error has been recorded already.
func (b *Batcher) setErr(err error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.err == nil {
		b.err = err
	}
}

// firstErr returns the first error recorded by any worker, or nil.
func (b *Batcher) firstErr() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.err
}

// worker applies the group function to every group of every batch received
// from the queue. After any worker has recorded an error, remaining batches
// are received but discarded: workers must keep draining the unbuffered queue
// until it is closed, otherwise a producer blocked on send would never wake
// up once all workers had failed.
func (b *Batcher) worker() {
	defer b.wg.Done()
	for batch := range b.queue {
		if b.firstErr() != nil {
			continue
		}
		for _, g := range batch {
			if err := b.gf(g); err != nil {
				b.setErr(err)
				break
			}
		}
	}
}