diff options
-rw-r--r-- | skate/reduce.go | 36 | ||||
-rw-r--r-- | skate/zipkey/batch.go | 9 |
2 files changed, 25 insertions, 20 deletions
diff --git a/skate/reduce.go b/skate/reduce.go index 5b30fdf..fc286d9 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -44,7 +44,7 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) var ( enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { + grouper = func(g *zipkey.Group) error { var ( target *Release ref *Ref @@ -80,9 +80,9 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) } } return nil - }) + } + batcher = zipkey.NewBatcherSize(grouper, 10000) // hard-code for now; on 24 cores 10K take up over 8G of RAM ) - batcher.Size = 10000 // hard-code for now; on 24 cores 10K take up over 8G of RAM defer batcher.Close() zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) return zipper.Run() @@ -94,7 +94,7 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W var ( enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { + grouper = func(g *zipkey.Group) error { var ( target, re *Release err error @@ -130,9 +130,9 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W } } return nil - }) + } + batcher = zipkey.NewBatcherSize(grouper, 10000) ) - batcher.Size = 10000 defer batcher.Close() zipper := zipkey.New(olr, releases, keyer, batcher.GroupFunc) return zipper.Run() @@ -144,7 +144,7 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error var ( enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { + grouper = func(g *zipkey.Group) error { var ( target *Release wiki *MinimalCitations @@ -173,9 +173,9 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error } } return nil - }) + } + batcher = zipkey.NewBatcherSize(grouper, 10000) ) - batcher.Size = 10000 defer batcher.Close() zipper := zipkey.New(releases, wiki, keyer, batcher.GroupFunc) return zipper.Run() @@ -188,7 +188,7 @@ func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error { var ( enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { + grouper = func(g *zipkey.Group) error { var ( re, pivot *Release err error @@ -217,9 +217,9 @@ func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error { } } return nil - }) + } + batcher = zipkey.NewBatcherSize(grouper, 10000) ) - batcher.Size = 10000 defer batcher.Close() zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) return zipper.Run() @@ -273,7 +273,7 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { var ( enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { + grouper = func(g *zipkey.Group) error { var ( ref, pivot *Release // ref (reference), pivot (open library) err error @@ -313,9 +313,9 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { } } return nil - }) + } + batcher = zipkey.NewBatcherSize(grouper, 10000) ) - batcher.Size = 10000 defer batcher.Close() zipper := zipkey.New(olr, refs, keyer, batcher.GroupFunc) return zipper.Run() @@ -336,7 +336,7 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { stats = statsAugment{} enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { + grouper = func(g *zipkey.Group) error { // g.G0 contains matched docs for a given work id, g.G1 all raw // refs, with the same work id. @@ -373,9 +373,9 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { } } return nil - }) + } + batcher = zipkey.NewBatcherSize(grouper, 10000) ) - batcher.Size = 10000 defer batcher.Close() zipper := zipkey.New(bref, raw, keyer, batcher.GroupFunc) err := zipper.Run() diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index ebbd081..85d1d35 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -19,9 +19,14 @@ type Batcher struct { // NewBatcher set ups a new Batcher. func NewBatcher(gf groupFunc) *Batcher { + return NewBatcherSize(gf, 1000) +} + +// NewBatcherSize initializes a batcher with a given size. +func NewBatcherSize(gf groupFunc, size int) *Batcher { batcher := Batcher{ gf: gf, - Size: 1000, + Size: size, NumWorkers: runtime.NumCPU(), queue: make(chan []*Group), } @@ -46,7 +51,7 @@ func (b *Batcher) Close() error { } // GroupFunc implement the groupFunc type. Not thread safe. Panics if called -// after Close has been called. +// after Close. func (b *Batcher) GroupFunc(g *Group) error { if b.closing { panic("cannot call GroupFunc after Close") |