aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--skate/reduce.go36
-rw-r--r--skate/zipkey/batch.go9
2 files changed, 25 insertions, 20 deletions
diff --git a/skate/reduce.go b/skate/reduce.go
index 5b30fdf..fc286d9 100644
--- a/skate/reduce.go
+++ b/skate/reduce.go
@@ -44,7 +44,7 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer)
var (
enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
+ grouper = func(g *zipkey.Group) error {
var (
target *Release
ref *Ref
@@ -80,9 +80,9 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer)
}
}
return nil
- })
+ }
+ batcher = zipkey.NewBatcherSize(grouper, 10000) // hard-code for now; on 24 cores 10K take up over 8G of RAM
)
- batcher.Size = 10000 // hard-code for now; on 24 cores 10K take up over 8G of RAM
defer batcher.Close()
zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc)
return zipper.Run()
@@ -94,7 +94,7 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W
var (
enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
+ grouper = func(g *zipkey.Group) error {
var (
target, re *Release
err error
@@ -130,9 +130,9 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W
}
}
return nil
- })
+ }
+ batcher = zipkey.NewBatcherSize(grouper, 10000)
)
- batcher.Size = 10000
defer batcher.Close()
zipper := zipkey.New(olr, releases, keyer, batcher.GroupFunc)
return zipper.Run()
@@ -144,7 +144,7 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error
var (
enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
+ grouper = func(g *zipkey.Group) error {
var (
target *Release
wiki *MinimalCitations
@@ -173,9 +173,9 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error
}
}
return nil
- })
+ }
+ batcher = zipkey.NewBatcherSize(grouper, 10000)
)
- batcher.Size = 10000
defer batcher.Close()
zipper := zipkey.New(releases, wiki, keyer, batcher.GroupFunc)
return zipper.Run()
@@ -188,7 +188,7 @@ func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
var (
enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
+ grouper = func(g *zipkey.Group) error {
var (
re, pivot *Release
err error
@@ -217,9 +217,9 @@ func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
}
}
return nil
- })
+ }
+ batcher = zipkey.NewBatcherSize(grouper, 10000)
)
- batcher.Size = 10000
defer batcher.Close()
zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc)
return zipper.Run()
@@ -273,7 +273,7 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
var (
enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
+ grouper = func(g *zipkey.Group) error {
var (
ref, pivot *Release // ref (reference), pivot (open library)
err error
@@ -313,9 +313,9 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
}
}
return nil
- })
+ }
+ batcher = zipkey.NewBatcherSize(grouper, 10000)
)
- batcher.Size = 10000
defer batcher.Close()
zipper := zipkey.New(olr, refs, keyer, batcher.GroupFunc)
return zipper.Run()
@@ -336,7 +336,7 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
stats = statsAugment{}
enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
+ grouper = func(g *zipkey.Group) error {
// g.G0 contains matched docs for a given work id, g.G1 all raw
// refs, with the same work id.
@@ -373,9 +373,9 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
}
}
return nil
- })
+ }
+ batcher = zipkey.NewBatcherSize(grouper, 10000)
)
- batcher.Size = 10000
defer batcher.Close()
zipper := zipkey.New(bref, raw, keyer, batcher.GroupFunc)
err := zipper.Run()
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
index ebbd081..85d1d35 100644
--- a/skate/zipkey/batch.go
+++ b/skate/zipkey/batch.go
@@ -19,9 +19,14 @@ type Batcher struct {
// NewBatcher set ups a new Batcher.
func NewBatcher(gf groupFunc) *Batcher {
+ return NewBatcherSize(gf, 1000)
+}
+
+// NewBatcherSize initializes a batcher with a given size.
+func NewBatcherSize(gf groupFunc, size int) *Batcher {
batcher := Batcher{
gf: gf,
- Size: 1000,
+ Size: size,
NumWorkers: runtime.NumCPU(),
queue: make(chan []*Group),
}
@@ -46,7 +51,7 @@ func (b *Batcher) Close() error {
}
// GroupFunc implement the groupFunc type. Not thread safe. Panics if called
-// after Close has been called.
+// after Close.
func (b *Batcher) GroupFunc(g *Group) error {
if b.closing {
panic("cannot call GroupFunc after Close")