diff options
author | Martin Czygan <martin.czygan@gmail.com> | 2021-07-09 18:08:04 +0200 |
---|---|---|
committer | Martin Czygan <martin.czygan@gmail.com> | 2021-07-09 18:08:04 +0200 |
commit | 4df87f9381ce73074abdb1b00635a342f2abadfd (patch) | |
tree | 48a43598f5231381fb08919483e992e72fa3856f /skate | |
parent | 6fa7259028723ca4eb2d2aa62cc21032bee66e6f (diff) | |
download | refcat-4df87f9381ce73074abdb1b00635a342f2abadfd.tar.gz refcat-4df87f9381ce73074abdb1b00635a342f2abadfd.zip |
reduce: move batch size
Diffstat (limited to 'skate')
-rw-r--r-- | skate/reduce.go | 14 | ||||
-rw-r--r-- | skate/zipkey/batch.go | 4 |
2 files changed, 9 insertions, 9 deletions
diff --git a/skate/reduce.go b/skate/reduce.go index 8161955..5bcdb37 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -31,8 +31,6 @@ import ( "github.com/segmentio/encoding/json" ) -const defaultBatchSize = 10000 - // groupLogf logs a message alongsize a serialized group for debugging. func groupLogf(g *zipkey.Group, s string, vs ...interface{}) { log.Printf(s, vs...) @@ -83,7 +81,7 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) } return nil } - batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize) // hard-code for now; on 24 cores 10K take up over 8G of RAM + batcher = zipkey.NewBatcher(grouper) // hard-code for now; on 24 cores 10K take up over 8G of RAM ) defer batcher.Close() zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) @@ -133,7 +131,7 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W } return nil } - batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize) + batcher = zipkey.NewBatcher(grouper) ) defer batcher.Close() zipper := zipkey.New(olr, releases, keyer, batcher.GroupFunc) @@ -176,7 +174,7 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error } return nil } - batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize) + batcher = zipkey.NewBatcher(grouper) ) defer batcher.Close() zipper := zipkey.New(releases, wiki, keyer, batcher.GroupFunc) @@ -220,7 +218,7 @@ func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error { } return nil } - batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize) + batcher = zipkey.NewBatcher(grouper) ) defer batcher.Close() zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) @@ -316,7 +314,7 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { } return nil } - batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize) + batcher = zipkey.NewBatcher(grouper) ) defer batcher.Close() zipper := zipkey.New(olr, refs, keyer, batcher.GroupFunc) @@ -376,7 +374,7 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { } return nil } - batcher = zipkey.NewBatcherSize(grouper, defaultBatchSize) + batcher = zipkey.NewBatcher(grouper) ) defer batcher.Close() zipper := zipkey.New(bref, raw, keyer, batcher.GroupFunc) diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index 9924b65..049c3b2 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -5,6 +5,8 @@ import ( "sync" ) +const defaultBatchSize = 10000 + // Batcher runs reducers in parallel on batches of groups. type Batcher struct { Size int @@ -19,7 +21,7 @@ type Batcher struct { // NewBatcher set ups a new Batcher with a batch size of 1000. func NewBatcher(gf groupFunc) *Batcher { - return NewBatcherSize(gf, 1000) + return NewBatcherSize(gf, defaultBatchSize) } // NewBatcherSize initializes a batcher with a given size. |