diff options
author | Martin Czygan <martin.czygan@gmail.com> | 2021-07-06 00:45:56 +0200 |
---|---|---|
committer | Martin Czygan <martin.czygan@gmail.com> | 2021-07-06 00:45:56 +0200 |
commit | b67ed957878e672897a1c236ea24605efa567064 (patch) | |
tree | f1ead932aba64e109fff13b7752acd47545d1b9d | |
parent | 42267fc424b52b05d20362d3f5087d44dd68316a (diff) | |
download | refcat-b67ed957878e672897a1c236ea24605efa567064.tar.gz refcat-b67ed957878e672897a1c236ea24605efa567064.zip |
reduce: move to threaded versions
-rw-r--r-- | skate/reduce.go | 55 |
1 files changed, 30 insertions, 25 deletions
diff --git a/skate/reduce.go b/skate/reduce.go index ad486a1..23c5cc8 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -92,14 +92,9 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) // match result, e.g. used with release entities converted from open library snapshots. func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - i = 0 - grouper = func(g *zipkey.Group) error { - i++ - if i%10000 == 0 { - log.Printf("processed %v groups", i) - } + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( target, re *Release err error @@ -135,9 +130,11 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W } } return nil - } + }) ) - zipper := zipkey.New(olr, releases, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(olr, releases, keyer, batcher.GroupFunc) return zipper.Run() } @@ -145,9 +142,9 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W // fixed match result. func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( target *Release wiki *MinimalCitations @@ -176,9 +173,11 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error } } return nil - } + }) ) - zipper := zipkey.New(releases, wiki, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(releases, wiki, keyer, batcher.GroupFunc) return zipper.Run() } @@ -187,9 +186,9 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error // match. func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( re, pivot *Release err error @@ -218,9 +217,11 @@ func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error { } } return nil - } + }) ) - zipper := zipkey.New(releases, refs, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) return zipper.Run() } @@ -270,9 +271,9 @@ func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error { // release) and writes biblioref. func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( ref, pivot *Release // ref (reference), pivot (open library) err error @@ -312,9 +313,11 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { } } return nil - } + }) ) - zipper := zipkey.New(olr, refs, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(olr, refs, keyer, batcher.GroupFunc) return zipper.Run() } @@ -331,9 +334,9 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { var ( stats = statsAugment{} - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { // g.G0 contains matched docs for a given work id, g.G1 all raw // refs, with the same work id. @@ -370,9 +373,11 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { } } return nil - } + }) ) - zipper := zipkey.New(bref, raw, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(bref, raw, keyer, batcher.GroupFunc) err := zipper.Run() log.Println(stats) return err |