about | summary | refs | log | tree | commit | diff | stats
path: root/skate/zipkey
diff options
context:
space:
mode:
Diffstat (limited to 'skate/zipkey')
-rw-r--r-- skate/zipkey/batch.go 9
-rw-r--r-- skate/zipkey/zipkey.go 2
2 files changed, 7 insertions, 4 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
index 56f6f0d..e8ba052 100644
--- a/skate/zipkey/batch.go
+++ b/skate/zipkey/batch.go
@@ -10,7 +10,9 @@ const (
maxGroupSize = 500 // short circuiting threshold
)
-// Batcher runs reducers in parallel on batches of groups.
+// Batcher runs reducers (over groups) in parallel on batches. This can have a
+// significant impact on runtime, e.g. for certain datasets and operations we
+// saw a reduction of 80% in processing time.
type Batcher struct {
Size int
NumWorkers int
@@ -27,7 +29,8 @@ func NewBatcher(gf groupFunc) *Batcher {
return NewBatcherSize(gf, defaultBatchSize)
}
-// NewBatcherSize initializes a batcher with a given size.
+// NewBatcherSize initializes a batcher with a given function to apply and a
+// batch size.
func NewBatcherSize(gf groupFunc, size int) *Batcher {
batcher := Batcher{
gf: gf,
@@ -60,7 +63,7 @@ func (b *Batcher) Close() error {
// groupFunc before. Not thread safe. Panics if called after Close.
func (b *Batcher) GroupFunc(g *Group) error {
if b.closing {
- panic("cannot call GroupFunc after Close")
+ panic("batcher: cannot call GroupFunc after Close")
}
b.batch = append(b.batch, g)
// A few groups have an extended size, e.g. thousands of members. We short
diff --git a/skate/zipkey/zipkey.go b/skate/zipkey/zipkey.go
index 3e8a133..ffd33fe 100644
--- a/skate/zipkey/zipkey.go
+++ b/skate/zipkey/zipkey.go
@@ -32,7 +32,7 @@ type ZipRun struct {
sep byte
}
-// New create a new ready to run ZipRun value.
+// New creates a new ready to run ZipRun value.
func New(r0, r1 io.Reader, kf keyFunc, gf groupFunc) *ZipRun {
return &ZipRun{
r0: bufio.NewReader(r0),