about | summary | refs | log | tree | commit | diff | stats
path: root/skate/zipkey/batch.go
diff options
context:
space:
mode:
Diffstat (limited to 'skate/zipkey/batch.go')
-rw-r--r-- skate/zipkey/batch.go 69
1 files changed, 69 insertions, 0 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
new file mode 100644
index 0000000..d81897d
--- /dev/null
+++ b/skate/zipkey/batch.go
@@ -0,0 +1,69 @@
+package zipkey
+
+import (
+ "runtime"
+ "sync"
+)
+
+// Batcher runs reducers in parallel on batches of groups. Groups passed to
+// GroupFunc are collected into batches and handed to a fixed set of worker
+// goroutines, each of which applies the wrapped groupFunc to every group of
+// a batch.
+//
+// A Batcher must be created with NewBatcher. The pending batch is not
+// synchronized, so GroupFunc and Close must not be called concurrently.
+type Batcher struct {
+	// Size is the number of groups collected before a batch is queued. A
+	// value below one queues every group as its own batch.
+	Size int
+
+	// NumWorkers is the number of worker goroutines. The workers are started
+	// by NewBatcher, so changing this field afterwards has no effect.
+	NumWorkers int
+
+	gf        groupFunc
+	batch     []*Group      // groups collected since the last flush
+	queue     chan []*Group // unbuffered hand-off to the workers
+	wg        sync.WaitGroup
+	closeOnce sync.Once
+	mu        sync.Mutex // guards err
+	err       error      // first error returned by gf
+}
+
+// NewBatcher sets up a new Batcher around gf and starts its workers, one per
+// CPU as reported by runtime.NumCPU. The default batch size is 1000.
+func NewBatcher(gf groupFunc) *Batcher {
+	b := &Batcher{
+		gf:         gf,
+		Size:       1000,
+		NumWorkers: runtime.NumCPU(),
+		queue:      make(chan []*Group),
+	}
+	for i := 0; i < b.NumWorkers; i++ {
+		b.wg.Add(1)
+		go b.worker(b.queue, &b.wg)
+	}
+	return b
+}
+
+// Close queues any partially filled batch, closes the queue and waits for
+// all workers to finish. It returns the first error a worker got from the
+// wrapped groupFunc, or nil. Calling Close more than once is safe; GroupFunc
+// must not be called after Close.
+func (b *Batcher) Close() error {
+	b.closeOnce.Do(func() {
+		if len(b.batch) > 0 {
+			// The slice is dropped right after the send, so it can be
+			// handed to the worker without copying.
+			b.queue <- b.batch
+			b.batch = nil
+		}
+		close(b.queue)
+		b.wg.Wait()
+	})
+	return b.firstErr()
+}
+
+// GroupFunc implements the groupFunc type. It adds g to the pending batch
+// and queues the batch for the workers once it holds Size groups. If a
+// worker has already recorded an error, that error is returned and g is not
+// queued, so the caller can stop early.
+func (b *Batcher) GroupFunc(g *Group) error {
+	if err := b.firstErr(); err != nil {
+		return err
+	}
+	b.batch = append(b.batch, g)
+	// Use >= so that a Size lowered after construction (or a non-positive
+	// Size) still triggers a flush instead of growing the batch unbounded.
+	if len(b.batch) >= b.Size {
+		b.queue <- b.batch
+		b.batch = nil
+	}
+	return nil
+}
+
+// worker applies the wrapped groupFunc to every group of every batch read
+// from queue and signals wg when queue is closed. After the first error
+// recorded by any worker, remaining batches are received but discarded: the
+// workers must keep reading, otherwise a sender on the unbuffered queue
+// would block forever.
+func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) {
+	defer wg.Done()
+	for batch := range queue {
+		if b.firstErr() != nil {
+			continue // winding down: drain without processing
+		}
+		for _, g := range batch {
+			if err := b.gf(g); err != nil {
+				b.recordErr(err)
+				break
+			}
+		}
+	}
+}
+
+// recordErr stores err unless an earlier error has already been recorded.
+func (b *Batcher) recordErr(err error) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	if b.err == nil {
+		b.err = err
+	}
+}
+
+// firstErr returns the first error recorded by a worker, or nil.
+func (b *Batcher) firstErr() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.err
+}