package zipkey

import (
	"runtime"
	"sync"
)

// Batcher runs reducers in parallel on batches of groups.
//
// Groups passed to GroupFunc are collected into batches and handed to a pool
// of worker goroutines, each of which applies the wrapped groupFunc to every
// group of a batch. GroupFunc and Close are meant to be called from a single
// goroutine. Close must be called to flush the last (partial) batch, stop the
// workers and collect the first error.
type Batcher struct {
	// Size is the number of groups per batch. A value below one dispatches
	// every group as its own batch.
	Size int

	// NumWorkers is the number of worker goroutines. It is read once, when
	// the first batch is dispatched, so it may be changed after NewBatcher
	// and before the first batch is sent. Values below one are treated as one.
	NumWorkers int

	gf    groupFunc
	batch []*Group
	queue chan []*Group
	wg    sync.WaitGroup

	startOnce sync.Once // starts the workers exactly once
	closeOnce sync.Once // makes Close safe to call repeatedly

	mu  sync.Mutex // guards err, which is written by concurrent workers
	err error
}

// NewBatcher sets up a new Batcher that applies gf to every group. The batch
// size defaults to 1000 and the number of workers to runtime.NumCPU().
func NewBatcher(gf groupFunc) *Batcher {
	return &Batcher{
		gf:         gf,
		Size:       1000,
		NumWorkers: runtime.NumCPU(),
		queue:      make(chan []*Group),
	}
}

// Close dispatches any pending groups, waits for all workers to finish and
// returns the first error reported by the wrapped groupFunc, if any. Calling
// Close more than once is safe; later calls only return the recorded error.
// GroupFunc must not be called after Close.
func (b *Batcher) Close() error {
	b.closeOnce.Do(func() {
		b.flush()
		close(b.queue)
		b.wg.Wait()
	})
	return b.firstErr()
}

// GroupFunc implements the groupFunc type. It queues g and dispatches a batch
// to the workers once Size groups have been collected. If a worker has already
// failed, the recorded error is returned and g is not queued, so that the
// caller can stop early.
func (b *Batcher) GroupFunc(g *Group) error {
	if err := b.firstErr(); err != nil {
		return err
	}
	b.batch = append(b.batch, g)
	// Use >= rather than == so that a Size lowered while a batch is being
	// filled still triggers a flush.
	if len(b.batch) >= b.Size {
		b.flush()
	}
	return nil
}

// start launches the worker goroutines on first use.
func (b *Batcher) start() {
	b.startOnce.Do(func() {
		n := b.NumWorkers
		if n < 1 {
			n = 1
		}
		b.wg.Add(n)
		for i := 0; i < n; i++ {
			go b.worker(b.queue, &b.wg)
		}
	})
}

// flush hands the pending batch over to the workers, if there is one.
func (b *Batcher) flush() {
	if len(b.batch) == 0 {
		return
	}
	b.start()
	// Ownership of the slice moves to the receiving worker; a fresh slice is
	// started below, so no copy is required.
	b.queue <- b.batch
	b.batch = nil
}

// firstErr returns the first error recorded by any worker.
func (b *Batcher) firstErr() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.err
}

// setErr records err unless an earlier error has already been recorded.
func (b *Batcher) setErr(err error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.err == nil {
		b.err = err
	}
}

// worker applies the wrapped groupFunc to every group of each batch received
// from queue. After the first error (from any worker) it stops processing, but
// keeps receiving and discarding batches until queue is closed, so that
// senders in GroupFunc and Close can never block forever.
func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) {
	defer wg.Done()
	for batch := range queue {
		if b.firstErr() != nil {
			continue // wind down: drain without processing
		}
		for _, g := range batch {
			if err := b.gf(g); err != nil {
				b.setErr(err)
				break
			}
		}
	}
}