diff options
author | Martin Czygan <martin.czygan@gmail.com> | 2021-07-07 23:34:28 +0200 |
---|---|---|
committer | Martin Czygan <martin.czygan@gmail.com> | 2021-07-07 23:34:28 +0200 |
commit | 7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8 (patch) | |
tree | d72f36377a2362104deb71f342172812459b2347 /skate/zipkey | |
parent | 9b089b324d48e6c5d02d7f70adb585cde263f1e4 (diff) | |
parent | 9ea69942a54f1c2e13f058ba35279af3612add1b (diff) | |
download | refcat-7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8.tar.gz refcat-7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8.zip |
fix merge conflict
Diffstat (limited to 'skate/zipkey')
-rw-r--r-- | skate/zipkey/batch.go | 19 | ||||
-rw-r--r-- | skate/zipkey/zipkey.go | 22 |
2 files changed, 23 insertions, 18 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index c31909c..ebbd081 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -14,7 +14,7 @@ type Batcher struct { queue chan []*Group wg sync.WaitGroup err error - closing bool + closing bool // https://stackoverflow.com/q/16105325/89391 } // NewBatcher set ups a new Batcher. @@ -27,11 +27,13 @@ func NewBatcher(gf groupFunc) *Batcher { } for i := 0; i < batcher.NumWorkers; i++ { batcher.wg.Add(1) - go batcher.worker(batcher.queue, &batcher.wg) + go batcher.worker() } return &batcher } +// Close tears down the batcher. If this is not called, you get goroutine leaks +// and will miss the data from the last uncommitted batch. func (b *Batcher) Close() error { b.closing = true g := make([]*Group, len(b.batch)) @@ -43,7 +45,8 @@ func (b *Batcher) Close() error { return b.err } -// GroupFunc implement the groupFunc type. Not thread safe. +// GroupFunc implement the groupFunc type. Not thread safe. Panics if called +// after Close has been called. func (b *Batcher) GroupFunc(g *Group) error { if b.closing { panic("cannot call GroupFunc after Close") @@ -58,11 +61,13 @@ func (b *Batcher) GroupFunc(g *Group) error { return nil } -// worker will wind down after a first error encountered. -func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) { - defer wg.Done() +// worker will wind down after any error has been encountered. Multiple threads +// may set the error, but we currently only care whether the error is nil or +// not. +func (b *Batcher) worker() { + defer b.wg.Done() OUTER: - for batch := range queue { + for batch := range b.queue { for _, g := range batch { if err := b.gf(g); err != nil { b.err = err diff --git a/skate/zipkey/zipkey.go b/skate/zipkey/zipkey.go index 3805535..3e8a133 100644 --- a/skate/zipkey/zipkey.go +++ b/skate/zipkey/zipkey.go @@ -45,16 +45,16 @@ func New(r0, r1 io.Reader, kf keyFunc, gf groupFunc) *ZipRun { // Run starts reading from both readers. The process stops, if one reader is // exhausted or reads from any reader fail. -func (c *ZipRun) Run() error { +func (z *ZipRun) Run() error { var ( k0, k1, c0, c1 string // key: k0, k1; current line: c0, c1 done bool err error lineKey = func(r *bufio.Reader) (line, key string, err error) { - if line, err = r.ReadString(c.sep); err != nil { + if line, err = r.ReadString(z.sep); err != nil { return } - key, err = c.kf(line) + key, err = z.kf(line) return } ) @@ -65,7 +65,7 @@ func (c *ZipRun) Run() error { switch { case k0 == "" || k0 < k1: for k0 == "" || k0 < k1 { - c0, k0, err = lineKey(c.r0) + c0, k0, err = lineKey(z.r0) if err == io.EOF { return nil } @@ -75,7 +75,7 @@ func (c *ZipRun) Run() error { } case k1 == "" || k0 > k1: for k1 == "" || k0 > k1 { - c1, k1, err = lineKey(c.r1) + c1, k1, err = lineKey(z.r1) if err == io.EOF { return nil } @@ -90,7 +90,7 @@ func (c *ZipRun) Run() error { G1: []string{c1}, } for { - c0, err = c.r0.ReadString(c.sep) + c0, err = z.r0.ReadString(z.sep) if err == io.EOF { done = true break @@ -98,7 +98,7 @@ func (c *ZipRun) Run() error { if err != nil { return err } - k, err := c.kf(c0) + k, err := z.kf(c0) if err != nil { return err } @@ -111,7 +111,7 @@ func (c *ZipRun) Run() error { } } for { - c1, err = c.r1.ReadString(c.sep) + c1, err = z.r1.ReadString(z.sep) if err == io.EOF { done = true break @@ -119,7 +119,7 @@ func (c *ZipRun) Run() error { if err != nil { return err } - k, err := c.kf(c1) + k, err := z.kf(c1) if err != nil { return err } @@ -131,8 +131,8 @@ func (c *ZipRun) Run() error { break } } - if c.gf != nil { - if err := c.gf(g); err != nil { + if z.gf != nil { + if err := z.gf(g); err != nil { return err } } |