diff options
author | Martin Czygan <martin.czygan@gmail.com> | 2021-07-04 10:11:37 +0200 |
---|---|---|
committer | Martin Czygan <martin.czygan@gmail.com> | 2021-07-04 10:11:37 +0200 |
commit | 0935913a70d4fb12851f1c085c0e9dd6bb0cf5e8 (patch) | |
tree | 3401fb1dee42a09f0ba0c8656f5e09f924168d9c /skate/zipkey/batch.go | |
parent | c505030a6fa4594240ae5e344ea496efe7e51ede (diff) | |
parent | 6bb05e88506b5b09dd5d73d50235ebdb9cc34934 (diff) | |
download | refcat-0935913a70d4fb12851f1c085c0e9dd6bb0cf5e8.tar.gz refcat-0935913a70d4fb12851f1c085c0e9dd6bb0cf5e8.zip |
Merge branch 'master' of git.archive.org:martin/cgraph
* 'master' of git.archive.org:martin/cgraph:
wip: batch reducers
Diffstat (limited to 'skate/zipkey/batch.go')
-rw-r--r-- | skate/zipkey/batch.go | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go new file mode 100644 index 0000000..d81897d --- /dev/null +++ b/skate/zipkey/batch.go @@ -0,0 +1,69 @@ +package zipkey + +import ( + "runtime" + "sync" +) + +// Batcher runs reducers in parallel on batches of groups. +type Batcher struct { + Size int + NumWorkers int + gf groupFunc + batch []*Group + queue chan []*Group + wg sync.WaitGroup + err error +} + +// NewBatcher set ups a new Batcher. +func NewBatcher(gf groupFunc) *Batcher { + batcher := Batcher{ + gf: gf, + Size: 1000, + NumWorkers: runtime.NumCPU(), + queue: make(chan []*Group), + } + for i := 0; i < batcher.NumWorkers; i++ { + batcher.wg.Add(1) + go batcher.worker(batcher.queue, &batcher.wg) + } + return &batcher +} + +func (b *Batcher) Close() error { + g := make([]*Group, len(b.batch)) + copy(g, b.batch) + b.queue <- g + b.batch = nil + close(b.queue) + b.wg.Wait() + return b.err +} + +// GroupFunc implement the groupFunc type. +func (b *Batcher) GroupFunc(g *Group) error { + b.batch = append(b.batch, g) + if len(b.batch) == b.Size { + g := make([]*Group, len(b.batch)) + copy(g, b.batch) + b.queue <- g + b.batch = nil + } + return nil +} + +// worker will wind down after a first error encountered. +func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) { + defer wg.Done() +OUTER: + for batch := range queue { + for _, g := range batch { + err := b.gf(g) + if err != nil { + b.err = err + break OUTER + } + } + } +} |