diff options
Diffstat (limited to 'skate/zipkey/batch.go')
-rw-r--r-- | skate/zipkey/batch.go | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go new file mode 100644 index 0000000..d81897d --- /dev/null +++ b/skate/zipkey/batch.go @@ -0,0 +1,69 @@ +package zipkey + +import ( + "runtime" + "sync" +) + +// Batcher runs reducers in parallel on batches of groups. +type Batcher struct { + Size int + NumWorkers int + gf groupFunc + batch []*Group + queue chan []*Group + wg sync.WaitGroup + err error +} + +// NewBatcher set ups a new Batcher. +func NewBatcher(gf groupFunc) *Batcher { + batcher := Batcher{ + gf: gf, + Size: 1000, + NumWorkers: runtime.NumCPU(), + queue: make(chan []*Group), + } + for i := 0; i < batcher.NumWorkers; i++ { + batcher.wg.Add(1) + go batcher.worker(batcher.queue, &batcher.wg) + } + return &batcher +} + +func (b *Batcher) Close() error { + g := make([]*Group, len(b.batch)) + copy(g, b.batch) + b.queue <- g + b.batch = nil + close(b.queue) + b.wg.Wait() + return b.err +} + +// GroupFunc implement the groupFunc type. +func (b *Batcher) GroupFunc(g *Group) error { + b.batch = append(b.batch, g) + if len(b.batch) == b.Size { + g := make([]*Group, len(b.batch)) + copy(g, b.batch) + b.queue <- g + b.batch = nil + } + return nil +} + +// worker will wind down after a first error encountered. +func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) { + defer wg.Done() +OUTER: + for batch := range queue { + for _, g := range batch { + err := b.gf(g) + if err != nil { + b.err = err + break OUTER + } + } + } +} |