package zipkey

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"reflect"
	"strings"
	"sync"
	"testing"
)

// TestBatcher sends a single group through a Batcher whose group function
// JSON-encodes each group into a buffer, then compares the buffer content
// against the expected JSON document once the batcher has been closed.
func TestBatcher(t *testing.T) {
	var (
		buf bytes.Buffer
		enc = json.NewEncoder(&buf)
		f   = func(g *Group) error {
			return enc.Encode(g)
		}
		b = NewBatcher(groupFunc(f))
	)
	g := &Group{
		Key: "K1",
		G0:  []string{"A"},
		G1:  []string{"B"},
	}
	// Surface batcher errors directly instead of letting them show up later as
	// an unexplained output mismatch.
	if err := b.GroupFunc(g); err != nil {
		t.Fatalf("unexpected err from gf: %v", err)
	}
	if err := b.Close(); err != nil {
		t.Fatalf("unexpected err from close: %v", err)
	}
	var (
		// json.Encoder terminates each value with a newline; trim it before
		// comparing.
		got  = strings.TrimSpace(buf.String())
		want = `{"Key":"K1","G0":["A"],"G1":["B"]}`
	)
	if got != want {
		t.Fatalf("got %v, want %v", got, want)
	}
}

// TestBatcherLarge pushes N groups through a Batcher configured with a custom
// Size and NumWorkers. The group function writes "1" for a group whose G0 and
// G1 are equal and "0" otherwise; the test then checks that both markers occur
// the expected number of times.
func TestBatcherLarge(t *testing.T) {
	var (
		N          = 1000000
		numWorkers = 24
		size       = 7000
		// We share a single writer across threads, so we need to guard each
		// write. TODO: measure performance impact.
		mu  sync.Mutex
		buf bytes.Buffer
		// The reducer is a simple function that will write "1" into the
		// buffer, if the groups are equal, "0" otherwise.
		f = func(g *Group) error {
			v := "0"
			if reflect.DeepEqual(g.G0, g.G1) {
				v = "1"
			}
			mu.Lock()
			defer mu.Unlock()
			_, err := io.WriteString(&buf, v)
			return err
		}
		b = NewBatcher(groupFunc(f))
	)
	b.Size = size
	b.NumWorkers = numWorkers
	for i := 0; i < N; i++ {
		// Even indices get differing groups ("0"), odd indices get equal
		// groups ("1").
		u, v := "a", "a"
		if i%2 == 0 {
			v = "b"
		}
		g := &Group{
			Key: fmt.Sprintf("%d", i),
			G0:  []string{u},
			G1:  []string{v},
		}
		if err := b.GroupFunc(g); err != nil {
			t.Fatalf("unexpected err from gf: %v", err)
		}
	}
	if err := b.Close(); err != nil {
		t.Fatalf("unexpected err from close: %v", err)
	}
	got := buf.String()
	count0, count1 := strings.Count(got, "0"), strings.Count(got, "1")
	// The buffer holds roughly N bytes, so it is deliberately left out of the
	// failure messages; the two counts are enough to diagnose a mismatch.
	if count1 != N/2 {
		t.Fatalf("got %v, want %v (count0=%v)", count1, N/2, count0)
	}
	// Indices 0, 2, 4, ... produce "0", which is N - N/2 of them.
	if want0 := N - N/2; count0 != want0 {
		t.Fatalf("got %v zeros, want %v (count1=%v)", count0, want0, count1)
	}
}

// TODO: BenchmarkBatcher with different worker counts.