aboutsummaryrefslogtreecommitdiffstats
path: root/skate
diff options
context:
space:
mode:
Diffstat (limited to 'skate')
-rw-r--r--skate/zipkey/batch.go10
-rw-r--r--skate/zipkey/batch_test.go57
2 files changed, 64 insertions, 3 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
index d81897d..c31909c 100644
--- a/skate/zipkey/batch.go
+++ b/skate/zipkey/batch.go
@@ -14,6 +14,7 @@ type Batcher struct {
queue chan []*Group
wg sync.WaitGroup
err error
+ closing bool
}
// NewBatcher set ups a new Batcher.
@@ -32,6 +33,7 @@ func NewBatcher(gf groupFunc) *Batcher {
}
func (b *Batcher) Close() error {
+ b.closing = true
g := make([]*Group, len(b.batch))
copy(g, b.batch)
b.queue <- g
@@ -41,8 +43,11 @@ func (b *Batcher) Close() error {
return b.err
}
-// GroupFunc implement the groupFunc type.
+// GroupFunc implement the groupFunc type. Not thread safe.
func (b *Batcher) GroupFunc(g *Group) error {
+ if b.closing {
+ panic("cannot call GroupFunc after Close")
+ }
b.batch = append(b.batch, g)
if len(b.batch) == b.Size {
g := make([]*Group, len(b.batch))
@@ -59,8 +64,7 @@ func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) {
OUTER:
for batch := range queue {
for _, g := range batch {
- err := b.gf(g)
- if err != nil {
+ if err := b.gf(g); err != nil {
b.err = err
break OUTER
}
diff --git a/skate/zipkey/batch_test.go b/skate/zipkey/batch_test.go
index 7c6a48c..38a1307 100644
--- a/skate/zipkey/batch_test.go
+++ b/skate/zipkey/batch_test.go
@@ -3,7 +3,11 @@ package zipkey
import (
"bytes"
"encoding/json"
+ "fmt"
+ "io"
+ "reflect"
"strings"
+ "sync"
"testing"
)
@@ -30,3 +34,56 @@ func TestBatcher(t *testing.T) {
t.Fatalf("got %v, want %v", got, want)
}
}
+
+func TestBatcherLarge(t *testing.T) {
+ var (
+ N = 1000000
+ numWorkers = 24
+ size = 7000
+ // We share a single writer across threads, so we need to guard each
+ // write. TODO: measure performance impact.
+ mu sync.Mutex
+ buf bytes.Buffer
+ f = func(g *Group) error {
+ var v string
+ if reflect.DeepEqual(g.G0, g.G1) {
+ v = "1"
+ } else {
+ v = "0"
+ }
+ mu.Lock()
+ defer mu.Unlock()
+ if _, err := io.WriteString(&buf, v); err != nil {
+ return err
+ }
+ return nil
+ }
+ b = NewBatcher(groupFunc(f))
+ )
+ b.Size = size
+ b.NumWorkers = numWorkers
+ for i := 0; i < N; i++ {
+ var u, v string
+ if i%2 == 0 {
+ u, v = "a", "b"
+ } else {
+ u, v = "a", "a"
+ }
+ g := &Group{
+ Key: fmt.Sprintf("%d", i),
+ G0: []string{u},
+ G1: []string{v},
+ }
+ if err := b.GroupFunc(g); err != nil {
+ t.Fatalf("unexpected err from gf: %v", err)
+ }
+ }
+ if err := b.Close(); err != nil {
+ t.Fatalf("unexpected err from close: %v", err)
+ }
+ got := buf.String()
+ count0, count1 := strings.Count(got, "0"), strings.Count(got, "1")
+ if count1 != N/2 {
+ t.Fatalf("got %v, want %v (count0=%v, buf=%s)", count1, N/2, count0, buf.String())
+ }
+}