aboutsummaryrefslogtreecommitdiffstats
path: root/skate/zipkey/batch_test.go
blob: 6156fbf061c00b82422bdea1a0cb326bb0bf7063 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package zipkey

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"reflect"
	"strings"
	"sync"
	"testing"
)

// TestBatcher sends a single group through a Batcher whose group function
// JSON-encodes each group into a buffer, and checks the encoded output after
// Close. Errors from both GroupFunc and Close are treated as test failures.
func TestBatcher(t *testing.T) {
	var (
		buf bytes.Buffer
		enc = json.NewEncoder(&buf)
		f   = func(g *Group) error {
			return enc.Encode(g)
		}
		b = NewBatcher(groupFunc(f))
	)
	g := &Group{
		Key: "K1",
		G0:  []string{"A"},
		G1:  []string{"B"},
	}
	if err := b.GroupFunc(g); err != nil {
		t.Fatalf("unexpected err from gf: %v", err)
	}
	// The buffer is only read after Close returns; presumably Close flushes
	// any pending batch and waits for the workers (TODO confirm in batch.go).
	if err := b.Close(); err != nil {
		t.Fatalf("unexpected err from close: %v", err)
	}
	var (
		got  = strings.TrimSpace(buf.String())
		want = `{"Key":"K1","G0":["A"],"G1":["B"]}`
	)
	if got != want {
		t.Fatalf("got %q, want %q", got, want)
	}
}

// TestBatcherLarge pushes N groups through a Batcher configured with a custom
// batch size and worker count. The group function writes one byte per group
// into a shared, mutex-guarded buffer: "1" when G0 and G1 are deeply equal,
// "0" otherwise. Even indices produce unequal groups and odd indices equal
// ones. The test checks that exactly N results arrived with the expected
// split between "0" and "1".
func TestBatcherLarge(t *testing.T) {
	var (
		N          = 1000000
		numWorkers = 24
		size       = 7000

		// We share a single writer across threads, so we need to guard each
		// write. TODO: measure performance impact.
		mu  sync.Mutex
		buf bytes.Buffer

		// The reducer is a simple function that will write "1" into the
		// buffer, if the groups are equal, "0" otherwise.
		f = func(g *Group) error {
			v := "0"
			if reflect.DeepEqual(g.G0, g.G1) {
				v = "1"
			}
			mu.Lock()
			defer mu.Unlock()
			_, err := io.WriteString(&buf, v)
			return err
		}
		b = NewBatcher(groupFunc(f))
	)
	if testing.Short() {
		// Still larger than size, so multiple batches are exercised.
		N = 10000
	}
	b.Size = size
	b.NumWorkers = numWorkers

	for i := 0; i < N; i++ {
		u, v := "a", "a"
		if i%2 == 0 {
			v = "b"
		}
		g := &Group{
			Key: fmt.Sprintf("%d", i),
			G0:  []string{u},
			G1:  []string{v},
		}
		if err := b.GroupFunc(g); err != nil {
			t.Fatalf("unexpected err from gf: %v", err)
		}
	}
	// The buffer is only read after Close returns; presumably Close waits for
	// all workers to finish (TODO confirm in batch.go).
	if err := b.Close(); err != nil {
		t.Fatalf("unexpected err from close: %v", err)
	}
	var (
		got    = buf.String()
		count0 = strings.Count(got, "0")
		count1 = strings.Count(got, "1")
		want0  = (N + 1) / 2 // even i: G0 != G1
		want1  = N / 2       // odd i: G0 == G1
	)
	// Each group writes exactly one byte, so the length is the result count.
	// The buffer itself is not dumped, since it can be N bytes long.
	if len(got) != N {
		t.Fatalf("got %d results, want %d", len(got), N)
	}
	if count0 != want0 || count1 != want1 {
		t.Fatalf("got count0=%d count1=%d, want count0=%d count1=%d",
			count0, count1, want0, want1)
	}
}

// TODO: BenchmarkBatcher with different worker counts.