From f33ef23e0889430016cf8363e914b73313758714 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 5 Jul 2021 23:03:35 +0200 Subject: use canonical variable names --- skate/zipkey/zipkey.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'skate/zipkey') diff --git a/skate/zipkey/zipkey.go b/skate/zipkey/zipkey.go index 3805535..3e8a133 100644 --- a/skate/zipkey/zipkey.go +++ b/skate/zipkey/zipkey.go @@ -45,16 +45,16 @@ func New(r0, r1 io.Reader, kf keyFunc, gf groupFunc) *ZipRun { // Run starts reading from both readers. The process stops, if one reader is // exhausted or reads from any reader fail. -func (c *ZipRun) Run() error { +func (z *ZipRun) Run() error { var ( k0, k1, c0, c1 string // key: k0, k1; current line: c0, c1 done bool err error lineKey = func(r *bufio.Reader) (line, key string, err error) { - if line, err = r.ReadString(c.sep); err != nil { + if line, err = r.ReadString(z.sep); err != nil { return } - key, err = c.kf(line) + key, err = z.kf(line) return } ) @@ -65,7 +65,7 @@ func (c *ZipRun) Run() error { switch { case k0 == "" || k0 < k1: for k0 == "" || k0 < k1 { - c0, k0, err = lineKey(c.r0) + c0, k0, err = lineKey(z.r0) if err == io.EOF { return nil } @@ -75,7 +75,7 @@ func (c *ZipRun) Run() error { } case k1 == "" || k0 > k1: for k1 == "" || k0 > k1 { - c1, k1, err = lineKey(c.r1) + c1, k1, err = lineKey(z.r1) if err == io.EOF { return nil } @@ -90,7 +90,7 @@ func (c *ZipRun) Run() error { G1: []string{c1}, } for { - c0, err = c.r0.ReadString(c.sep) + c0, err = z.r0.ReadString(z.sep) if err == io.EOF { done = true break @@ -98,7 +98,7 @@ func (c *ZipRun) Run() error { if err != nil { return err } - k, err := c.kf(c0) + k, err := z.kf(c0) if err != nil { return err } @@ -111,7 +111,7 @@ func (c *ZipRun) Run() error { } } for { - c1, err = c.r1.ReadString(c.sep) + c1, err = z.r1.ReadString(z.sep) if err == io.EOF { done = true break @@ -119,7 +119,7 @@ func (c *ZipRun) Run() error { if err != nil { return err } - k, err := c.kf(c1) + k, err := z.kf(c1) if err != nil { return err } @@ -131,8 +131,8 @@ func (c *ZipRun) Run() error { break } } - if c.gf != nil { - if err := c.gf(g); err != nil { + if z.gf != nil { + if err := z.gf(g); err != nil { return err } } -- cgit v1.2.3 From 2f6102a0c0fba658ef664f44af5e65b007930033 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 5 Jul 2021 23:54:50 +0200 Subject: batcher: do not pass struct fields --- skate/zipkey/batch.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'skate/zipkey') diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index c31909c..9e52f90 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -27,7 +27,7 @@ func NewBatcher(gf groupFunc) *Batcher { } for i := 0; i < batcher.NumWorkers; i++ { batcher.wg.Add(1) - go batcher.worker(batcher.queue, &batcher.wg) + go batcher.worker() } return &batcher } @@ -59,10 +59,10 @@ func (b *Batcher) GroupFunc(g *Group) error { } // worker will wind down after a first error encountered. -func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) { - defer wg.Done() +func (b *Batcher) worker() { + defer b.wg.Done() OUTER: - for batch := range queue { + for batch := range b.queue { for _, g := range batch { if err := b.gf(g); err != nil { b.err = err -- cgit v1.2.3 From 9ea69942a54f1c2e13f058ba35279af3612add1b Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Wed, 7 Jul 2021 22:36:16 +0200 Subject: update docs --- skate/zipkey/batch.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'skate/zipkey') diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index 9e52f90..ebbd081 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -14,7 +14,7 @@ type Batcher struct { queue chan []*Group wg sync.WaitGroup err error - closing bool + closing bool // https://stackoverflow.com/q/16105325/89391 } // NewBatcher set ups a new Batcher. @@ -32,6 +32,8 @@ func NewBatcher(gf groupFunc) *Batcher { return &batcher } +// Close tears down the batcher. If this is not called, you get goroutine leaks +// and will miss the data from the last uncommitted batch. func (b *Batcher) Close() error { b.closing = true g := make([]*Group, len(b.batch)) @@ -43,7 +45,8 @@ func (b *Batcher) Close() error { return b.err } -// GroupFunc implement the groupFunc type. Not thread safe. +// GroupFunc implement the groupFunc type. Not thread safe. Panics if called +// after Close has been called. func (b *Batcher) GroupFunc(g *Group) error { if b.closing { panic("cannot call GroupFunc after Close") @@ -58,7 +61,9 @@ func (b *Batcher) GroupFunc(g *Group) error { return nil } -// worker will wind down after a first error encountered. +// worker will wind down after any error has been encountered. Multiple threads +// may set the error, but we currently only care whether the error is nil or +// not. func (b *Batcher) worker() { defer b.wg.Done() OUTER: -- cgit v1.2.3