aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--skate/reduce.go10
1 files changed, 6 insertions, 4 deletions
diff --git a/skate/reduce.go b/skate/reduce.go
index a7a6d8a..58d200c 100644
--- a/skate/reduce.go
+++ b/skate/reduce.go
@@ -27,6 +27,7 @@ import (
"time"
"git.archive.org/martin/cgraph/skate/set"
+ "git.archive.org/martin/cgraph/skate/xio"
"git.archive.org/martin/cgraph/skate/zipkey"
json "github.com/segmentio/encoding/json"
)
@@ -63,11 +64,11 @@ func groupLogf(g *zipkey.Group, s string, vs ...interface{}) {
// match result, e.g. for doi matches.
func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error {
var (
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
i = 0
bref BiblioRef
- grouper = func(g *zipkey.Group) error {
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
i++
if i%10000 == 0 {
log.Printf("processed %v groups", i)
@@ -110,9 +111,10 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer)
}
}
return nil
- }
+ })
)
- zipper := zipkey.New(releases, refs, keyer, grouper)
+ defer batcher.Close()
+ zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc)
return zipper.Run()
}