aboutsummaryrefslogtreecommitdiffstats
path: root/skate/zipkey/batch.go
diff options
context:
space:
mode:
Diffstat (limited to 'skate/zipkey/batch.go')
-rw-r--r--skate/zipkey/batch.go10
1 files changed, 7 insertions, 3 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
index d81897d..c31909c 100644
--- a/skate/zipkey/batch.go
+++ b/skate/zipkey/batch.go
@@ -14,6 +14,7 @@ type Batcher struct {
queue chan []*Group
wg sync.WaitGroup
err error
+ closing bool
}
// NewBatcher set ups a new Batcher.
@@ -32,6 +33,7 @@ func NewBatcher(gf groupFunc) *Batcher {
}
func (b *Batcher) Close() error {
+ b.closing = true
g := make([]*Group, len(b.batch))
copy(g, b.batch)
b.queue <- g
@@ -41,8 +43,11 @@ func (b *Batcher) Close() error {
return b.err
}
-// GroupFunc implement the groupFunc type.
+// GroupFunc implement the groupFunc type. Not thread safe.
func (b *Batcher) GroupFunc(g *Group) error {
+ if b.closing {
+ panic("cannot call GroupFunc after Close")
+ }
b.batch = append(b.batch, g)
if len(b.batch) == b.Size {
g := make([]*Group, len(b.batch))
@@ -59,8 +64,7 @@ func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) {
OUTER:
for batch := range queue {
for _, g := range batch {
- err := b.gf(g)
- if err != nil {
+ if err := b.gf(g); err != nil {
b.err = err
break OUTER
}