about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Martin Czygan <martin.czygan@gmail.com> 2021-07-06 00:45:56 +0200
committer: Martin Czygan <martin.czygan@gmail.com> 2021-07-06 00:45:56 +0200
commit: b67ed957878e672897a1c236ea24605efa567064 (patch)
tree: f1ead932aba64e109fff13b7752acd47545d1b9d
parent: 42267fc424b52b05d20362d3f5087d44dd68316a (diff)
download: refcat-b67ed957878e672897a1c236ea24605efa567064.tar.gz
download: refcat-b67ed957878e672897a1c236ea24605efa567064.zip
reduce: move to threaded versions
-rw-r--r-- skate/reduce.go 55
1 files changed, 30 insertions, 25 deletions
diff --git a/skate/reduce.go b/skate/reduce.go
index ad486a1..23c5cc8 100644
--- a/skate/reduce.go
+++ b/skate/reduce.go
@@ -92,14 +92,9 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer)
// match result, e.g. used with release entities converted from open library snapshots.
func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error {
var (
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- i = 0
- grouper = func(g *zipkey.Group) error {
- i++
- if i%10000 == 0 {
- log.Printf("processed %v groups", i)
- }
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
var (
target, re *Release
err error
@@ -135,9 +130,11 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W
}
}
return nil
- }
+ })
)
- zipper := zipkey.New(olr, releases, keyer, grouper)
+ batcher.Size = 10000
+ defer batcher.Close()
+ zipper := zipkey.New(olr, releases, keyer, batcher.GroupFunc)
return zipper.Run()
}
@@ -145,9 +142,9 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W
// fixed match result.
func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error {
var (
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- grouper = func(g *zipkey.Group) error {
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
var (
target *Release
wiki *MinimalCitations
@@ -176,9 +173,11 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error
}
}
return nil
- }
+ })
)
- zipper := zipkey.New(releases, wiki, keyer, grouper)
+ batcher.Size = 10000
+ defer batcher.Close()
+ zipper := zipkey.New(releases, wiki, keyer, batcher.GroupFunc)
return zipper.Run()
}
@@ -187,9 +186,9 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error
// match.
func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
var (
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- grouper = func(g *zipkey.Group) error {
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
var (
re, pivot *Release
err error
@@ -218,9 +217,11 @@ func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
}
}
return nil
- }
+ })
)
- zipper := zipkey.New(releases, refs, keyer, grouper)
+ batcher.Size = 10000
+ defer batcher.Close()
+ zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc)
return zipper.Run()
}
@@ -270,9 +271,9 @@ func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error {
// release) and writes biblioref.
func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
var (
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- grouper = func(g *zipkey.Group) error {
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
var (
ref, pivot *Release // ref (reference), pivot (open library)
err error
@@ -312,9 +313,11 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
}
}
return nil
- }
+ })
)
- zipper := zipkey.New(olr, refs, keyer, grouper)
+ batcher.Size = 10000
+ defer batcher.Close()
+ zipper := zipkey.New(olr, refs, keyer, batcher.GroupFunc)
return zipper.Run()
}
@@ -331,9 +334,9 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
var (
stats = statsAugment{}
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- grouper = func(g *zipkey.Group) error {
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
// g.G0 contains matched docs for a given work id, g.G1 all raw
// refs, with the same work id.
@@ -370,9 +373,11 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
}
}
return nil
- }
+ })
)
- zipper := zipkey.New(bref, raw, keyer, grouper)
+ batcher.Size = 10000
+ defer batcher.Close()
+ zipper := zipkey.New(bref, raw, keyer, batcher.GroupFunc)
err := zipper.Run()
log.Println(stats)
return err