aboutsummaryrefslogtreecommitdiffstats
path: root/skate/zipkey
diff options
context:
space:
mode:
authorMartin Czygan <martin.czygan@gmail.com>2021-07-07 23:34:28 +0200
committerMartin Czygan <martin.czygan@gmail.com>2021-07-07 23:34:28 +0200
commit7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8 (patch)
treed72f36377a2362104deb71f342172812459b2347 /skate/zipkey
parent9b089b324d48e6c5d02d7f70adb585cde263f1e4 (diff)
parent9ea69942a54f1c2e13f058ba35279af3612add1b (diff)
downloadrefcat-7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8.tar.gz
refcat-7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8.zip
fix merge conflict
Diffstat (limited to 'skate/zipkey')
-rw-r--r--skate/zipkey/batch.go19
-rw-r--r--skate/zipkey/zipkey.go22
2 files changed, 23 insertions, 18 deletions
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
index c31909c..ebbd081 100644
--- a/skate/zipkey/batch.go
+++ b/skate/zipkey/batch.go
@@ -14,7 +14,7 @@ type Batcher struct {
queue chan []*Group
wg sync.WaitGroup
err error
- closing bool
+ closing bool // https://stackoverflow.com/q/16105325/89391
}
// NewBatcher set ups a new Batcher.
@@ -27,11 +27,13 @@ func NewBatcher(gf groupFunc) *Batcher {
}
for i := 0; i < batcher.NumWorkers; i++ {
batcher.wg.Add(1)
- go batcher.worker(batcher.queue, &batcher.wg)
+ go batcher.worker()
}
return &batcher
}
+// Close tears down the batcher. If this is not called, you get goroutine leaks
+// and will miss the data from the last uncommitted batch.
func (b *Batcher) Close() error {
b.closing = true
g := make([]*Group, len(b.batch))
@@ -43,7 +45,8 @@ func (b *Batcher) Close() error {
return b.err
}
-// GroupFunc implement the groupFunc type. Not thread safe.
+// GroupFunc implement the groupFunc type. Not thread safe. Panics if called
+// after Close has been called.
func (b *Batcher) GroupFunc(g *Group) error {
if b.closing {
panic("cannot call GroupFunc after Close")
@@ -58,11 +61,13 @@ func (b *Batcher) GroupFunc(g *Group) error {
return nil
}
-// worker will wind down after a first error encountered.
-func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) {
- defer wg.Done()
+// worker will wind down after any error has been encountered. Multiple threads
+// may set the error, but we currently only care whether the error is nil or
+// not.
+func (b *Batcher) worker() {
+ defer b.wg.Done()
OUTER:
- for batch := range queue {
+ for batch := range b.queue {
for _, g := range batch {
if err := b.gf(g); err != nil {
b.err = err
diff --git a/skate/zipkey/zipkey.go b/skate/zipkey/zipkey.go
index 3805535..3e8a133 100644
--- a/skate/zipkey/zipkey.go
+++ b/skate/zipkey/zipkey.go
@@ -45,16 +45,16 @@ func New(r0, r1 io.Reader, kf keyFunc, gf groupFunc) *ZipRun {
// Run starts reading from both readers. The process stops, if one reader is
// exhausted or reads from any reader fail.
-func (c *ZipRun) Run() error {
+func (z *ZipRun) Run() error {
var (
k0, k1, c0, c1 string // key: k0, k1; current line: c0, c1
done bool
err error
lineKey = func(r *bufio.Reader) (line, key string, err error) {
- if line, err = r.ReadString(c.sep); err != nil {
+ if line, err = r.ReadString(z.sep); err != nil {
return
}
- key, err = c.kf(line)
+ key, err = z.kf(line)
return
}
)
@@ -65,7 +65,7 @@ func (c *ZipRun) Run() error {
switch {
case k0 == "" || k0 < k1:
for k0 == "" || k0 < k1 {
- c0, k0, err = lineKey(c.r0)
+ c0, k0, err = lineKey(z.r0)
if err == io.EOF {
return nil
}
@@ -75,7 +75,7 @@ func (c *ZipRun) Run() error {
}
case k1 == "" || k0 > k1:
for k1 == "" || k0 > k1 {
- c1, k1, err = lineKey(c.r1)
+ c1, k1, err = lineKey(z.r1)
if err == io.EOF {
return nil
}
@@ -90,7 +90,7 @@ func (c *ZipRun) Run() error {
G1: []string{c1},
}
for {
- c0, err = c.r0.ReadString(c.sep)
+ c0, err = z.r0.ReadString(z.sep)
if err == io.EOF {
done = true
break
@@ -98,7 +98,7 @@ func (c *ZipRun) Run() error {
if err != nil {
return err
}
- k, err := c.kf(c0)
+ k, err := z.kf(c0)
if err != nil {
return err
}
@@ -111,7 +111,7 @@ func (c *ZipRun) Run() error {
}
}
for {
- c1, err = c.r1.ReadString(c.sep)
+ c1, err = z.r1.ReadString(z.sep)
if err == io.EOF {
done = true
break
@@ -119,7 +119,7 @@ func (c *ZipRun) Run() error {
if err != nil {
return err
}
- k, err := c.kf(c1)
+ k, err := z.kf(c1)
if err != nil {
return err
}
@@ -131,8 +131,8 @@ func (c *ZipRun) Run() error {
break
}
}
- if c.gf != nil {
- if err := c.gf(g); err != nil {
+ if z.gf != nil {
+ if err := z.gf(g); err != nil {
return err
}
}