From 09a7e8c9d013f13a1aa1ef4e9b7f397647b79967 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Sun, 21 Mar 2021 01:17:38 +0100 Subject: initial import of skate --- skate/parallel/processor.go | 165 +++++++++++++++++++++++++++++++++++++++ skate/parallel/processor_test.go | 120 ++++++++++++++++++++++++++++ 2 files changed, 285 insertions(+) create mode 100644 skate/parallel/processor.go create mode 100644 skate/parallel/processor_test.go (limited to 'skate/parallel') diff --git a/skate/parallel/processor.go b/skate/parallel/processor.go new file mode 100644 index 0000000..37d5643 --- /dev/null +++ b/skate/parallel/processor.go @@ -0,0 +1,165 @@ +// Package parallel implements helpers for fast processing of line oriented inputs. +package parallel + +import ( + "bufio" + "bytes" + "io" + "log" + "runtime" + "sync" + "time" +) + +// BytesBatch is a slice of byte slices. +type BytesBatch struct { + b [][]byte +} + +// NewBytesBatch creates a new BytesBatch with a given capacity. +func NewBytesBatch() *BytesBatch { + return NewBytesBatchCapacity(0) +} + +// NewBytesBatchCapacity creates a new BytesBatch with a given capacity. +func NewBytesBatchCapacity(cap int) *BytesBatch { + return &BytesBatch{b: make([][]byte, 0, cap)} +} + +// Add adds an element to the batch. +func (bb *BytesBatch) Add(b []byte) { + bb.b = append(bb.b, b) +} + +// Reset empties this batch. +func (bb *BytesBatch) Reset() { + bb.b = nil +} + +// Size returns the number of elements in the batch. +func (bb *BytesBatch) Size() int { + return len(bb.b) +} + +// Slice returns a slice of byte slices. +func (bb *BytesBatch) Slice() [][]byte { + b := make([][]byte, len(bb.b)) + for i := 0; i < len(bb.b); i++ { + b[i] = bb.b[i] + } + return b +} + +// Processor can process lines in parallel. +type Processor struct { + BatchSize int + RecordSeparator byte + NumWorkers int + SkipEmptyLines bool + Verbose bool + LogFunc func() + r io.Reader + w io.Writer + f func([]byte) ([]byte, error) +} + +// NewProcessor creates a new line processor. +func NewProcessor(r io.Reader, w io.Writer, f func([]byte) ([]byte, error)) *Processor { + return &Processor{ + BatchSize: 10000, + RecordSeparator: '\n', + NumWorkers: runtime.NumCPU(), + SkipEmptyLines: true, + r: r, + w: w, + f: f, + } +} + +// Run starts the workers, crunching through the input. +func (p *Processor) Run() error { + // wErr signals a worker or writer error. If an error occurs, the items in + // the queue are still process, just no items are added to the queue. There + // is only one way to toggle this, from false to true, so we don't care + // about race conditions here. + var wErr error + + worker := func(queue chan [][]byte, out chan []byte, f func([]byte) ([]byte, error), wg *sync.WaitGroup) { + defer wg.Done() + for batch := range queue { + for _, b := range batch { + r, err := f(b) + if err != nil { + wErr = err + } + out <- r + } + } + } + writer := func(w io.Writer, bc chan []byte, done chan bool) { + bw := bufio.NewWriter(w) + for b := range bc { + if _, err := bw.Write(b); err != nil { + wErr = err + } + } + if err := bw.Flush(); err != nil { + wErr = err + } + done <- true + } + var ( + queue = make(chan [][]byte) + out = make(chan []byte) + done = make(chan bool) + total int64 + started = time.Now() + wg sync.WaitGroup + batch = NewBytesBatchCapacity(p.BatchSize) + br = bufio.NewReader(p.r) + ) + go writer(p.w, out, done) + for i := 0; i < p.NumWorkers; i++ { + wg.Add(1) + go worker(queue, out, p.f, &wg) + } + for { + b, err := br.ReadBytes(p.RecordSeparator) + if err == io.EOF { + break + } + if err != nil { + return err + } + if len(bytes.TrimSpace(b)) == 0 && p.SkipEmptyLines { + continue + } + batch.Add(b) + if batch.Size() == p.BatchSize { + total += int64(p.BatchSize) + // To avoid checking on each loop, we only check for worker or + // write errors here. + if wErr != nil { + break + } + queue <- batch.Slice() + batch.Reset() + if p.Verbose { + log.Printf("dispatched %d lines (%0.2f lines/s)", + total, float64(total)/time.Since(started).Seconds()) + if p.LogFunc != nil { + p.LogFunc() + } + } + } + } + queue <- batch.Slice() + batch.Reset() + + close(queue) + wg.Wait() + close(out) + <-done + + return wErr +} diff --git a/skate/parallel/processor_test.go b/skate/parallel/processor_test.go new file mode 100644 index 0000000..ee4a14f --- /dev/null +++ b/skate/parallel/processor_test.go @@ -0,0 +1,120 @@ +package parallel + +import ( + "bytes" + "errors" + "io" + "strings" + "testing" +) + +var errFake1 = errors.New("fake error #1") + +func StringSliceContains(sl []string, s string) bool { + for _, v := range sl { + if s == v { + return true + } + } + return false +} + +// LinesEqualSeparator returns true, if every line in a, when separated by +// separator, can be found in b. +func LinesEqualSeparator(a, b, sep string) bool { + al := strings.Split(a, sep) + bl := strings.Split(b, sep) + if len(al) != len(bl) { + return false + } + for _, line := range al { + if !StringSliceContains(bl, line) { + return false + } + } + return true +} + +// LinesEqual returns true, if every line in a, when separated by a newline, can be found in b. +func LinesEqual(a, b string) bool { + return LinesEqualSeparator(a, b, "\n") +} + +func TestSimple(t *testing.T) { + var cases = []struct { + about string + r io.Reader + expected string + f func([]byte) ([]byte, error) + err error + }{ + { + about: `No input produces no output.`, + r: strings.NewReader(""), + expected: "", + f: func(b []byte) ([]byte, error) { return []byte{}, nil }, + err: nil, + }, + { + about: `Order is not guaranteed.`, + r: strings.NewReader("a\nb\n"), + expected: "B\nA\n", + f: func(b []byte) ([]byte, error) { return bytes.ToUpper(b), nil }, + err: nil, + }, + { + about: `Like grep, we can filter out items by returning nothing.`, + r: strings.NewReader("a\nb\n"), + expected: "B\n", + f: func(b []byte) ([]byte, error) { + if strings.TrimSpace(string(b)) == "a" { + return []byte{}, nil + } + return bytes.ToUpper(b), nil + }, + err: nil, + }, + { + about: `Empty lines skipped.`, + r: strings.NewReader("a\na\na\na\n\n\nb\n"), + expected: "B\n", + f: func(b []byte) ([]byte, error) { + if strings.TrimSpace(string(b)) == "a" { + return []byte{}, nil + } + return bytes.ToUpper(b), nil + }, + err: nil, + }, + { + about: `On empty input, the transformer func is never called.`, + r: strings.NewReader(""), + expected: "", + f: func(b []byte) ([]byte, error) { + return nil, errFake1 + }, + err: nil, + }, + { + about: `Error does not come through, if all lines are skipped.`, + r: strings.NewReader("\n"), + expected: "", + f: func(b []byte) ([]byte, error) { + return nil, errFake1 + }, + err: nil, + }, + } + + for _, c := range cases { + var buf bytes.Buffer + p := NewProcessor(c.r, &buf, c.f) + err := p.Run() + if err != c.err { + t.Errorf("p.Run: got %v, want %v", err, c.err) + } + if !LinesEqual(buf.String(), c.expected) { + t.Errorf("p.Run: got %v, want %v", buf.String(), c.expected) + } + } +} -- cgit v1.2.3