1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
package zipkey
import (
"bytes"
"encoding/json"
"fmt"
"io"
"reflect"
"strings"
"sync"
"testing"
)
// TestBatcher verifies that a single group handed to the batcher is passed
// to the wrapped group function exactly once, here observed by JSON-encoding
// the group into a buffer and comparing the serialized form.
func TestBatcher(t *testing.T) {
	var (
		buf bytes.Buffer
		enc = json.NewEncoder(&buf)
		// f serializes each group as one JSON line into buf.
		f = func(g *Group) error {
			return enc.Encode(g)
		}
		b = NewBatcher(groupFunc(f))
	)
	// Check the errors from GroupFunc and Close; previously they were
	// silently ignored, so an encoder failure would have gone unnoticed.
	if err := b.GroupFunc(&Group{
		Key: "K1",
		G0:  []string{"A"},
		G1:  []string{"B"},
	}); err != nil {
		t.Fatalf("unexpected err from GroupFunc: %v", err)
	}
	if err := b.Close(); err != nil {
		t.Fatalf("unexpected err from Close: %v", err)
	}
	var (
		got  = strings.TrimSpace(buf.String())
		want = `{"Key":"K1","G0":["A"],"G1":["B"]}`
	)
	if got != want {
		t.Fatalf("got %v, want %v", got, want)
	}
}
// TestBatcherLarge pushes a large number of groups through the batcher with
// many workers and verifies, via a mutex-guarded shared buffer, that every
// group is reduced exactly once: equal groups emit "1", unequal groups "0".
func TestBatcherLarge(t *testing.T) {
	var (
		N          = 1000000
		numWorkers = 24
		size       = 7000
		// We share a single writer across threads, so we need to guard each
		// write. TODO: measure performance impact.
		mu  sync.Mutex
		buf bytes.Buffer
		// The reducer is a simple function that will write "1" into the
		// buffer, if the groups are equal, "0" otherwise.
		f = func(g *Group) error {
			var v string
			if reflect.DeepEqual(g.G0, g.G1) {
				v = "1"
			} else {
				v = "0"
			}
			mu.Lock()
			defer mu.Unlock()
			if _, err := io.WriteString(&buf, v); err != nil {
				return err
			}
			return nil
		}
		b = NewBatcher(groupFunc(f))
	)
	b.Size = size
	b.NumWorkers = numWorkers
	for i := 0; i < N; i++ {
		// Even indices produce an unequal pair ("0"), odd indices an
		// equal pair ("1"), so we expect exactly N/2 of each.
		var u, v string
		if i%2 == 0 {
			u, v = "a", "b"
		} else {
			u, v = "a", "a"
		}
		g := &Group{
			Key: fmt.Sprintf("%d", i),
			G0:  []string{u},
			G1:  []string{v},
		}
		if err := b.GroupFunc(g); err != nil {
			t.Fatalf("unexpected err from gf: %v", err)
		}
	}
	if err := b.Close(); err != nil {
		t.Fatalf("unexpected err from close: %v", err)
	}
	got := buf.String()
	// Assert the total output length too: a correct count of "1"s alone
	// would not catch duplicated or spurious extra writes.
	if len(got) != N {
		t.Fatalf("got %d bytes of output, want %d", len(got), N)
	}
	count0, count1 := strings.Count(got, "0"), strings.Count(got, "1")
	if count1 != N/2 {
		t.Fatalf("got %v, want %v (count0=%v, buf=%s)", count1, N/2, count0, buf.String())
	}
	if count0 != N/2 {
		t.Fatalf("got %v zeros, want %v", count0, N/2)
	}
}
// TODO: BenchmarkBatcher with different worker counts.
|