From 269f14a4e3330f50b6527be27bcdd2b68f49fea0 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 5 Jul 2021 23:27:28 +0200 Subject: test-run: batch reduce processing for performance --- skate/reduce.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'skate') diff --git a/skate/reduce.go b/skate/reduce.go index a7a6d8a..58d200c 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -27,6 +27,7 @@ import ( "time" "git.archive.org/martin/cgraph/skate/set" + "git.archive.org/martin/cgraph/skate/xio" "git.archive.org/martin/cgraph/skate/zipkey" json "github.com/segmentio/encoding/json" ) @@ -63,11 +64,11 @@ func groupLogf(g *zipkey.Group, s string, vs ...interface{}) { // match result, e.g. for doi matches. func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) i = 0 bref BiblioRef - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { i++ if i%10000 == 0 { log.Printf("processed %v groups", i) @@ -110,9 +111,10 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) } } return nil - } + }) ) - zipper := zipkey.New(releases, refs, keyer, grouper) + defer batcher.Close() + zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) return zipper.Run() } -- cgit v1.2.3