diff options
Diffstat (limited to 'skate/zipkey/batch.go')
-rw-r--r-- | skate/zipkey/batch.go | 10 |
1 files changed, 7 insertions, 3 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index d81897d..c31909c 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -14,6 +14,7 @@ type Batcher struct { queue chan []*Group wg sync.WaitGroup err error + closing bool } // NewBatcher set ups a new Batcher. @@ -32,6 +33,7 @@ func NewBatcher(gf groupFunc) *Batcher { } func (b *Batcher) Close() error { + b.closing = true g := make([]*Group, len(b.batch)) copy(g, b.batch) b.queue <- g @@ -41,8 +43,11 @@ func (b *Batcher) Close() error { return b.err } -// GroupFunc implement the groupFunc type. +// GroupFunc implement the groupFunc type. Not thread safe. func (b *Batcher) GroupFunc(g *Group) error { + if b.closing { + panic("cannot call GroupFunc after Close") + } b.batch = append(b.batch, g) if len(b.batch) == b.Size { g := make([]*Group, len(b.batch)) @@ -59,8 +64,7 @@ func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) { OUTER: for batch := range queue { for _, g := range batch { - err := b.gf(g) - if err != nil { + if err := b.gf(g); err != nil { b.err = err break OUTER } |