diff options
author | Martin Czygan <martin.czygan@gmail.com> | 2021-07-07 23:34:28 +0200 |
---|---|---|
committer | Martin Czygan <martin.czygan@gmail.com> | 2021-07-07 23:34:28 +0200 |
commit | 7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8 (patch) | |
tree | d72f36377a2362104deb71f342172812459b2347 /skate | |
parent | 9b089b324d48e6c5d02d7f70adb585cde263f1e4 (diff) | |
parent | 9ea69942a54f1c2e13f058ba35279af3612add1b (diff) | |
download | refcat-7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8.tar.gz refcat-7a5fbfc41c8c71576e4788c7ba891979c6f5f1a8.zip |
fix merge conflict
Diffstat (limited to 'skate')
-rw-r--r-- | skate/cmd/skate-wikipedia-doi/main.go | 2 | ||||
-rw-r--r-- | skate/packaging/debian/skate/DEBIAN/control | 2 | ||||
-rw-r--r-- | skate/reduce.go (renamed from skate/zippy.go) | 106 | ||||
-rw-r--r-- | skate/reduce_test.go (renamed from skate/zippy_test.go) | 0 | ||||
-rw-r--r-- | skate/xio/util.go | 20 | ||||
-rw-r--r-- | skate/zipkey/batch.go | 19 | ||||
-rw-r--r-- | skate/zipkey/zipkey.go | 22 |
7 files changed, 91 insertions, 80 deletions
diff --git a/skate/cmd/skate-wikipedia-doi/main.go b/skate/cmd/skate-wikipedia-doi/main.go index c4fdb1e..d9ee135 100644 --- a/skate/cmd/skate-wikipedia-doi/main.go +++ b/skate/cmd/skate-wikipedia-doi/main.go @@ -11,7 +11,7 @@ import ( "git.archive.org/martin/cgraph/skate" "git.archive.org/martin/cgraph/skate/parallel" - json "github.com/segmentio/encoding/json" + "github.com/segmentio/encoding/json" ) var ( diff --git a/skate/packaging/debian/skate/DEBIAN/control b/skate/packaging/debian/skate/DEBIAN/control index 22e8e51..549cbad 100644 --- a/skate/packaging/debian/skate/DEBIAN/control +++ b/skate/packaging/debian/skate/DEBIAN/control @@ -1,5 +1,5 @@ Package: skate -Version: 0.1.36 +Version: 0.1.37 Section: utils Priority: optional Architecture: amd64 diff --git a/skate/zippy.go b/skate/reduce.go index ff836e8..5b30fdf 100644 --- a/skate/zippy.go +++ b/skate/reduce.go @@ -10,8 +10,11 @@ // the readers (and string groups): release, ref, ref-as-release, open library, // wikipedia, ... // -// TODO: [ ] pass release stage through all match types -// TODO: [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks +// TODO: +// * [ ] pass release stage through all match types +// * [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks +// * [ ] batch, parallelize +// * [ ] unify flags to "-a", "-b" package skate import ( @@ -20,21 +23,14 @@ import ( "log" "sort" "strings" - "sync" "time" "git.archive.org/martin/cgraph/skate/set" + "git.archive.org/martin/cgraph/skate/xio" "git.archive.org/martin/cgraph/skate/zipkey" json "github.com/segmentio/encoding/json" ) -var brefPool = sync.Pool{ - New: func() interface{} { - var bref BiblioRef - return bref - }, -} - // groupLogf logs a message alongsize a serialized group for debugging. func groupLogf(g *zipkey.Group, s string, vs ...interface{}) { log.Printf(s, vs...) @@ -46,18 +42,13 @@ func groupLogf(g *zipkey.Group, s string, vs ...interface{}) { // match result, e.g. for doi matches. func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - i = 0 - bref BiblioRef - grouper = func(g *zipkey.Group) error { - i++ - if i%10000 == 0 { - log.Printf("processed %v groups", i) - } + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( target *Release ref *Ref + bref BiblioRef err error ) if len(g.G0) == 0 || len(g.G1) == 0 { @@ -72,7 +63,6 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) groupLogf(g, "[skip] failed to parse ref: %v", err) continue } - bref = brefPool.Get().(BiblioRef) bref.Reset() bref.SourceReleaseIdent = ref.ReleaseIdent bref.SourceWorkIdent = ref.WorkIdent @@ -88,12 +78,13 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) if err := enc.Encode(bref); err != nil { return err } - brefPool.Put(bref) } return nil - } + }) ) - zipper := zipkey.New(releases, refs, keyer, grouper) + batcher.Size = 10000 // hard-code for now; on 24 cores 10K take up over 8G of RAM + defer batcher.Close() + zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) return zipper.Run() } @@ -101,15 +92,9 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) // match result, e.g. used with release entities converted from open library snapshots. func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - i = 0 - bref BiblioRef - grouper = func(g *zipkey.Group) error { - i++ - if i%10000 == 0 { - log.Printf("processed %v groups", i) - } + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( target, re *Release err error @@ -129,8 +114,7 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W if target.WorkID == "" { continue } - bref = brefPool.Get().(BiblioRef) - bref.Reset() + var bref BiblioRef bref.SourceReleaseIdent = re.Ident bref.SourceWorkIdent = re.WorkID bref.SourceReleaseStage = re.ReleaseStage @@ -144,12 +128,13 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W if err := enc.Encode(bref); err != nil { return err } - brefPool.Put(bref) } return nil - } + }) ) - zipper := zipkey.New(olr, releases, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(olr, releases, keyer, batcher.GroupFunc) return zipper.Run() } @@ -157,10 +142,9 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W // fixed match result. func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - bref BiblioRef - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( target *Release wiki *MinimalCitations @@ -176,8 +160,7 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error if wiki, err = parseWiki(Cut(line, 2)); err != nil { return err } - bref = brefPool.Get().(BiblioRef) - bref.Reset() + var bref BiblioRef bref.Key = fmt.Sprintf("%s_%s", slugifyString(wiki.PageTitle), target.Ident) // XXX: what should we use? bref.SourceWikipediaArticle = wiki.PageTitle bref.TargetReleaseIdent = target.Ident @@ -188,12 +171,13 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error if err := enc.Encode(bref); err != nil { return err } - brefPool.Put(bref) } return nil - } + }) ) - zipper := zipkey.New(releases, wiki, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(releases, wiki, keyer, batcher.GroupFunc) return zipper.Run() } @@ -202,9 +186,9 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error // match. func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( re, pivot *Release err error @@ -233,9 +217,11 @@ func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error { } } return nil - } + }) ) - zipper := zipkey.New(releases, refs, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) return zipper.Run() } @@ -285,10 +271,9 @@ func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error { // release) and writes biblioref. func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - bref BiblioRef - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( ref, pivot *Release // ref (reference), pivot (open library) err error @@ -310,8 +295,7 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { result := Verify(pivot, ref) switch result.Status { case StatusExact, StatusStrong: - bref = brefPool.Get().(BiblioRef) - bref.Reset() + var bref BiblioRef bref.SourceReleaseIdent = ref.Ident bref.SourceWorkIdent = ref.WorkID bref.SourceReleaseStage = ref.ReleaseStage @@ -325,14 +309,15 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { if err := enc.Encode(bref); err != nil { return err } - brefPool.Put(bref) default: } } return nil - } + }) ) - zipper := zipkey.New(olr, refs, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(olr, refs, keyer, batcher.GroupFunc) return zipper.Run() } @@ -349,9 +334,9 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { var ( stats = statsAugment{} - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { // g.G0 contains matched docs for a given work id, g.G1 all raw // refs, with the same work id. @@ -388,9 +373,11 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { } } return nil - } + }) ) - zipper := zipkey.New(bref, raw, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(bref, raw, keyer, batcher.GroupFunc) err := zipper.Run() log.Println(stats) return err @@ -444,7 +431,6 @@ func deduplicateBrefs(brefs []*BiblioRef) []*BiblioRef { seen.Add(v.Key) } brefs = brefs[:i] - log.Printf("trimmed brefs from %d to %d", len(brefs), i) return brefs } diff --git a/skate/zippy_test.go b/skate/reduce_test.go index 501d8cd..501d8cd 100644 --- a/skate/zippy_test.go +++ b/skate/reduce_test.go diff --git a/skate/xio/util.go b/skate/xio/util.go index 4fc8905..49f38a3 100644 --- a/skate/xio/util.go +++ b/skate/xio/util.go @@ -6,8 +6,28 @@ import ( "io" "os" "strings" + "sync" ) +// SingleWriter makes any writer thread safe. +type SingleWriter struct { + sync.Mutex + w io.Writer +} + +// NewSingleWriter returns an io.Writer that can be safely accessed by multiple +// goroutines. +func NewSingleWriter(w io.Writer) *SingleWriter { + return &SingleWriter{w: w} +} + +// Write wraps the underlying writer and gives exclusive access. +func (w *SingleWriter) Write(p []byte) (n int, err error) { + w.Lock() + defer w.Unlock() + return w.w.Write(p) +} + // OpenTwo opens two files. The caller needs to check for a single error only. func OpenTwo(f0, f1 string) (g0, g1 *os.File, err error) { if g0, err = os.Open(f0); err != nil { diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index c31909c..ebbd081 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -14,7 +14,7 @@ type Batcher struct { queue chan []*Group wg sync.WaitGroup err error - closing bool + closing bool // https://stackoverflow.com/q/16105325/89391 } // NewBatcher set ups a new Batcher. @@ -27,11 +27,13 @@ func NewBatcher(gf groupFunc) *Batcher { } for i := 0; i < batcher.NumWorkers; i++ { batcher.wg.Add(1) - go batcher.worker(batcher.queue, &batcher.wg) + go batcher.worker() } return &batcher } +// Close tears down the batcher. If this is not called, you get goroutine leaks +// and will miss the data from the last uncommitted batch. func (b *Batcher) Close() error { b.closing = true g := make([]*Group, len(b.batch)) @@ -43,7 +45,8 @@ func (b *Batcher) Close() error { return b.err } -// GroupFunc implement the groupFunc type. Not thread safe. +// GroupFunc implement the groupFunc type. Not thread safe. Panics if called +// after Close has been called. func (b *Batcher) GroupFunc(g *Group) error { if b.closing { panic("cannot call GroupFunc after Close") @@ -58,11 +61,13 @@ func (b *Batcher) GroupFunc(g *Group) error { return nil } -// worker will wind down after a first error encountered. -func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) { - defer wg.Done() +// worker will wind down after any error has been encountered. Multiple threads +// may set the error, but we currently only care whether the error is nil or +// not. +func (b *Batcher) worker() { + defer b.wg.Done() OUTER: - for batch := range queue { + for batch := range b.queue { for _, g := range batch { if err := b.gf(g); err != nil { b.err = err diff --git a/skate/zipkey/zipkey.go b/skate/zipkey/zipkey.go index 3805535..3e8a133 100644 --- a/skate/zipkey/zipkey.go +++ b/skate/zipkey/zipkey.go @@ -45,16 +45,16 @@ func New(r0, r1 io.Reader, kf keyFunc, gf groupFunc) *ZipRun { // Run starts reading from both readers. The process stops, if one reader is // exhausted or reads from any reader fail. -func (c *ZipRun) Run() error { +func (z *ZipRun) Run() error { var ( k0, k1, c0, c1 string // key: k0, k1; current line: c0, c1 done bool err error lineKey = func(r *bufio.Reader) (line, key string, err error) { - if line, err = r.ReadString(c.sep); err != nil { + if line, err = r.ReadString(z.sep); err != nil { return } - key, err = c.kf(line) + key, err = z.kf(line) return } ) @@ -65,7 +65,7 @@ func (c *ZipRun) Run() error { switch { case k0 == "" || k0 < k1: for k0 == "" || k0 < k1 { - c0, k0, err = lineKey(c.r0) + c0, k0, err = lineKey(z.r0) if err == io.EOF { return nil } @@ -75,7 +75,7 @@ func (c *ZipRun) Run() error { } case k1 == "" || k0 > k1: for k1 == "" || k0 > k1 { - c1, k1, err = lineKey(c.r1) + c1, k1, err = lineKey(z.r1) if err == io.EOF { return nil } @@ -90,7 +90,7 @@ func (c *ZipRun) Run() error { G1: []string{c1}, } for { - c0, err = c.r0.ReadString(c.sep) + c0, err = z.r0.ReadString(z.sep) if err == io.EOF { done = true break @@ -98,7 +98,7 @@ func (c *ZipRun) Run() error { if err != nil { return err } - k, err := c.kf(c0) + k, err := z.kf(c0) if err != nil { return err } @@ -111,7 +111,7 @@ func (c *ZipRun) Run() error { } } for { - c1, err = c.r1.ReadString(c.sep) + c1, err = z.r1.ReadString(z.sep) if err == io.EOF { done = true break @@ -119,7 +119,7 @@ func (c *ZipRun) Run() error { if err != nil { return err } - k, err := c.kf(c1) + k, err := z.kf(c1) if err != nil { return err } @@ -131,8 +131,8 @@ func (c *ZipRun) Run() error { break } } - if c.gf != nil { - if err := c.gf(g); err != nil { + if z.gf != nil { + if err := z.gf(g); err != nil { return err } } |