path: root/skate/parallel
Diffstat (limited to 'skate/parallel')
-rw-r--r--   skate/parallel/processor.go        165
-rw-r--r--   skate/parallel/processor_test.go   120
2 files changed, 285 insertions, 0 deletions
diff --git a/skate/parallel/processor.go b/skate/parallel/processor.go
new file mode 100644
index 0000000..37d5643
--- /dev/null
+++ b/skate/parallel/processor.go
@@ -0,0 +1,165 @@
+// Package parallel implements helpers for fast processing of line-oriented inputs.
+package parallel
+
+import (
+ "bufio"
+ "bytes"
+ "io"
+ "log"
+ "runtime"
+ "sync"
+ "time"
+)
+
+// BytesBatch is a slice of byte slices.
+type BytesBatch struct {
+    b [][]byte
+}
+
+// NewBytesBatch creates a new BytesBatch with zero initial capacity.
+func NewBytesBatch() *BytesBatch {
+    return NewBytesBatchCapacity(0)
+}
+
+// NewBytesBatchCapacity creates a new BytesBatch with a given capacity.
+func NewBytesBatchCapacity(cap int) *BytesBatch {
+    return &BytesBatch{b: make([][]byte, 0, cap)}
+}
+
+// Add adds an element to the batch.
+func (bb *BytesBatch) Add(b []byte) {
+    bb.b = append(bb.b, b)
+}
+
+// Reset empties this batch.
+func (bb *BytesBatch) Reset() {
+    bb.b = nil
+}
+
+// Size returns the number of elements in the batch.
+func (bb *BytesBatch) Size() int {
+    return len(bb.b)
+}
+
+// Slice returns a copy of the accumulated byte slices.
+func (bb *BytesBatch) Slice() [][]byte {
+    b := make([][]byte, len(bb.b))
+    copy(b, bb.b)
+    return b
+}
+
+// Processor can process lines in parallel.
+type Processor struct {
+    BatchSize       int    // lines per batch handed to a worker
+    RecordSeparator byte   // record delimiter, '\n' by default
+    NumWorkers      int    // number of worker goroutines
+    SkipEmptyLines  bool   // drop lines that contain only whitespace
+    Verbose         bool   // log throughput while running
+    LogFunc         func() // optional hook, called after each full batch is dispatched, if Verbose is set
+    r               io.Reader
+    w               io.Writer
+    f               func([]byte) ([]byte, error)
+}
+
+// NewProcessor creates a new line processor.
+func NewProcessor(r io.Reader, w io.Writer, f func([]byte) ([]byte, error)) *Processor {
+    return &Processor{
+        BatchSize:       10000,
+        RecordSeparator: '\n',
+        NumWorkers:      runtime.NumCPU(),
+        SkipEmptyLines:  true,
+        r:               r,
+        w:               w,
+        f:               f,
+    }
+}
+
+// Run starts the workers, crunching through the input.
+func (p *Processor) Run() error {
+    // wErr signals a worker or writer error. If an error occurs, the items
+    // already in the queue are still processed, but no new items are added.
+    // The value only ever changes from nil to non-nil, so we do not worry
+    // about race conditions here.
+    var wErr error
+
+    worker := func(queue chan [][]byte, out chan []byte, f func([]byte) ([]byte, error), wg *sync.WaitGroup) {
+        defer wg.Done()
+        for batch := range queue {
+            for _, b := range batch {
+                r, err := f(b)
+                if err != nil {
+                    wErr = err
+                }
+                out <- r
+            }
+        }
+    }
+    writer := func(w io.Writer, bc chan []byte, done chan bool) {
+        bw := bufio.NewWriter(w)
+        for b := range bc {
+            if _, err := bw.Write(b); err != nil {
+                wErr = err
+            }
+        }
+        if err := bw.Flush(); err != nil {
+            wErr = err
+        }
+        done <- true
+    }
+    var (
+        queue   = make(chan [][]byte)
+        out     = make(chan []byte)
+        done    = make(chan bool)
+        total   int64
+        started = time.Now()
+        wg      sync.WaitGroup
+        batch   = NewBytesBatchCapacity(p.BatchSize)
+        br      = bufio.NewReader(p.r)
+    )
+    go writer(p.w, out, done)
+    for i := 0; i < p.NumWorkers; i++ {
+        wg.Add(1)
+        go worker(queue, out, p.f, &wg)
+    }
+    for {
+        b, err := br.ReadBytes(p.RecordSeparator)
+        if err == io.EOF {
+            break
+        }
+        if err != nil {
+            return err
+        }
+        if len(bytes.TrimSpace(b)) == 0 && p.SkipEmptyLines {
+            continue
+        }
+        batch.Add(b)
+        if batch.Size() == p.BatchSize {
+            total += int64(p.BatchSize)
+            // To avoid checking on each loop, we only check for worker or
+            // write errors here.
+            if wErr != nil {
+                break
+            }
+            queue <- batch.Slice()
+            batch.Reset()
+            if p.Verbose {
+                log.Printf("dispatched %d lines (%0.2f lines/s)",
+                    total, float64(total)/time.Since(started).Seconds())
+                if p.LogFunc != nil {
+                    p.LogFunc()
+                }
+            }
+        }
+    }
+    // Send the final, possibly partial batch, unless an error occurred already.
+    if wErr == nil {
+        queue <- batch.Slice()
+        batch.Reset()
+    }
+
+    close(queue)
+    wg.Wait()
+    close(out)
+    <-done
+
+    return wErr
+}
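
For orientation, here is a minimal usage sketch of the API added above. It is not part of this change: the import path is a placeholder that would need to be adjusted to this repository's module path. The program uppercases every input line in parallel, reading from stdin and writing to stdout.

    // Hypothetical example, not part of this change.
    package main

    import (
        "bytes"
        "log"
        "os"

        "example.org/skate/parallel" // placeholder; use this repository's module path
    )

    func main() {
        // NewProcessor wires a reader, a writer and a transformation function.
        p := parallel.NewProcessor(os.Stdin, os.Stdout, func(b []byte) ([]byte, error) {
            return bytes.ToUpper(b), nil
        })
        // Defaults: BatchSize 10000, RecordSeparator '\n', NumWorkers runtime.NumCPU().
        p.BatchSize = 5000
        if err := p.Run(); err != nil {
            log.Fatal(err)
        }
    }
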
diff --git a/skate/parallel/processor_test.go b/skate/parallel/processor_test.go
new file mode 100644
index 0000000..ee4a14f
--- /dev/null
+++ b/skate/parallel/processor_test.go
@@ -0,0 +1,120 @@
+package parallel
+
+import (
+ "bytes"
+ "errors"
+ "io"
+ "strings"
+ "testing"
+)
+
+var errFake1 = errors.New("fake error #1")
+
+// StringSliceContains returns true if s is an element of sl.
+func StringSliceContains(sl []string, s string) bool {
+    for _, v := range sl {
+        if s == v {
+            return true
+        }
+    }
+    return false
+}
+
+// LinesEqualSeparator returns true if a and b, split by sep, contain the same
+// number of lines and every line in a can also be found in b.
+func LinesEqualSeparator(a, b, sep string) bool {
+    al := strings.Split(a, sep)
+    bl := strings.Split(b, sep)
+    if len(al) != len(bl) {
+        return false
+    }
+    for _, line := range al {
+        if !StringSliceContains(bl, line) {
+            return false
+        }
+    }
+    return true
+}
+
+// LinesEqual returns true if every newline-separated line in a can be found
+// in b, regardless of order.
+func LinesEqual(a, b string) bool {
+    return LinesEqualSeparator(a, b, "\n")
+}
+
+func TestSimple(t *testing.T) {
+    var cases = []struct {
+        about    string
+        r        io.Reader
+        expected string
+        f        func([]byte) ([]byte, error)
+        err      error
+    }{
+        {
+            about:    `No input produces no output.`,
+            r:        strings.NewReader(""),
+            expected: "",
+            f:        func(b []byte) ([]byte, error) { return []byte{}, nil },
+            err:      nil,
+        },
+        {
+            about:    `Order is not guaranteed.`,
+            r:        strings.NewReader("a\nb\n"),
+            expected: "B\nA\n",
+            f:        func(b []byte) ([]byte, error) { return bytes.ToUpper(b), nil },
+            err:      nil,
+        },
+        {
+            about:    `Like grep, we can filter out items by returning nothing.`,
+            r:        strings.NewReader("a\nb\n"),
+            expected: "B\n",
+            f: func(b []byte) ([]byte, error) {
+                if strings.TrimSpace(string(b)) == "a" {
+                    return []byte{}, nil
+                }
+                return bytes.ToUpper(b), nil
+            },
+            err: nil,
+        },
+        {
+            about:    `Empty lines are skipped.`,
+            r:        strings.NewReader("a\na\na\na\n\n\nb\n"),
+            expected: "B\n",
+            f: func(b []byte) ([]byte, error) {
+                if strings.TrimSpace(string(b)) == "a" {
+                    return []byte{}, nil
+                }
+                return bytes.ToUpper(b), nil
+            },
+            err: nil,
+        },
+        {
+            about:    `On empty input, the transformer func is never called.`,
+            r:        strings.NewReader(""),
+            expected: "",
+            f: func(b []byte) ([]byte, error) {
+                return nil, errFake1
+            },
+            err: nil,
+        },
+        {
+            about:    `The error does not surface if all lines are skipped.`,
+            r:        strings.NewReader("\n"),
+            expected: "",
+            f: func(b []byte) ([]byte, error) {
+                return nil, errFake1
+            },
+            err: nil,
+        },
+    }
+
+    for _, c := range cases {
+        var buf bytes.Buffer
+        p := NewProcessor(c.r, &buf, c.f)
+        err := p.Run()
+        if err != c.err {
+            t.Errorf("p.Run: got %v, want %v", err, c.err)
+        }
+        if !LinesEqual(buf.String(), c.expected) {
+            t.Errorf("p.Run: got %q, want %q", buf.String(), c.expected)
+        }
+    }
+}
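
As a companion to the skipped-lines case above, a sketch of an additional check (not part of this change): once a non-empty line actually reaches a failing transform, the error should surface through Run.

    // Hypothetical test, not part of this change.
    func TestTransformError(t *testing.T) {
        var buf bytes.Buffer
        // A single non-empty line forces one call to the failing transform.
        p := NewProcessor(strings.NewReader("a\n"), &buf, func(b []byte) ([]byte, error) {
            return nil, errFake1
        })
        if err := p.Run(); err != errFake1 {
            t.Errorf("p.Run: got %v, want %v", err, errFake1)
        }
    }
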