aboutsummaryrefslogtreecommitdiffstats
path: root/skate/parallel/processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'skate/parallel/processor.go')
-rw-r--r--skate/parallel/processor.go165
1 files changed, 165 insertions, 0 deletions
diff --git a/skate/parallel/processor.go b/skate/parallel/processor.go
new file mode 100644
index 0000000..37d5643
--- /dev/null
+++ b/skate/parallel/processor.go
@@ -0,0 +1,165 @@
+// Package parallel implements helpers for fast processing of line oriented inputs.
+package parallel
+
+import (
+ "bufio"
+ "bytes"
+ "io"
+ "log"
+ "runtime"
+ "sync"
+ "time"
+)
+
+// BytesBatch is a slice of byte slices.
+type BytesBatch struct {
+ b [][]byte
+}
+
+// NewBytesBatch creates a new BytesBatch with a given capacity.
+func NewBytesBatch() *BytesBatch {
+ return NewBytesBatchCapacity(0)
+}
+
+// NewBytesBatchCapacity creates a new BytesBatch with a given capacity.
+func NewBytesBatchCapacity(cap int) *BytesBatch {
+ return &BytesBatch{b: make([][]byte, 0, cap)}
+}
+
+// Add adds an element to the batch.
+func (bb *BytesBatch) Add(b []byte) {
+ bb.b = append(bb.b, b)
+}
+
+// Reset empties this batch.
+func (bb *BytesBatch) Reset() {
+ bb.b = nil
+}
+
+// Size returns the number of elements in the batch.
+func (bb *BytesBatch) Size() int {
+ return len(bb.b)
+}
+
+// Slice returns a slice of byte slices.
+func (bb *BytesBatch) Slice() [][]byte {
+ b := make([][]byte, len(bb.b))
+ for i := 0; i < len(bb.b); i++ {
+ b[i] = bb.b[i]
+ }
+ return b
+}
+
+// Processor can process lines in parallel.
+type Processor struct {
+ BatchSize int
+ RecordSeparator byte
+ NumWorkers int
+ SkipEmptyLines bool
+ Verbose bool
+ LogFunc func()
+ r io.Reader
+ w io.Writer
+ f func([]byte) ([]byte, error)
+}
+
+// NewProcessor creates a new line processor.
+func NewProcessor(r io.Reader, w io.Writer, f func([]byte) ([]byte, error)) *Processor {
+ return &Processor{
+ BatchSize: 10000,
+ RecordSeparator: '\n',
+ NumWorkers: runtime.NumCPU(),
+ SkipEmptyLines: true,
+ r: r,
+ w: w,
+ f: f,
+ }
+}
+
+// Run starts the workers, crunching through the input.
+func (p *Processor) Run() error {
+ // wErr signals a worker or writer error. If an error occurs, the items in
+ // the queue are still process, just no items are added to the queue. There
+ // is only one way to toggle this, from false to true, so we don't care
+ // about race conditions here.
+ var wErr error
+
+ worker := func(queue chan [][]byte, out chan []byte, f func([]byte) ([]byte, error), wg *sync.WaitGroup) {
+ defer wg.Done()
+ for batch := range queue {
+ for _, b := range batch {
+ r, err := f(b)
+ if err != nil {
+ wErr = err
+ }
+ out <- r
+ }
+ }
+ }
+ writer := func(w io.Writer, bc chan []byte, done chan bool) {
+ bw := bufio.NewWriter(w)
+ for b := range bc {
+ if _, err := bw.Write(b); err != nil {
+ wErr = err
+ }
+ }
+ if err := bw.Flush(); err != nil {
+ wErr = err
+ }
+ done <- true
+ }
+ var (
+ queue = make(chan [][]byte)
+ out = make(chan []byte)
+ done = make(chan bool)
+ total int64
+ started = time.Now()
+ wg sync.WaitGroup
+ batch = NewBytesBatchCapacity(p.BatchSize)
+ br = bufio.NewReader(p.r)
+ )
+ go writer(p.w, out, done)
+ for i := 0; i < p.NumWorkers; i++ {
+ wg.Add(1)
+ go worker(queue, out, p.f, &wg)
+ }
+ for {
+ b, err := br.ReadBytes(p.RecordSeparator)
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return err
+ }
+ if len(bytes.TrimSpace(b)) == 0 && p.SkipEmptyLines {
+ continue
+ }
+ batch.Add(b)
+ if batch.Size() == p.BatchSize {
+ total += int64(p.BatchSize)
+ // To avoid checking on each loop, we only check for worker or
+ // write errors here.
+ if wErr != nil {
+ break
+ }
+ queue <- batch.Slice()
+ batch.Reset()
+ if p.Verbose {
+ log.Printf("dispatched %d lines (%0.2f lines/s)",
+ total, float64(total)/time.Since(started).Seconds())
+ if p.LogFunc != nil {
+ p.LogFunc()
+ }
+ }
+ }
+ }
+ queue <- batch.Slice()
+ batch.Reset()
+
+ close(queue)
+ wg.Wait()
+ close(out)
+ <-done
+
+ return wErr
+}