aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--skate/reduce.go14
-rw-r--r--skate/zipkey/batch.go4
2 files changed, 9 insertions, 9 deletions
diff --git a/skate/reduce.go b/skate/reduce.go
index 8161955..5bcdb37 100644
--- a/skate/reduce.go
+++ b/skate/reduce.go
@@ -31,8 +31,6 @@ import (
"github.com/segmentio/encoding/json"
)
-const defaultBatchSize = 10000
-
// groupLogf logs a message alongsize a serialized group for debugging.
func groupLogf(g *zipkey.Group, s string, vs ...interface{}) {
log.Printf(s, vs...)
@@ -83,7 +81,7 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer)
}
return nil
}
- batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize) // hard-code for now; on 24 cores 10K take up over 8G of RAM
+ batcher = zipkey.NewBatcher(grouper) // hard-code for now; on 24 cores 10K take up over 8G of RAM
)
defer batcher.Close()
zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc)
@@ -133,7 +131,7 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W
}
return nil
}
- batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize)
+ batcher = zipkey.NewBatcher(grouper)
)
defer batcher.Close()
zipper := zipkey.New(olr, releases, keyer, batcher.GroupFunc)
@@ -176,7 +174,7 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error
}
return nil
}
- batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize)
+ batcher = zipkey.NewBatcher(grouper)
)
defer batcher.Close()
zipper := zipkey.New(releases, wiki, keyer, batcher.GroupFunc)
@@ -220,7 +218,7 @@ func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
}
return nil
}
- batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize)
+ batcher = zipkey.NewBatcher(grouper)
)
defer batcher.Close()
zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc)
@@ -316,7 +314,7 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
}
return nil
}
- batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize)
+ batcher = zipkey.NewBatcher(grouper)
)
defer batcher.Close()
zipper := zipkey.New(olr, refs, keyer, batcher.GroupFunc)
@@ -376,7 +374,7 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
}
return nil
}
- batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize)
+ batcher = zipkey.NewBatcher(grouper)
)
defer batcher.Close()
zipper := zipkey.New(bref, raw, keyer, batcher.GroupFunc)
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
index 9924b65..049c3b2 100644
--- a/skate/zipkey/batch.go
+++ b/skate/zipkey/batch.go
@@ -5,6 +5,8 @@ import (
"sync"
)
+const defaultBatchSize = 10000
+
// Batcher runs reducers in parallel on batches of groups.
type Batcher struct {
Size int
@@ -19,7 +21,7 @@ type Batcher struct {
// NewBatcher set ups a new Batcher with a batch size of 1000.
func NewBatcher(gf groupFunc) *Batcher {
- return NewBatcherSize(gf, 1000)
+ return NewBatcherSize(gf, defaultBatchSize)
}
// NewBatcherSize initializes a batcher with a given size.