package zipkey

import (
	"runtime"
	"sync"
)

// Batcher runs a group function in parallel on batches of groups.
//
// GroupFunc accumulates groups into batches of Size elements. Each full
// batch is handed to one of NumWorkers worker goroutines, which call the
// wrapped group function on every group of the batch. Close hands over the
// last partial batch, waits for the workers and reports the first error
// encountered.
//
// GroupFunc and Close are not safe for concurrent use; call them from a
// single goroutine.
type Batcher struct {
	// Size is the number of groups per batch. It is read on every call to
	// GroupFunc; values <= 0 behave like 1.
	Size int
	// NumWorkers is the number of worker goroutines. The workers are
	// started by NewBatcher, so changing this field afterwards has no
	// effect.
	NumWorkers int

	gf      groupFunc
	batch   []*Group       // pending groups not yet handed to a worker
	queue   chan []*Group  // unbuffered: a send blocks until a worker receives
	wg      sync.WaitGroup // tracks running workers
	mu      sync.Mutex     // guards err
	err     error          // first error returned by gf
	closing bool           // set by Close; GroupFunc panics afterwards
}

// NewBatcher sets up a Batcher that calls gf on every group it receives and
// starts its worker goroutines. Callers must eventually call Close to stop
// the workers.
func NewBatcher(gf groupFunc) *Batcher {
	b := &Batcher{
		gf:         gf,
		Size:       1000,
		NumWorkers: runtime.NumCPU(),
		queue:      make(chan []*Group),
	}
	for i := 0; i < b.NumWorkers; i++ {
		b.wg.Add(1)
		go b.worker()
	}
	return b
}

// Close hands any partially filled batch to the workers and waits for all of
// them to finish. It returns the first error reported by the group function,
// or nil. Calling Close more than once is safe; later calls return the same
// error. GroupFunc must not be called after Close.
func (b *Batcher) Close() error {
	if b.closing {
		return b.firstErr()
	}
	b.closing = true
	if len(b.batch) > 0 {
		b.flush()
	}
	close(b.queue)
	b.wg.Wait()
	return b.firstErr()
}

// GroupFunc implements the groupFunc type. It queues g for processing and
// hands the batch to a worker once Size groups have accumulated.
//
// If a worker has already failed, GroupFunc returns that error without
// queueing g, so the caller can stop early. Close must still be called to
// release the workers.
//
// GroupFunc is not safe for concurrent use and panics if called after Close.
func (b *Batcher) GroupFunc(g *Group) error {
	if b.closing {
		panic("cannot call GroupFunc after Close")
	}
	if err := b.firstErr(); err != nil {
		return err
	}
	b.batch = append(b.batch, g)
	// Use >= rather than == so that a Size <= 0 (or a Size lowered after
	// groups were queued) still flushes instead of growing the batch forever.
	if len(b.batch) >= b.Size {
		b.flush()
	}
	return nil
}

// flush hands the pending batch to a worker, blocking until one receives it,
// and starts a new empty batch.
func (b *Batcher) flush() {
	batch := b.batch
	// The worker now owns batch. Dropping our reference makes the next
	// append allocate a fresh array instead of aliasing it, so no copy is
	// needed.
	b.batch = nil
	b.queue <- batch
}

// worker processes batches until the queue is closed. After the first error
// from any worker, it stops calling the group function but keeps draining the
// queue. That way GroupFunc and Close never block on a send that no
// goroutine will ever receive.
func (b *Batcher) worker() {
	defer b.wg.Done()
	for batch := range b.queue {
		if b.firstErr() != nil {
			continue // drain only
		}
		for _, g := range batch {
			if err := b.gf(g); err != nil {
				b.setErr(err)
				break
			}
		}
	}
}

// setErr records err unless an earlier error has already been recorded.
func (b *Batcher) setErr(err error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.err == nil {
		b.err = err
	}
}

// firstErr returns the first error recorded by a worker, or nil.
func (b *Batcher) firstErr() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.err
}