aboutsummaryrefslogtreecommitdiffstats
path: root/skate/zipkey/batch.go
diff options
context:
space:
mode:
Diffstat (limited to 'skate/zipkey/batch.go')
-rw-r--r--skate/zipkey/batch.go11
1 files changed, 8 insertions, 3 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
index 9e52f90..ebbd081 100644
--- a/skate/zipkey/batch.go
+++ b/skate/zipkey/batch.go
@@ -14,7 +14,7 @@ type Batcher struct {
queue chan []*Group
wg sync.WaitGroup
err error
- closing bool
+ closing bool // https://stackoverflow.com/q/16105325/89391
}
// NewBatcher set ups a new Batcher.
@@ -32,6 +32,8 @@ func NewBatcher(gf groupFunc) *Batcher {
return &batcher
}
+// Close tears down the batcher. If this is not called, you get goroutine leaks
+// and will miss the data from the last uncommitted batch.
func (b *Batcher) Close() error {
b.closing = true
g := make([]*Group, len(b.batch))
@@ -43,7 +45,8 @@ func (b *Batcher) Close() error {
return b.err
}
-// GroupFunc implement the groupFunc type. Not thread safe.
+// GroupFunc implement the groupFunc type. Not thread safe. Panics if called
+// after Close has been called.
func (b *Batcher) GroupFunc(g *Group) error {
if b.closing {
panic("cannot call GroupFunc after Close")
@@ -58,7 +61,9 @@ func (b *Batcher) GroupFunc(g *Group) error {
return nil
}
-// worker will wind down after a first error encountered.
+// worker will wind down after any error has been encountered. Multiple threads
+// may set the error, but we currently only care whether the error is nil or
+// not.
func (b *Batcher) worker() {
defer b.wg.Done()
OUTER: