aboutsummaryrefslogtreecommitdiffstats
path: root/skate
diff options
context:
space:
mode:
authorMartin Czygan <martin.czygan@gmail.com>2021-07-07 23:34:28 +0200
committerMartin Czygan <martin.czygan@gmail.com>2021-07-07 23:34:28 +0200
commit7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8 (patch)
treed72f36377a2362104deb71f342172812459b2347 /skate
parent9b089b324d48e6c5d02d7f70adb585cde263f1e4 (diff)
parent9ea69942a54f1c2e13f058ba35279af3612add1b (diff)
downloadrefcat-7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8.tar.gz
refcat-7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8.zip
fix merge conflict
Diffstat (limited to 'skate')
-rw-r--r--skate/cmd/skate-wikipedia-doi/main.go2
-rw-r--r--skate/packaging/debian/skate/DEBIAN/control2
-rw-r--r--skate/reduce.go (renamed from skate/zippy.go)106
-rw-r--r--skate/reduce_test.go (renamed from skate/zippy_test.go)0
-rw-r--r--skate/xio/util.go20
-rw-r--r--skate/zipkey/batch.go19
-rw-r--r--skate/zipkey/zipkey.go22
7 files changed, 91 insertions, 80 deletions
diff --git a/skate/cmd/skate-wikipedia-doi/main.go b/skate/cmd/skate-wikipedia-doi/main.go
index c4fdb1e..d9ee135 100644
--- a/skate/cmd/skate-wikipedia-doi/main.go
+++ b/skate/cmd/skate-wikipedia-doi/main.go
@@ -11,7 +11,7 @@ import (
"git.archive.org/martin/cgraph/skate"
"git.archive.org/martin/cgraph/skate/parallel"
- json "github.com/segmentio/encoding/json"
+ "github.com/segmentio/encoding/json"
)
var (
diff --git a/skate/packaging/debian/skate/DEBIAN/control b/skate/packaging/debian/skate/DEBIAN/control
index 22e8e51..549cbad 100644
--- a/skate/packaging/debian/skate/DEBIAN/control
+++ b/skate/packaging/debian/skate/DEBIAN/control
@@ -1,5 +1,5 @@
Package: skate
-Version: 0.1.36
+Version: 0.1.37
Section: utils
Priority: optional
Architecture: amd64
diff --git a/skate/zippy.go b/skate/reduce.go
index ff836e8..5b30fdf 100644
--- a/skate/zippy.go
+++ b/skate/reduce.go
@@ -10,8 +10,11 @@
// the readers (and string groups): release, ref, ref-as-release, open library,
// wikipedia, ...
//
-// TODO: [ ] pass release stage through all match types
-// TODO: [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks
+// TODO:
+// * [ ] pass release stage through all match types
+// * [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks
+// * [ ] batch, parallelize
+// * [ ] unify flags to "-a", "-b"
package skate
import (
@@ -20,21 +23,14 @@ import (
"log"
"sort"
"strings"
- "sync"
"time"
"git.archive.org/martin/cgraph/skate/set"
+ "git.archive.org/martin/cgraph/skate/xio"
"git.archive.org/martin/cgraph/skate/zipkey"
json "github.com/segmentio/encoding/json"
)
-var brefPool = sync.Pool{
- New: func() interface{} {
- var bref BiblioRef
- return bref
- },
-}
-
// groupLogf logs a message alongsize a serialized group for debugging.
func groupLogf(g *zipkey.Group, s string, vs ...interface{}) {
log.Printf(s, vs...)
@@ -46,18 +42,13 @@ func groupLogf(g *zipkey.Group, s string, vs ...interface{}) {
// match result, e.g. for doi matches.
func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error {
var (
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- i = 0
- bref BiblioRef
- grouper = func(g *zipkey.Group) error {
- i++
- if i%10000 == 0 {
- log.Printf("processed %v groups", i)
- }
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
var (
target *Release
ref *Ref
+ bref BiblioRef
err error
)
if len(g.G0) == 0 || len(g.G1) == 0 {
@@ -72,7 +63,6 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer)
groupLogf(g, "[skip] failed to parse ref: %v", err)
continue
}
- bref = brefPool.Get().(BiblioRef)
bref.Reset()
bref.SourceReleaseIdent = ref.ReleaseIdent
bref.SourceWorkIdent = ref.WorkIdent
@@ -88,12 +78,13 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer)
if err := enc.Encode(bref); err != nil {
return err
}
- brefPool.Put(bref)
}
return nil
- }
+ })
)
- zipper := zipkey.New(releases, refs, keyer, grouper)
+ batcher.Size = 10000 // hard-code for now; on 24 cores 10K take up over 8G of RAM
+ defer batcher.Close()
+ zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc)
return zipper.Run()
}
@@ -101,15 +92,9 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer)
// match result, e.g. used with release entities converted from open library snapshots.
func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error {
var (
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- i = 0
- bref BiblioRef
- grouper = func(g *zipkey.Group) error {
- i++
- if i%10000 == 0 {
- log.Printf("processed %v groups", i)
- }
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
var (
target, re *Release
err error
@@ -129,8 +114,7 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W
if target.WorkID == "" {
continue
}
- bref = brefPool.Get().(BiblioRef)
- bref.Reset()
+ var bref BiblioRef
bref.SourceReleaseIdent = re.Ident
bref.SourceWorkIdent = re.WorkID
bref.SourceReleaseStage = re.ReleaseStage
@@ -144,12 +128,13 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W
if err := enc.Encode(bref); err != nil {
return err
}
- brefPool.Put(bref)
}
return nil
- }
+ })
)
- zipper := zipkey.New(olr, releases, keyer, grouper)
+ batcher.Size = 10000
+ defer batcher.Close()
+ zipper := zipkey.New(olr, releases, keyer, batcher.GroupFunc)
return zipper.Run()
}
@@ -157,10 +142,9 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W
// fixed match result.
func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error {
var (
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- bref BiblioRef
- grouper = func(g *zipkey.Group) error {
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
var (
target *Release
wiki *MinimalCitations
@@ -176,8 +160,7 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error
if wiki, err = parseWiki(Cut(line, 2)); err != nil {
return err
}
- bref = brefPool.Get().(BiblioRef)
- bref.Reset()
+ var bref BiblioRef
bref.Key = fmt.Sprintf("%s_%s", slugifyString(wiki.PageTitle), target.Ident) // XXX: what should we use?
bref.SourceWikipediaArticle = wiki.PageTitle
bref.TargetReleaseIdent = target.Ident
@@ -188,12 +171,13 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error
if err := enc.Encode(bref); err != nil {
return err
}
- brefPool.Put(bref)
}
return nil
- }
+ })
)
- zipper := zipkey.New(releases, wiki, keyer, grouper)
+ batcher.Size = 10000
+ defer batcher.Close()
+ zipper := zipkey.New(releases, wiki, keyer, batcher.GroupFunc)
return zipper.Run()
}
@@ -202,9 +186,9 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error
// match.
func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
var (
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- grouper = func(g *zipkey.Group) error {
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
var (
re, pivot *Release
err error
@@ -233,9 +217,11 @@ func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
}
}
return nil
- }
+ })
)
- zipper := zipkey.New(releases, refs, keyer, grouper)
+ batcher.Size = 10000
+ defer batcher.Close()
+ zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc)
return zipper.Run()
}
@@ -285,10 +271,9 @@ func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error {
// release) and writes biblioref.
func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
var (
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- bref BiblioRef
- grouper = func(g *zipkey.Group) error {
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
var (
ref, pivot *Release // ref (reference), pivot (open library)
err error
@@ -310,8 +295,7 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
result := Verify(pivot, ref)
switch result.Status {
case StatusExact, StatusStrong:
- bref = brefPool.Get().(BiblioRef)
- bref.Reset()
+ var bref BiblioRef
bref.SourceReleaseIdent = ref.Ident
bref.SourceWorkIdent = ref.WorkID
bref.SourceReleaseStage = ref.ReleaseStage
@@ -325,14 +309,15 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
if err := enc.Encode(bref); err != nil {
return err
}
- brefPool.Put(bref)
default:
}
}
return nil
- }
+ })
)
- zipper := zipkey.New(olr, refs, keyer, grouper)
+ batcher.Size = 10000
+ defer batcher.Close()
+ zipper := zipkey.New(olr, refs, keyer, batcher.GroupFunc)
return zipper.Run()
}
@@ -349,9 +334,9 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
var (
stats = statsAugment{}
- enc = json.NewEncoder(w)
+ enc = json.NewEncoder(xio.NewSingleWriter(w))
keyer = makeKeyFunc("\t", 1)
- grouper = func(g *zipkey.Group) error {
+ batcher = zipkey.NewBatcher(func(g *zipkey.Group) error {
// g.G0 contains matched docs for a given work id, g.G1 all raw
// refs, with the same work id.
@@ -388,9 +373,11 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
}
}
return nil
- }
+ })
)
- zipper := zipkey.New(bref, raw, keyer, grouper)
+ batcher.Size = 10000
+ defer batcher.Close()
+ zipper := zipkey.New(bref, raw, keyer, batcher.GroupFunc)
err := zipper.Run()
log.Println(stats)
return err
@@ -444,7 +431,6 @@ func deduplicateBrefs(brefs []*BiblioRef) []*BiblioRef {
seen.Add(v.Key)
}
brefs = brefs[:i]
- log.Printf("trimmed brefs from %d to %d", len(brefs), i)
return brefs
}
diff --git a/skate/zippy_test.go b/skate/reduce_test.go
index 501d8cd..501d8cd 100644
--- a/skate/zippy_test.go
+++ b/skate/reduce_test.go
diff --git a/skate/xio/util.go b/skate/xio/util.go
index 4fc8905..49f38a3 100644
--- a/skate/xio/util.go
+++ b/skate/xio/util.go
@@ -6,8 +6,28 @@ import (
"io"
"os"
"strings"
+ "sync"
)
+// SingleWriter makes any writer thread safe.
+type SingleWriter struct {
+ sync.Mutex
+ w io.Writer
+}
+
+// NewSingleWriter returns an io.Writer that can be safely accessed by multiple
+// goroutines.
+func NewSingleWriter(w io.Writer) *SingleWriter {
+ return &SingleWriter{w: w}
+}
+
+// Write wraps the underlying writer and gives exclusive access.
+func (w *SingleWriter) Write(p []byte) (n int, err error) {
+ w.Lock()
+ defer w.Unlock()
+ return w.w.Write(p)
+}
+
// OpenTwo opens two files. The caller needs to check for a single error only.
func OpenTwo(f0, f1 string) (g0, g1 *os.File, err error) {
if g0, err = os.Open(f0); err != nil {
diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go
index c31909c..ebbd081 100644
--- a/skate/zipkey/batch.go
+++ b/skate/zipkey/batch.go
@@ -14,7 +14,7 @@ type Batcher struct {
queue chan []*Group
wg sync.WaitGroup
err error
- closing bool
+ closing bool // https://stackoverflow.com/q/16105325/89391
}
// NewBatcher set ups a new Batcher.
@@ -27,11 +27,13 @@ func NewBatcher(gf groupFunc) *Batcher {
}
for i := 0; i < batcher.NumWorkers; i++ {
batcher.wg.Add(1)
- go batcher.worker(batcher.queue, &batcher.wg)
+ go batcher.worker()
}
return &batcher
}
+// Close tears down the batcher. If this is not called, you get goroutine leaks
+// and will miss the data from the last uncommitted batch.
func (b *Batcher) Close() error {
b.closing = true
g := make([]*Group, len(b.batch))
@@ -43,7 +45,8 @@ func (b *Batcher) Close() error {
return b.err
}
-// GroupFunc implement the groupFunc type. Not thread safe.
+// GroupFunc implement the groupFunc type. Not thread safe. Panics if called
+// after Close has been called.
func (b *Batcher) GroupFunc(g *Group) error {
if b.closing {
panic("cannot call GroupFunc after Close")
@@ -58,11 +61,13 @@ func (b *Batcher) GroupFunc(g *Group) error {
return nil
}
-// worker will wind down after a first error encountered.
-func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) {
- defer wg.Done()
+// worker will wind down after any error has been encountered. Multiple threads
+// may set the error, but we currently only care whether the error is nil or
+// not.
+func (b *Batcher) worker() {
+ defer b.wg.Done()
OUTER:
- for batch := range queue {
+ for batch := range b.queue {
for _, g := range batch {
if err := b.gf(g); err != nil {
b.err = err
diff --git a/skate/zipkey/zipkey.go b/skate/zipkey/zipkey.go
index 3805535..3e8a133 100644
--- a/skate/zipkey/zipkey.go
+++ b/skate/zipkey/zipkey.go
@@ -45,16 +45,16 @@ func New(r0, r1 io.Reader, kf keyFunc, gf groupFunc) *ZipRun {
// Run starts reading from both readers. The process stops, if one reader is
// exhausted or reads from any reader fail.
-func (c *ZipRun) Run() error {
+func (z *ZipRun) Run() error {
var (
k0, k1, c0, c1 string // key: k0, k1; current line: c0, c1
done bool
err error
lineKey = func(r *bufio.Reader) (line, key string, err error) {
- if line, err = r.ReadString(c.sep); err != nil {
+ if line, err = r.ReadString(z.sep); err != nil {
return
}
- key, err = c.kf(line)
+ key, err = z.kf(line)
return
}
)
@@ -65,7 +65,7 @@ func (c *ZipRun) Run() error {
switch {
case k0 == "" || k0 < k1:
for k0 == "" || k0 < k1 {
- c0, k0, err = lineKey(c.r0)
+ c0, k0, err = lineKey(z.r0)
if err == io.EOF {
return nil
}
@@ -75,7 +75,7 @@ func (c *ZipRun) Run() error {
}
case k1 == "" || k0 > k1:
for k1 == "" || k0 > k1 {
- c1, k1, err = lineKey(c.r1)
+ c1, k1, err = lineKey(z.r1)
if err == io.EOF {
return nil
}
@@ -90,7 +90,7 @@ func (c *ZipRun) Run() error {
G1: []string{c1},
}
for {
- c0, err = c.r0.ReadString(c.sep)
+ c0, err = z.r0.ReadString(z.sep)
if err == io.EOF {
done = true
break
@@ -98,7 +98,7 @@ func (c *ZipRun) Run() error {
if err != nil {
return err
}
- k, err := c.kf(c0)
+ k, err := z.kf(c0)
if err != nil {
return err
}
@@ -111,7 +111,7 @@ func (c *ZipRun) Run() error {
}
}
for {
- c1, err = c.r1.ReadString(c.sep)
+ c1, err = z.r1.ReadString(z.sep)
if err == io.EOF {
done = true
break
@@ -119,7 +119,7 @@ func (c *ZipRun) Run() error {
if err != nil {
return err
}
- k, err := c.kf(c1)
+ k, err := z.kf(c1)
if err != nil {
return err
}
@@ -131,8 +131,8 @@ func (c *ZipRun) Run() error {
break
}
}
- if c.gf != nil {
- if err := c.gf(g); err != nil {
+ if z.gf != nil {
+ if err := z.gf(g); err != nil {
return err
}
}