From 9794996b3bda583ffd1d59b2f831518210033be3 Mon Sep 17 00:00:00 2001
From: Martin Czygan
Date: Mon, 5 Jul 2021 22:20:25 +0200
Subject: use most sensible file name

* we have map, so we should have reduce
---
 skate/reduce.go      | 599 +++++++++++++++++++++++++++++++++++++++++++++++++++
 skate/reduce_test.go | 451 ++++++++++++++++++++++++++++++++++++++
 skate/zippy.go       | 599 ---------------------------------------------------
 skate/zippy_test.go  | 451 --------------------------------------
 4 files changed, 1050 insertions(+), 1050 deletions(-)
 create mode 100644 skate/reduce.go
 create mode 100644 skate/reduce_test.go
 delete mode 100644 skate/zippy.go
 delete mode 100644 skate/zippy_test.go

diff --git a/skate/reduce.go b/skate/reduce.go
new file mode 100644
index 0000000..ff836e8
--- /dev/null
+++ b/skate/reduce.go
@@ -0,0 +1,599 @@
+// This file contains various "reducers", e.g. merging data from two streams and
+// applying a function on groups of documents with a shared key.
+//
+// Note: This is a bit repetitive, but we do not want to introduce any other
+// abstraction for now. Since most of the logic is in the "grouper" functions,
+// we could make them top level values and then assemble the zipkey runner on
+// the fly.
+//
+// The most confusing aspect currently is the variety of schemas hidden within
+// the readers (and string groups): release, ref, ref-as-release, open library,
+// wikipedia, ...
+//
+// TODO: [ ] pass release stage through all match types
+// TODO: [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks
+package skate
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	"git.archive.org/martin/cgraph/skate/set"
+	"git.archive.org/martin/cgraph/skate/zipkey"
+	json "github.com/segmentio/encoding/json"
+)
+
+var brefPool = sync.Pool{
+	New: func() interface{} {
+		var bref BiblioRef
+		return bref
+	},
+}
+
+// groupLogf logs a message alongside a serialized group for debugging.
+func groupLogf(g *zipkey.Group, s string, vs ...interface{}) {
+	log.Printf(s, vs...)
+	b, _ := json.MarshalIndent(g, "", " ")
+	log.Println(string(b))
+}
+
+// ZippyExact takes a release and refs reader (key, doc) and assigns a fixed
+// match result, e.g. for doi matches.
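+//
+// A minimal usage sketch (assuming key-sorted "key\tdoc" TSV inputs; the
+// file names are made up, xio.OpenTwo is the helper also used in the tests):
+//
+//	releases, refs, err := xio.OpenTwo("releases.tsv", "refs.tsv")
+//	if err != nil {
+//		log.Fatal(err)
+//	}
+//	mr := MatchResult{Status: StatusExact, Reason: ReasonDOI}
+//	if err := ZippyExact(releases, refs, mr, os.Stdout); err != nil {
+//		log.Fatal(err)
+//	}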
+func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error { + var ( + enc = json.NewEncoder(w) + keyer = makeKeyFunc("\t", 1) + i = 0 + bref BiblioRef + grouper = func(g *zipkey.Group) error { + i++ + if i%10000 == 0 { + log.Printf("processed %v groups", i) + } + var ( + target *Release + ref *Ref + err error + ) + if len(g.G0) == 0 || len(g.G1) == 0 { + return nil + } + if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil { + groupLogf(g, "[skip] failed to parse release: %v", err) + return nil + } + for _, line := range g.G1 { + if ref, err = parseRef(Cut(line, 2)); err != nil { + groupLogf(g, "[skip] failed to parse ref: %v", err) + continue + } + bref = brefPool.Get().(BiblioRef) + bref.Reset() + bref.SourceReleaseIdent = ref.ReleaseIdent + bref.SourceWorkIdent = ref.WorkIdent + bref.SourceReleaseStage = ref.ReleaseStage + bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear) + bref.RefIndex = ref.Index + 1 // we want 1-index (also helps with omitempty) + bref.RefKey = ref.Key + bref.TargetReleaseIdent = target.Ident + bref.TargetWorkIdent = target.WorkID + bref.MatchProvenance = ref.RefSource + bref.MatchStatus = matchResult.Status.Short() + bref.MatchReason = matchResult.Reason.Short() + if err := enc.Encode(bref); err != nil { + return err + } + brefPool.Put(bref) + } + return nil + } + ) + zipper := zipkey.New(releases, refs, keyer, grouper) + return zipper.Run() +} + +// ZippyExactReleases takes two release readers (key, doc) and assigns a fixed +// match result, e.g. used with release entities converted from open library snapshots. +func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error { + var ( + enc = json.NewEncoder(w) + keyer = makeKeyFunc("\t", 1) + i = 0 + bref BiblioRef + grouper = func(g *zipkey.Group) error { + i++ + if i%10000 == 0 { + log.Printf("processed %v groups", i) + } + var ( + target, re *Release + err error + ) + if len(g.G0) == 0 || len(g.G1) == 0 { + return nil + } + if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil { + groupLogf(g, "[skip] failed to parse release: %v", err) + return nil + } + for _, line := range g.G1 { + if re, err = parseRelease(Cut(line, 2)); err != nil { + groupLogf(g, "[skip] failed to parse release: %v", err) + continue + } + if target.WorkID == "" { + continue + } + bref = brefPool.Get().(BiblioRef) + bref.Reset() + bref.SourceReleaseIdent = re.Ident + bref.SourceWorkIdent = re.WorkID + bref.SourceReleaseStage = re.ReleaseStage + bref.SourceYear = fmt.Sprintf("%d", re.ReleaseYear()) + bref.RefIndex = re.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty) + bref.RefKey = re.Extra.Skate.Ref.Key + bref.TargetOpenLibraryWork = target.WorkID + bref.MatchProvenance = re.Extra.Skate.Ref.Source + bref.MatchStatus = matchResult.Status.Short() + bref.MatchReason = matchResult.Reason.Short() + if err := enc.Encode(bref); err != nil { + return err + } + brefPool.Put(bref) + } + return nil + } + ) + zipper := zipkey.New(olr, releases, keyer, grouper) + return zipper.Run() +} + +// ZippyExactWiki takes a release and wiki reader (key, doc) and assigns a +// fixed match result. 
+func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error {
+	var (
+		enc     = json.NewEncoder(w)
+		keyer   = makeKeyFunc("\t", 1)
+		bref    BiblioRef
+		grouper = func(g *zipkey.Group) error {
+			var (
+				target *Release
+				wiki   *MinimalCitations
+				err    error
+			)
+			if len(g.G0) == 0 || len(g.G1) == 0 {
+				return nil
+			}
+			if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+				return err
+			}
+			for _, line := range g.G1 {
+				if wiki, err = parseWiki(Cut(line, 2)); err != nil {
+					return err
+				}
+				bref = brefPool.Get().(BiblioRef)
+				bref.Reset()
+				bref.Key = fmt.Sprintf("%s_%s", slugifyString(wiki.PageTitle), target.Ident) // XXX: what should we use?
+				bref.SourceWikipediaArticle = wiki.PageTitle
+				bref.TargetReleaseIdent = target.Ident
+				bref.TargetWorkIdent = target.WorkID
+				bref.MatchProvenance = "wikipedia"
+				bref.MatchStatus = mr.Status.Short()
+				bref.MatchReason = mr.Reason.Short()
+				if err := enc.Encode(bref); err != nil {
+					return err
+				}
+				brefPool.Put(bref)
+			}
+			return nil
+		}
+	)
+	zipper := zipkey.New(releases, wiki, keyer, grouper)
+	return zipper.Run()
+}
+
+// ZippyVerifyRefs takes a release and refs (as release) reader (key, doc),
+// runs fuzzy verification and emits a biblioref document for each exact or
+// strong match.
+func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
+	var (
+		enc     = json.NewEncoder(w)
+		keyer   = makeKeyFunc("\t", 1)
+		grouper = func(g *zipkey.Group) error {
+			var (
+				re, pivot *Release
+				err       error
+			)
+			if len(g.G0) == 0 || len(g.G1) == 0 {
+				return nil
+			}
+			if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+				return err
+			}
+			for _, line := range g.G1 {
+				if re, err = parseRelease(Cut(line, 2)); err != nil {
+					return err
+				}
+				result := Verify(pivot, re)
+				switch result.Status {
+				case StatusExact, StatusStrong:
+					if result.Reason == ReasonDOI {
+						continue
+					}
+					br := generateBiblioRef(re, pivot, result, "fuzzy")
+					if err := enc.Encode(br); err != nil {
+						return err
+					}
+				default:
+				}
+			}
+			return nil
+		}
+	)
+	zipper := zipkey.New(releases, refs, keyer, grouper)
+	return zipper.Run()
+}
+
+// ZippyVerifyRefsOpenLibraryTable takes OL editions (as release) and refs (as
+// release) and emits a match table for manual inspection.
+func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error {
+	var (
+		keyer   = makeKeyFunc("\t", 1)
+		grouper = func(g *zipkey.Group) error {
+			var (
+				re, pivot *Release
+				err       error
+			)
+			if len(g.G0) == 0 || len(g.G1) == 0 {
+				return nil
+			}
+			// We take a single edition from OL.
+			if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+				return err
+			}
+			for _, line := range g.G1 {
+				if re, err = parseRelease(Cut(line, 2)); err != nil {
+					return err
+				}
+				// The refs have a container name, but not a title; here, we
+				// compare against titles from open library.
+				re.Title = re.ContainerName
+				result := Verify(pivot, re)
+				fmt.Printf("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
+					result.Status.Short(),
+					result.Reason.Short(),
+					pivot.Extra.OpenLibrary.WorkID,
+					FindByPrefix(pivot.Extra.OpenLibrary.SourceRecords, "ia:"),
+					re.Ident,
+					CutSep(g.G0[0], "\t", 1),
+					pivot.Title,
+					re.Title)
+			}
+			return nil
+		}
+	)
+	zipper := zipkey.New(olr, refs, keyer, grouper)
+	return zipper.Run()
+}
+
+// ZippyVerifyRefsOpenLibrary takes OL editions (as release) and refs (as
+// release) and writes biblioref.
+func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
+	var (
+		enc     = json.NewEncoder(w)
+		keyer   = makeKeyFunc("\t", 1)
+		bref    BiblioRef
+		grouper = func(g *zipkey.Group) error {
+			var (
+				ref, pivot *Release // ref (reference), pivot (open library)
+				err        error
+			)
+			if len(g.G0) == 0 || len(g.G1) == 0 {
+				return nil
+			}
+			// We take a single edition from OL.
+			if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+				return err
+			}
+			for _, line := range g.G1 {
+				if ref, err = parseRelease(Cut(line, 2)); err != nil {
+					return err
+				}
+				// The refs have a container name, but not a title; here, we
+				// compare against titles from open library.
+				ref.Title = ref.ContainerName
+				result := Verify(pivot, ref)
+				switch result.Status {
+				case StatusExact, StatusStrong:
+					bref = brefPool.Get().(BiblioRef)
+					bref.Reset()
+					bref.SourceReleaseIdent = ref.Ident
+					bref.SourceWorkIdent = ref.WorkID
+					bref.SourceReleaseStage = ref.ReleaseStage
+					bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear())
+					bref.RefIndex = ref.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty)
+					bref.RefKey = ref.Extra.Skate.Ref.Key
+					bref.TargetOpenLibraryWork = pivot.WorkID
+					bref.MatchProvenance = ref.Extra.Skate.Ref.Source
+					bref.MatchStatus = result.Status.Short()
+					bref.MatchReason = result.Reason.Short()
+					if err := enc.Encode(bref); err != nil {
+						return err
+					}
+					brefPool.Put(bref)
+				default:
+				}
+			}
+			return nil
+		}
+	)
+	zipper := zipkey.New(olr, refs, keyer, grouper)
+	return zipper.Run()
+}
+
+// ZippyBrefAugment takes all matched docs from bref and adds docs from the
+// raw refs that have not been matched. It also gets rid of duplicate matches.
+// Note: This operates on two streams: raw refs, about 2.5B records (07/2021),
+// and matches, which will be about 1B; in essence we have to iterate through
+// about 3.5B records, so small tweaks here may be worthwhile.
+//
+// We can identify which docs have been matched by checking the source ident,
+// ref index and key.
+//
+// TODO: This needs to be completed and made fast.
+func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
+	var (
+		stats   = statsAugment{}
+		enc     = json.NewEncoder(w)
+		keyer   = makeKeyFunc("\t", 1)
+		grouper = func(g *zipkey.Group) error {
+			// g.G0 contains matched docs for a given work id, g.G1 all raw
+			// refs with the same work id.
+
+			// First, iterate over all matches and sort out duplicates, e.g.
+			// docs that have the same source and target id.
+			log.Printf("group K=%s, G0=%d, G1=%d", g.Key, len(g.G0), len(g.G1))
+			matched, err := uniqueMatches(CutBatch(g.G0, 2), &stats)
+			if err != nil {
+				return err
+			}
+			var refs = make([]*Ref, len(g.G1))
+			for i := 0; i < len(refs); i++ {
+				var (
+					data []byte = []byte(Cut(g.G1[i], 2))
+					ref  Ref
+				)
+				if err := json.Unmarshal(data, &ref); err != nil {
+					return err
+				}
+				refs[i] = &ref
+			}
+			// TODO: this slows down this process; be a bit smarter about slices.
+			matched = matchedRefsExtend(matched, refs, &stats)
+			// At this point, we may have duplicates by "_id", e.g. source
+			// release ident and ref index (example:
+			// 4kg2dejsgzaf3cszs2lt5hz4by_9, which appears three times, one
+			// exact match, and twice unmatched).
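+			// Dedupe by "_id" first, keeping the best match status, then
+			// drop self-referential links.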
+			matched = deduplicateBrefs(matched)
+			matched = removeSelfLinks(matched)
+			for _, bref := range matched {
+				stats.total++
+				if err := enc.Encode(bref); err != nil {
+					return err
+				}
+			}
+			return nil
+		}
+	)
+	zipper := zipkey.New(bref, raw, keyer, grouper)
+	err := zipper.Run()
+	log.Println(stats)
+	return err
+}
+
+// removeSelfLinks removes self-referential links. TODO: Those should be caught
+// at the root cause.
+func removeSelfLinks(brefs []*BiblioRef) (result []*BiblioRef) {
+	var i int
+	for _, bref := range brefs {
+		if bref.SourceReleaseIdent == bref.TargetReleaseIdent {
+			continue
+		}
+		brefs[i] = bref
+		i++
+	}
+	brefs = brefs[:i]
+	return brefs
+}
+
+// deduplicateBrefs deduplicates by the document id (for elasticsearch), which
+// may help filter out some duplicates but not all.
+func deduplicateBrefs(brefs []*BiblioRef) []*BiblioRef {
+	// Sort by match status, exact first, unmatched last.
+	sort.Slice(brefs, func(i, j int) bool {
+		switch {
+		case brefs[i].MatchStatus == StatusExact.Short():
+			return true
+		case brefs[i].MatchStatus == StatusStrong.Short():
+			return true
+		case brefs[i].MatchStatus == StatusWeak.Short():
+			return false
+		case brefs[i].MatchStatus == StatusAmbiguous.Short():
+			return false
+		case brefs[i].MatchStatus != StatusUnmatched.Short():
+			return true
+		default:
+			return false
+		}
+	})
+	var (
+		seen = set.New()
+		i    int
+	)
+	for _, v := range brefs {
+		if seen.Contains(v.Key) {
+			continue
+		}
+		brefs[i] = v
+		i++
+		seen.Add(v.Key)
+	}
+	// Log before trimming, so both the original and the new length show up.
+	log.Printf("trimmed brefs from %d to %d", len(brefs), i)
+	brefs = brefs[:i]
+	return brefs
+}
+
+// matchedRefsExtend takes a set of (unique) biblioref docs and returns that
+// set unchanged, plus a biblioref doc for each raw reference that did not
+// result in a match (determined by ref key and index). XXX: We may have
+// duplicate refs as well - how to distinguish them?
+func matchedRefsExtend(matched []*BiblioRef, refs []*Ref, stats *statsAugment) []*BiblioRef {
+	seen := set.New() // store "key + index" of matched items
+	for _, m := range matched {
+		s := m.RefKey + fmt.Sprintf("%d", m.RefIndex)
+		seen.Add(s)
+	}
+	for _, r := range refs {
+		s := r.Key + fmt.Sprintf("%d", r.Index)
+		if seen.Contains(s) {
+			stats.skipMatchedRef++
+			log.Printf("skip-matched-ref [%d]: from %d matches; ident=%v, title=%s, key=%v, index=%d",
+				stats.skipMatchedRef, len(matched), r.ReleaseIdent, r.Biblio.Title, r.Key, r.Index)
+			continue
+		}
+		var bref BiblioRef
+		bref.IndexedTs = time.Now().UTC().Format(time.RFC3339)
+		bref.Key = fmt.Sprintf("%s_%d", r.ReleaseIdent, r.Index)
+		bref.RefIndex = r.Index
+		bref.RefKey = r.Key
+		bref.SourceReleaseIdent = r.ReleaseIdent
+		bref.SourceReleaseStage = r.ReleaseStage
+		bref.SourceWorkIdent = r.WorkIdent
+		bref.SourceYear = fmt.Sprintf("%d", r.ReleaseYear)
+		bref.TargetUnstructured = r.Biblio.Unstructured
+		// Reuse fields for debugging, for now.
+		bref.MatchStatus = StatusUnmatched.Short()
+		bref.MatchReason = ReasonUnknown.Short()
+		matched = append(matched, &bref)
+	}
+	return matched
+}
+
+// uniqueMatches takes a list of serialized bref docs and returns a list of
+// deserialized bref docs, containing unique matches only (e.g. filtering out
+// duplicate matches that stem from both exact and fuzzy matching). We are
+// including "skate-bref-id" post-processing here as well (but there is surely
+// a better place for that).
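+//
+// A sketch mirroring the tests: two docs sharing source and target idents,
+// one fuzzy and one exact, reduce to the exact one:
+//
+//	docs := []string{
+//		`{"_id": "s1_0", "source_release_ident": "s1", "target_release_ident": "t1", "match_status": "fuzzy"}`,
+//		`{"_id": "s1_1", "source_release_ident": "s1", "target_release_ident": "t1", "match_status": "exact"}`,
+//	}
+//	result, err := uniqueMatches(docs, &statsAugment{}) // result: just the "exact" doc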
+func uniqueMatches(docs []string, stats *statsAugment) (result []*BiblioRef, err error) {
+	var brefs []*BiblioRef
+	for _, doc := range docs {
+		var bref BiblioRef
+		if err := json.Unmarshal([]byte(doc), &bref); err != nil {
+			return nil, err
+		}
+		// On-the-fly add elasticsearch "_id" and indexed timestamp, if not already set.
+		if bref.Key == "" && bref.SourceReleaseIdent != "" {
+			bref.Key = fmt.Sprintf("%s_%d", bref.SourceReleaseIdent, bref.RefIndex)
+			bref.IndexedTs = time.Now().UTC().Format(time.RFC3339)
+		}
+		brefs = append(brefs, &bref)
+	}
+	// Make sure exact matches come first. XXX: bug? The less function only
+	// looks at i and ignores j, so this is not a strict weak ordering.
+	sort.Slice(brefs, func(i, j int) bool {
+		return brefs[i].MatchStatus == StatusExact.Short()
+	})
+	seen := set.New()
+	for _, doc := range brefs {
+		h := doc.LinkHash()
+		if seen.Contains(h) {
+			stats.skipDuplicatedBref++
+			log.Printf("skip-dup-bref [%d]: hash=%v source=%v status=%v reason=%v",
+				stats.skipDuplicatedBref, h, doc.SourceReleaseIdent, doc.MatchStatus, doc.MatchReason)
+			continue
+		}
+		seen.Add(h)
+		result = append(result, doc)
+	}
+	return result, nil
+}
+
+type statsAugment struct {
+	skipDuplicatedBref int64
+	skipMatchedRef     int64
+	total              int64
+}
+
+func (s statsAugment) String() string {
+	return fmt.Sprintf("total=%d, skipMatchedRef=%d, skipDuplicatedBref=%d",
+		s.total, s.skipMatchedRef, s.skipDuplicatedBref)
+}
+
+// CutBatch runs Cut over a list of lines.
+func CutBatch(lines []string, column int) (result []string) {
+	for _, line := range lines {
+		result = append(result, Cut(line, column))
+	}
+	return result
+}
+
+// Cut returns a specific column (1-indexed) from a tab-separated line, or the
+// empty string if the column is invalid.
+func Cut(line string, column int) string {
+	return CutSep(line, "\t", column)
+}
+
+// CutSep allows specifying a separator; column is 1-indexed.
+func CutSep(line, sep string, column int) string {
+	parts := strings.Split(strings.TrimSpace(line), sep)
+	if len(parts) < column {
+		return ""
+	}
+	return parts[column-1]
+}
+
+// FindByPrefix returns the first element of a slice of strings that matches
+// a prefix.
+func FindByPrefix(ss []string, prefix string) string {
+	for _, s := range ss {
+		if strings.HasPrefix(s, prefix) {
+			return s
+		}
+	}
+	return ""
+}
+
+// makeKeyFunc creates a function that can be used as keyFunc, selecting a
+// column from fields separated by sep; column is 1-indexed.
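+//
+// For example, makeKeyFunc("\t", 1) maps the line "K1\tpayload\n" to the
+// key "K1"; an empty column yields an error.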
+func makeKeyFunc(sep string, column int) func(string) (string, error) { + return func(s string) (string, error) { + if k := CutSep(s, sep, column); k != "" { + return k, nil + } + return "", fmt.Errorf("cannot get key from column %d in line (len=%d): %s", column, len(s), s) + } +} + +func parseRelease(s string) (r *Release, err error) { + err = json.Unmarshal([]byte(s), &r) + return +} + +func parseRef(s string) (r *Ref, err error) { + err = json.Unmarshal([]byte(s), &r) + return +} + +func parseWiki(s string) (r *MinimalCitations, err error) { + err = json.Unmarshal([]byte(s), &r) + return +} + +func parseBiblioref(s string) (r *BiblioRef, err error) { + err = json.Unmarshal([]byte(s), &r) + return +} diff --git a/skate/reduce_test.go b/skate/reduce_test.go new file mode 100644 index 0000000..501d8cd --- /dev/null +++ b/skate/reduce_test.go @@ -0,0 +1,451 @@ +package skate + +import ( + "bytes" + "io/ioutil" + "reflect" + "testing" + + "git.archive.org/martin/cgraph/skate/atomic" + "git.archive.org/martin/cgraph/skate/xio" + "github.com/kr/pretty" +) + +func TestLineColumn(t *testing.T) { + var cases = []struct { + line string + sep string + column int + result string + }{ + {"", "", 2, ""}, + {"1 2 3", " ", 1, "1"}, + {"1 2 3", " ", 2, "2"}, + {"1 2 3", " ", 3, "3"}, + {"1 2 3", " ", 4, ""}, + {"1 2 3", "\t", 1, "1 2 3"}, + } + for _, c := range cases { + result := CutSep(c.line, c.sep, c.column) + if result != c.result { + t.Fatalf("got %v, want %v", result, c.result) + } + } +} + +func TestCutBatch(t *testing.T) { + var cases = []struct { + lines []string + column int + result []string + }{ + { + []string{}, + 1, + nil, + }, + { + []string{}, + 9, + nil, + }, + { + []string{"1\t2\n", "3\t4\n"}, + 2, + []string{"2", "4"}, + }, + } + for _, c := range cases { + result := CutBatch(c.lines, c.column) + if !reflect.DeepEqual(result, c.result) { + t.Fatalf("got %v (%d), want %v (%d)", + result, len(result), c.result, len(c.result)) + } + } +} + +func TestUniqueMatches(t *testing.T) { + var cases = []struct { + about string + docs []string + result []*BiblioRef + err error + }{ + { + about: "missing fields are ignored", + docs: []string{`{}`}, + result: []*BiblioRef{&BiblioRef{}}, + err: nil, + }, + { + about: "a single doc is passed on", + docs: []string{`{ + "_id": "s1_0", + "source_release_ident": "s1", + "target_release_ident": "t1"}`}, + result: []*BiblioRef{&BiblioRef{ + Key: "s1_0", + SourceReleaseIdent: "s1", + TargetReleaseIdent: "t1", + }}, + err: nil, + }, + { + about: "we want to keep the exact match, if available", + docs: []string{` + {"_id": "s1_0", + "source_release_ident": "s1", + "target_release_ident": "t1", + "match_status": "fuzzy"}`, + `{"_id": "s1_1", + "source_release_ident": "s1", + "target_release_ident": "t1", + "match_status": "exact"}`, + }, + result: []*BiblioRef{&BiblioRef{ + Key: "s1_1", + SourceReleaseIdent: "s1", + TargetReleaseIdent: "t1", + MatchStatus: "exact", + }}, + err: nil, + }, + { + about: "if both are exact, we just take (any) one", + docs: []string{` + {"_id": "s1_0", + "source_release_ident": "s1", + "target_release_ident": "t1", + "match_status": "exact", + "match_reason": "a"}`, + `{"_id": "s1_1", + "source_release_ident": "s1", + "target_release_ident": "t1", + "match_status": "exact", + "match_reason": "b"}`, + }, + result: []*BiblioRef{&BiblioRef{ + Key: "s1_1", + SourceReleaseIdent: "s1", + TargetReleaseIdent: "t1", + MatchStatus: "exact", + MatchReason: "b", + }}, + err: nil, + }, + { + about: "regression; a buggy sort?", + docs: 
[]string{` + {"_id": "s1_0", + "source_release_ident": "s1", + "target_release_ident": "t1", + "match_status": "exact", + "match_reason": "a"}`, + `{"_id": "s1_1", + "source_release_ident": "s1", + "target_release_ident": "t1", + "match_status": "fuzzy", + "match_reason": "b"}`, + }, + result: []*BiblioRef{&BiblioRef{ + Key: "s1_0", + SourceReleaseIdent: "s1", + TargetReleaseIdent: "t1", + MatchStatus: "exact", + MatchReason: "a", + }}, + err: nil, + }, + } + for _, c := range cases { + result, err := uniqueMatches(c.docs, &statsAugment{}) + if err != c.err { + t.Fatalf("got %v, want %v (%s)", err, c.err, c.about) + } + if !reflect.DeepEqual(result, c.result) { + t.Fatalf("got %#v, want %#v (%s)", + pretty.Sprint(result), + pretty.Sprint(c.result), c.about) + } + } +} + +func TestMatchedRefsExtend(t *testing.T) { + var cases = []struct { + matched []*BiblioRef + refs []*Ref + result []*BiblioRef + }{ + { + matched: []*BiblioRef{}, + refs: []*Ref{}, + result: []*BiblioRef{}, + }, + { + matched: []*BiblioRef{ + &BiblioRef{ + RefIndex: 2, + RefKey: "K2", + }, + }, + refs: []*Ref{}, + result: []*BiblioRef{ + &BiblioRef{ + RefIndex: 2, + RefKey: "K2", + }, + }, + }, + { + matched: []*BiblioRef{ + &BiblioRef{ + SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm", + RefIndex: 2, + RefKey: "K2", + }, + }, + refs: []*Ref{ + &Ref{ + ReleaseIdent: "0000", + Biblio: Biblio{ + Title: "Title", + }, + Index: 3, + Key: "K3", + }, + }, + result: []*BiblioRef{ + &BiblioRef{ + SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm", + RefIndex: 2, + RefKey: "K2", + }, + &BiblioRef{ + Key: "0000_3", + SourceReleaseIdent: "0000", + RefIndex: 3, + RefKey: "K3", + MatchStatus: StatusUnmatched.Short(), + MatchReason: ReasonUnknown.Short(), + SourceYear: "0", + }, + }, + }, + { + matched: []*BiblioRef{ + &BiblioRef{ + SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm", + RefIndex: 2, + RefKey: "K2", + }, + }, + refs: []*Ref{ + &Ref{ + ReleaseIdent: "0000", + Biblio: Biblio{ + Title: "Title", + }, + Index: 2, + Key: "K2", + }, + }, + result: []*BiblioRef{ + &BiblioRef{ + SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm", + RefIndex: 2, + RefKey: "K2", + }, + }, + }, + } + for i, c := range cases { + result := matchedRefsExtend(c.matched, c.refs, &statsAugment{}) + for _, v := range result { + v.IndexedTs = "" // we do not want to mock out time, now + } + if !reflect.DeepEqual(result, c.result) { + t.Fatalf("[%d]: got %v, want %v (%v)", + i+1, result, c.result, pretty.Diff(result, c.result)) + } + } +} + +func TestRemoveSelfLinks(t *testing.T) { + var cases = []struct { + brefs []*BiblioRef + result []*BiblioRef + }{ + { + brefs: nil, + result: nil, + }, + { + brefs: []*BiblioRef{}, + result: []*BiblioRef{}, + }, + { + brefs: []*BiblioRef{ + &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"}, + &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"}, + }, + result: []*BiblioRef{ + &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"}, + }, + }, + { + brefs: []*BiblioRef{ + &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"}, + &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"}, + &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"}, + &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"}, + }, + result: []*BiblioRef{ + &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"}, + }, + }, + } + for i, c := range cases { + result := removeSelfLinks(c.brefs) + if !reflect.DeepEqual(result, c.result) { + t.Fatalf("[%d]: got %v, want %v (%v)", + i, result, c.result, 
pretty.Diff(result, c.result)) + } + } +} + +func TestDeduplicateBrefs(t *testing.T) { + var cases = []struct { + brefs []*BiblioRef + result []*BiblioRef + }{ + { + brefs: nil, + result: nil, + }, + { + brefs: []*BiblioRef{}, + result: []*BiblioRef{}, + }, + { + brefs: []*BiblioRef{ + &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, + &BiblioRef{Key: "123", MatchStatus: StatusExact.Short()}, + }, + result: []*BiblioRef{ + &BiblioRef{Key: "123", MatchStatus: StatusExact.Short()}, + }, + }, + { + brefs: []*BiblioRef{ + &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, + &BiblioRef{Key: "123", MatchStatus: StatusUnmatched.Short()}, + }, + result: []*BiblioRef{ + &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, + }, + }, + { + brefs: []*BiblioRef{ + &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, + &BiblioRef{Key: "123", MatchStatus: StatusWeak.Short()}, + }, + result: []*BiblioRef{ + &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, + }, + }, + { + brefs: []*BiblioRef{ + &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, + &BiblioRef{Key: "123", MatchStatus: StatusAmbiguous.Short()}, + }, + result: []*BiblioRef{ + &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, + }, + }, + } + for i, c := range cases { + result := deduplicateBrefs(c.brefs) + if !reflect.DeepEqual(result, c.result) { + t.Fatalf("[%d]: got %v, want %v (%v)", + i, result, c.result, pretty.Diff(result, c.result)) + } + } +} + +func TestZippyExact(t *testing.T) { + var cases = []struct { + a, b, out string + err error + }{ + { + a: "testdata/zippy/cE00a.json", + b: "testdata/zippy/cE00b.json", + out: "testdata/zippy/cE00r.json", + err: nil, + }, + { + a: "testdata/zippy/cE01a.json", + b: "testdata/zippy/cE01b.json", + out: "testdata/zippy/cE01r.json", + err: nil, + }, + { + a: "testdata/zippy/cE02a.json", + b: "testdata/zippy/cE02b.json", + out: "testdata/zippy/cE02r.json", + err: nil, + }, + } + for i, c := range cases { + a, b, err := xio.OpenTwo(c.a, c.b) + if err != nil { + t.Errorf("failed to open test files: %v, %v", c.a, c.b) + } + var ( + buf bytes.Buffer + matchResult = MatchResult{Status: StatusExact, Reason: ReasonDOI} + ) + err = ZippyExact(a, b, matchResult, &buf) + if err != c.err { + t.Errorf("[%d] got %v, want %v", i, err, c.err) + } + ok, err := equalsFilename(&buf, c.out) + if err != nil { + t.Errorf("failed to open test file: %v", c.out) + } + if !ok { + filename, err := tempWriteFile(&buf) + if err != nil { + t.Logf("could not write temp file") + } + t.Errorf("[%d] output mismatch (buffer length=%d, content=%v), want %v", i, buf.Len(), filename, c.out) + } + } +} + +// equalsFilename returns true, if the contents of a given buffer matches the +// contents of a file given by filename. +func equalsFilename(buf *bytes.Buffer, filename string) (bool, error) { + b, err := ioutil.ReadFile(filename) + if err != nil { + return false, err + } + bb := buf.Bytes() + if len(bb) == 0 && len(b) == 0 { + return true, nil + } + return reflect.DeepEqual(b, bb), nil +} + +// tempWriteFile writes the content of a buffer to a temporary file and returns +// its path. 
+func tempWriteFile(buf *bytes.Buffer) (string, error) { + f, err := ioutil.TempFile("", "skate-test-*") + if err != nil { + return "", err + } + if err = atomic.WriteFile(f.Name(), buf.Bytes(), 0755); err != nil { + return "", err + } + return f.Name(), nil +} diff --git a/skate/zippy.go b/skate/zippy.go deleted file mode 100644 index ff836e8..0000000 --- a/skate/zippy.go +++ /dev/null @@ -1,599 +0,0 @@ -// This file contains various "reducers", e.g. merging data from two streams and -// applying a function on groups of documents with a shared key. -// -// Note: This is a bit repetitive, but we do not want to introduce any other -// abstraction for now. Since most of the logic is in the "grouper" functions, -// we could make them top level values and then assemble the zipkey runner on -// the fly. -// -// The most confusing aspect currently is the variety of schemas hidden within -// the readers (and string groups): release, ref, ref-as-release, open library, -// wikipedia, ... -// -// TODO: [ ] pass release stage through all match types -// TODO: [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks -package skate - -import ( - "fmt" - "io" - "log" - "sort" - "strings" - "sync" - "time" - - "git.archive.org/martin/cgraph/skate/set" - "git.archive.org/martin/cgraph/skate/zipkey" - json "github.com/segmentio/encoding/json" -) - -var brefPool = sync.Pool{ - New: func() interface{} { - var bref BiblioRef - return bref - }, -} - -// groupLogf logs a message alongsize a serialized group for debugging. -func groupLogf(g *zipkey.Group, s string, vs ...interface{}) { - log.Printf(s, vs...) - b, _ := json.MarshalIndent(g, "", " ") - log.Println(string(b)) -} - -// ZippyExact takes a release and refs reader (key, doc) and assigns a fixed -// match result, e.g. for doi matches. -func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error { - var ( - enc = json.NewEncoder(w) - keyer = makeKeyFunc("\t", 1) - i = 0 - bref BiblioRef - grouper = func(g *zipkey.Group) error { - i++ - if i%10000 == 0 { - log.Printf("processed %v groups", i) - } - var ( - target *Release - ref *Ref - err error - ) - if len(g.G0) == 0 || len(g.G1) == 0 { - return nil - } - if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil { - groupLogf(g, "[skip] failed to parse release: %v", err) - return nil - } - for _, line := range g.G1 { - if ref, err = parseRef(Cut(line, 2)); err != nil { - groupLogf(g, "[skip] failed to parse ref: %v", err) - continue - } - bref = brefPool.Get().(BiblioRef) - bref.Reset() - bref.SourceReleaseIdent = ref.ReleaseIdent - bref.SourceWorkIdent = ref.WorkIdent - bref.SourceReleaseStage = ref.ReleaseStage - bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear) - bref.RefIndex = ref.Index + 1 // we want 1-index (also helps with omitempty) - bref.RefKey = ref.Key - bref.TargetReleaseIdent = target.Ident - bref.TargetWorkIdent = target.WorkID - bref.MatchProvenance = ref.RefSource - bref.MatchStatus = matchResult.Status.Short() - bref.MatchReason = matchResult.Reason.Short() - if err := enc.Encode(bref); err != nil { - return err - } - brefPool.Put(bref) - } - return nil - } - ) - zipper := zipkey.New(releases, refs, keyer, grouper) - return zipper.Run() -} - -// ZippyExactReleases takes two release readers (key, doc) and assigns a fixed -// match result, e.g. used with release entities converted from open library snapshots. 
-func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error { - var ( - enc = json.NewEncoder(w) - keyer = makeKeyFunc("\t", 1) - i = 0 - bref BiblioRef - grouper = func(g *zipkey.Group) error { - i++ - if i%10000 == 0 { - log.Printf("processed %v groups", i) - } - var ( - target, re *Release - err error - ) - if len(g.G0) == 0 || len(g.G1) == 0 { - return nil - } - if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil { - groupLogf(g, "[skip] failed to parse release: %v", err) - return nil - } - for _, line := range g.G1 { - if re, err = parseRelease(Cut(line, 2)); err != nil { - groupLogf(g, "[skip] failed to parse release: %v", err) - continue - } - if target.WorkID == "" { - continue - } - bref = brefPool.Get().(BiblioRef) - bref.Reset() - bref.SourceReleaseIdent = re.Ident - bref.SourceWorkIdent = re.WorkID - bref.SourceReleaseStage = re.ReleaseStage - bref.SourceYear = fmt.Sprintf("%d", re.ReleaseYear()) - bref.RefIndex = re.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty) - bref.RefKey = re.Extra.Skate.Ref.Key - bref.TargetOpenLibraryWork = target.WorkID - bref.MatchProvenance = re.Extra.Skate.Ref.Source - bref.MatchStatus = matchResult.Status.Short() - bref.MatchReason = matchResult.Reason.Short() - if err := enc.Encode(bref); err != nil { - return err - } - brefPool.Put(bref) - } - return nil - } - ) - zipper := zipkey.New(olr, releases, keyer, grouper) - return zipper.Run() -} - -// ZippyExactWiki takes a release and wiki reader (key, doc) and assigns a -// fixed match result. -func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error { - var ( - enc = json.NewEncoder(w) - keyer = makeKeyFunc("\t", 1) - bref BiblioRef - grouper = func(g *zipkey.Group) error { - var ( - target *Release - wiki *MinimalCitations - err error - ) - if len(g.G0) == 0 || len(g.G1) == 0 { - return nil - } - if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil { - return err - } - for _, line := range g.G1 { - if wiki, err = parseWiki(Cut(line, 2)); err != nil { - return err - } - bref = brefPool.Get().(BiblioRef) - bref.Reset() - bref.Key = fmt.Sprintf("%s_%s", slugifyString(wiki.PageTitle), target.Ident) // XXX: what should we use? - bref.SourceWikipediaArticle = wiki.PageTitle - bref.TargetReleaseIdent = target.Ident - bref.TargetWorkIdent = target.WorkID - bref.MatchProvenance = "wikipedia" - bref.MatchStatus = mr.Status.Short() - bref.MatchReason = mr.Reason.Short() - if err := enc.Encode(bref); err != nil { - return err - } - brefPool.Put(bref) - } - return nil - } - ) - zipper := zipkey.New(releases, wiki, keyer, grouper) - return zipper.Run() -} - -// ZippyVerifyRefs takes a release and refs (as release) reader (key, doc), run -// fuzzy verification and will emit a biblioref document, if exact or strong -// match. 
-func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error { - var ( - enc = json.NewEncoder(w) - keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { - var ( - re, pivot *Release - err error - ) - if len(g.G0) == 0 || len(g.G1) == 0 { - return nil - } - if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil { - return err - } - for _, line := range g.G1 { - if re, err = parseRelease(Cut(line, 2)); err != nil { - return err - } - result := Verify(pivot, re) - switch result.Status { - case StatusExact, StatusStrong: - if result.Reason == ReasonDOI { - continue - } - br := generateBiblioRef(re, pivot, result, "fuzzy") - if err := enc.Encode(br); err != nil { - return err - } - default: - } - } - return nil - } - ) - zipper := zipkey.New(releases, refs, keyer, grouper) - return zipper.Run() -} - -// ZippyVerifyRefsOpenLibraryTable takes OL editions (as release) and refs (as -// release) and emits a match table for manual inspection. -func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error { - var ( - keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { - var ( - re, pivot *Release - err error - ) - if len(g.G0) == 0 || len(g.G1) == 0 { - return nil - } - // We take a single edition from OL. - if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil { - return err - } - for _, line := range g.G1 { - if re, err = parseRelease(Cut(line, 2)); err != nil { - return err - } - // The refs have a container name, but not a title, but here we - // compare against titles from open library. - re.Title = re.ContainerName - result := Verify(pivot, re) - fmt.Printf("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", - result.Status.Short(), - result.Reason.Short(), - pivot.Extra.OpenLibrary.WorkID, - FindByPrefix(pivot.Extra.OpenLibrary.SourceRecords, "ia:"), - re.Ident, - CutSep(g.G0[0], "\t", 1), - pivot.Title, - re.Title) - } - return nil - } - ) - zipper := zipkey.New(olr, refs, keyer, grouper) - return zipper.Run() -} - -// ZippyVerifyRefsOpenLibrary takes OL editions (as release) and refs (as -// release) and writes biblioref. -func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { - var ( - enc = json.NewEncoder(w) - keyer = makeKeyFunc("\t", 1) - bref BiblioRef - grouper = func(g *zipkey.Group) error { - var ( - ref, pivot *Release // ref (reference), pivot (open library) - err error - ) - if len(g.G0) == 0 || len(g.G1) == 0 { - return nil - } - // We take a single edition from OL. - if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil { - return err - } - for _, line := range g.G1 { - if ref, err = parseRelease(Cut(line, 2)); err != nil { - return err - } - // The refs have a container name, but not a title, but here we - // compare against titles from open library. 
- ref.Title = ref.ContainerName - result := Verify(pivot, ref) - switch result.Status { - case StatusExact, StatusStrong: - bref = brefPool.Get().(BiblioRef) - bref.Reset() - bref.SourceReleaseIdent = ref.Ident - bref.SourceWorkIdent = ref.WorkID - bref.SourceReleaseStage = ref.ReleaseStage - bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear()) - bref.RefIndex = ref.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty) - bref.RefKey = ref.Extra.Skate.Ref.Key - bref.TargetOpenLibraryWork = pivot.WorkID - bref.MatchProvenance = ref.Extra.Skate.Ref.Source - bref.MatchStatus = result.Status.Short() - bref.MatchReason = result.Reason.Short() - if err := enc.Encode(bref); err != nil { - return err - } - brefPool.Put(bref) - default: - } - } - return nil - } - ) - zipper := zipkey.New(olr, refs, keyer, grouper) - return zipper.Run() -} - -// ZippyBrefAugment takes all matched docs from bref and adds docs from raw -// refs, which have not been matched. It also gets rid of duplicate matches. -// Note: This operates on two streams: raw refs with about 2.5B (07/2021) and -// matches, which will be about 1B; in essence we have to iterate through about -// 3.5B records; small tweak here may be worthwhile. -// -// We can identify, which docs have been matched by checking the source ident, -// ref index and key. -// -// TODO: This needs to be completed and made fast. -func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { - var ( - stats = statsAugment{} - enc = json.NewEncoder(w) - keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { - // g.G0 contains matched docs for a given work id, g.G1 all raw - // refs, with the same work id. - - // First, iterate over all matches and sort out duplicates, e.g. - // docs that have the same source and target id. - log.Printf("group K=%s, G0=%d, G1=%d", g.Key, len(g.G0), len(g.G1)) - matched, err := uniqueMatches(CutBatch(g.G0, 2), &stats) - if err != nil { - return err - } - var refs = make([]*Ref, len(g.G1)) - for i := 0; i < len(refs); i++ { - var ( - data []byte = []byte(Cut(g.G1[i], 2)) - ref Ref - ) - if err := json.Unmarshal(data, &ref); err != nil { - return err - } - refs[i] = &ref - } - // TODO: this slows down this process; be a bit smarter about slices. - matched = matchedRefsExtend(matched, refs, &stats) - // At this point, we may have duplicates by "_id", e.g. source - // release ident and ref index (example: - // 4kg2dejsgzaf3cszs2lt5hz4by_9, which appears three times, one - // exact match, and twice unmatched). - matched = deduplicateBrefs(matched) - matched = removeSelfLinks(matched) - for _, bref := range matched { - stats.total++ - if err := enc.Encode(bref); err != nil { - return err - } - } - return nil - } - ) - zipper := zipkey.New(bref, raw, keyer, grouper) - err := zipper.Run() - log.Println(stats) - return err -} - -// removeSelfLinks removes self-referential links. TODO: Those should be caught -// at the root cause. -func removeSelfLinks(brefs []*BiblioRef) (result []*BiblioRef) { - var i int - for _, bref := range brefs { - if bref.SourceReleaseIdent == bref.TargetReleaseIdent { - continue - } - brefs[i] = bref - i++ - } - brefs = brefs[:i] - return brefs -} - -// deduplicateBrefs deduplicates by the document id (for elasticsearch), which -// may help filter out some duplicates but not all. -func deduplicateBrefs(brefs []*BiblioRef) []*BiblioRef { - // Sort by match status, exact first, unmatched last. 
- sort.Slice(brefs, func(i, j int) bool { - switch { - case brefs[i].MatchStatus == StatusExact.Short(): - return true - case brefs[i].MatchStatus == StatusStrong.Short(): - return true - case brefs[i].MatchStatus == StatusWeak.Short(): - return false - case brefs[i].MatchStatus == StatusAmbiguous.Short(): - return false - case brefs[i].MatchStatus != StatusUnmatched.Short(): - return true - default: - return false - } - }) - var ( - seen = set.New() - i int - ) - for _, v := range brefs { - if seen.Contains(v.Key) { - continue - } - brefs[i] = v - i++ - seen.Add(v.Key) - } - brefs = brefs[:i] - log.Printf("trimmed brefs from %d to %d", len(brefs), i) - return brefs -} - -// matchedRefsExtend takes a set of (unique) biblioref docs and will emit that -// set of biblioref docs (unchanged) plus raw references as biblioref, which -// did not result in a match (determined by e.g. ref key and index). XXX: We -// may have duplicate refs as well - how to distinguish them? -func matchedRefsExtend(matched []*BiblioRef, refs []*Ref, stats *statsAugment) []*BiblioRef { - seen := set.New() // store "key + index" of matched items - for _, m := range matched { - s := m.RefKey + fmt.Sprintf("%d", m.RefIndex) - seen.Add(s) - } - for _, r := range refs { - s := r.Key + fmt.Sprintf("%d", r.Index) - if seen.Contains(s) { - stats.skipMatchedRef++ - log.Printf("skip-matched-ref [%d]: from %d matches; ident=%v, title=%s, key=%v, index=%d", - stats.skipMatchedRef, len(matched), r.ReleaseIdent, r.Biblio.Title, r.Key, r.Index) - continue - } - var bref BiblioRef - bref.IndexedTs = time.Now().UTC().Format(time.RFC3339) - bref.Key = fmt.Sprintf("%s_%d", r.ReleaseIdent, r.Index) - bref.RefIndex = r.Index - bref.RefKey = r.Key - bref.SourceReleaseIdent = r.ReleaseIdent - bref.SourceReleaseStage = r.ReleaseStage - bref.SourceWorkIdent = r.WorkIdent - bref.SourceYear = fmt.Sprintf("%d", r.ReleaseYear) - bref.TargetUnstructured = r.Biblio.Unstructured - // Reuse fields for debugging, for now. - bref.MatchStatus = StatusUnmatched.Short() - bref.MatchReason = ReasonUnknown.Short() - matched = append(matched, &bref) - } - return matched -} - -// uniqueMatches takes a list of bref docs (unserialized) and will return a -// list of deserialized bref docs, containing unique matches only (e.g. filter -// out duplicate matches, e.g. from exact and fuzzy). We are including -// "skate-bref-id" post-processing here as well (but there is surely a better -// place for that). -func uniqueMatches(docs []string, stats *statsAugment) (result []*BiblioRef, err error) { - var brefs []*BiblioRef - for _, doc := range docs { - var bref BiblioRef - if err := json.Unmarshal([]byte(doc), &bref); err != nil { - return nil, err - } - // On-the-fly add elasticsearch "_id" and indexed timestamp, if not already set. - if bref.Key == "" && bref.SourceReleaseIdent != "" { - bref.Key = fmt.Sprintf("%s_%d", bref.SourceReleaseIdent, bref.RefIndex) - bref.IndexedTs = time.Now().UTC().Format(time.RFC3339) - } - brefs = append(brefs, &bref) - } - // Make sure exact matches come first. XXX: bug? 
- sort.Slice(brefs, func(i, j int) bool { - return brefs[i].MatchStatus == StatusExact.Short() - }) - seen := set.New() - for _, doc := range brefs { - h := doc.LinkHash() - if seen.Contains(h) { - stats.skipDuplicatedBref++ - log.Printf("skip-dup-bref [%d]: hash=%v source=%v status=%v reason=%v", - stats.skipDuplicatedBref, h, doc.SourceReleaseIdent, doc.MatchStatus, doc.MatchReason) - continue - } - seen.Add(h) - result = append(result, doc) - } - return result, nil -} - -type statsAugment struct { - skipDuplicatedBref int64 - skipMatchedRef int64 - total int64 -} - -func (s statsAugment) String() string { - return fmt.Sprintf("total=%d, skipMatchedRef=%d, skipDuplicatedBref=%d", - s.total, s.skipMatchedRef, s.skipDuplicatedBref) -} - -// CutBatch runs Cut over a list of lines. -func CutBatch(lines []string, column int) (result []string) { - for _, line := range lines { - result = append(result, Cut(line, column)) - } - return result -} - -// Cut returns a specific column (1-indexed) from a line, returns empty string -// if column is invalid. -func Cut(line string, column int) string { - return CutSep(line, "\t", column) -} - -// CutSep allows to specify a separator, column is 1-indexed. -func CutSep(line, sep string, column int) string { - parts := strings.Split(strings.TrimSpace(line), sep) - if len(parts) < column { - return "" - } else { - return parts[column-1] - } -} - -// FindByPrefix return the first element for a slice of strings, which matches a prefix. -func FindByPrefix(ss []string, prefix string) string { - for _, s := range ss { - if strings.HasPrefix(s, prefix) { - return s - } - } - return "" -} - -// makeKeyFunc creates a function that can be used as keyFunc, selecting a -// column from fields separated by sep; column is 1-indexed. 
-func makeKeyFunc(sep string, column int) func(string) (string, error) { - return func(s string) (string, error) { - if k := CutSep(s, sep, column); k != "" { - return k, nil - } - return "", fmt.Errorf("cannot get key from column %d in line (len=%d): %s", column, len(s), s) - } -} - -func parseRelease(s string) (r *Release, err error) { - err = json.Unmarshal([]byte(s), &r) - return -} - -func parseRef(s string) (r *Ref, err error) { - err = json.Unmarshal([]byte(s), &r) - return -} - -func parseWiki(s string) (r *MinimalCitations, err error) { - err = json.Unmarshal([]byte(s), &r) - return -} - -func parseBiblioref(s string) (r *BiblioRef, err error) { - err = json.Unmarshal([]byte(s), &r) - return -} diff --git a/skate/zippy_test.go b/skate/zippy_test.go deleted file mode 100644 index 501d8cd..0000000 --- a/skate/zippy_test.go +++ /dev/null @@ -1,451 +0,0 @@ -package skate - -import ( - "bytes" - "io/ioutil" - "reflect" - "testing" - - "git.archive.org/martin/cgraph/skate/atomic" - "git.archive.org/martin/cgraph/skate/xio" - "github.com/kr/pretty" -) - -func TestLineColumn(t *testing.T) { - var cases = []struct { - line string - sep string - column int - result string - }{ - {"", "", 2, ""}, - {"1 2 3", " ", 1, "1"}, - {"1 2 3", " ", 2, "2"}, - {"1 2 3", " ", 3, "3"}, - {"1 2 3", " ", 4, ""}, - {"1 2 3", "\t", 1, "1 2 3"}, - } - for _, c := range cases { - result := CutSep(c.line, c.sep, c.column) - if result != c.result { - t.Fatalf("got %v, want %v", result, c.result) - } - } -} - -func TestCutBatch(t *testing.T) { - var cases = []struct { - lines []string - column int - result []string - }{ - { - []string{}, - 1, - nil, - }, - { - []string{}, - 9, - nil, - }, - { - []string{"1\t2\n", "3\t4\n"}, - 2, - []string{"2", "4"}, - }, - } - for _, c := range cases { - result := CutBatch(c.lines, c.column) - if !reflect.DeepEqual(result, c.result) { - t.Fatalf("got %v (%d), want %v (%d)", - result, len(result), c.result, len(c.result)) - } - } -} - -func TestUniqueMatches(t *testing.T) { - var cases = []struct { - about string - docs []string - result []*BiblioRef - err error - }{ - { - about: "missing fields are ignored", - docs: []string{`{}`}, - result: []*BiblioRef{&BiblioRef{}}, - err: nil, - }, - { - about: "a single doc is passed on", - docs: []string{`{ - "_id": "s1_0", - "source_release_ident": "s1", - "target_release_ident": "t1"}`}, - result: []*BiblioRef{&BiblioRef{ - Key: "s1_0", - SourceReleaseIdent: "s1", - TargetReleaseIdent: "t1", - }}, - err: nil, - }, - { - about: "we want to keep the exact match, if available", - docs: []string{` - {"_id": "s1_0", - "source_release_ident": "s1", - "target_release_ident": "t1", - "match_status": "fuzzy"}`, - `{"_id": "s1_1", - "source_release_ident": "s1", - "target_release_ident": "t1", - "match_status": "exact"}`, - }, - result: []*BiblioRef{&BiblioRef{ - Key: "s1_1", - SourceReleaseIdent: "s1", - TargetReleaseIdent: "t1", - MatchStatus: "exact", - }}, - err: nil, - }, - { - about: "if both are exact, we just take (any) one", - docs: []string{` - {"_id": "s1_0", - "source_release_ident": "s1", - "target_release_ident": "t1", - "match_status": "exact", - "match_reason": "a"}`, - `{"_id": "s1_1", - "source_release_ident": "s1", - "target_release_ident": "t1", - "match_status": "exact", - "match_reason": "b"}`, - }, - result: []*BiblioRef{&BiblioRef{ - Key: "s1_1", - SourceReleaseIdent: "s1", - TargetReleaseIdent: "t1", - MatchStatus: "exact", - MatchReason: "b", - }}, - err: nil, - }, - { - about: "regression; a buggy sort?", - docs: 
[]string{` - {"_id": "s1_0", - "source_release_ident": "s1", - "target_release_ident": "t1", - "match_status": "exact", - "match_reason": "a"}`, - `{"_id": "s1_1", - "source_release_ident": "s1", - "target_release_ident": "t1", - "match_status": "fuzzy", - "match_reason": "b"}`, - }, - result: []*BiblioRef{&BiblioRef{ - Key: "s1_0", - SourceReleaseIdent: "s1", - TargetReleaseIdent: "t1", - MatchStatus: "exact", - MatchReason: "a", - }}, - err: nil, - }, - } - for _, c := range cases { - result, err := uniqueMatches(c.docs, &statsAugment{}) - if err != c.err { - t.Fatalf("got %v, want %v (%s)", err, c.err, c.about) - } - if !reflect.DeepEqual(result, c.result) { - t.Fatalf("got %#v, want %#v (%s)", - pretty.Sprint(result), - pretty.Sprint(c.result), c.about) - } - } -} - -func TestMatchedRefsExtend(t *testing.T) { - var cases = []struct { - matched []*BiblioRef - refs []*Ref - result []*BiblioRef - }{ - { - matched: []*BiblioRef{}, - refs: []*Ref{}, - result: []*BiblioRef{}, - }, - { - matched: []*BiblioRef{ - &BiblioRef{ - RefIndex: 2, - RefKey: "K2", - }, - }, - refs: []*Ref{}, - result: []*BiblioRef{ - &BiblioRef{ - RefIndex: 2, - RefKey: "K2", - }, - }, - }, - { - matched: []*BiblioRef{ - &BiblioRef{ - SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm", - RefIndex: 2, - RefKey: "K2", - }, - }, - refs: []*Ref{ - &Ref{ - ReleaseIdent: "0000", - Biblio: Biblio{ - Title: "Title", - }, - Index: 3, - Key: "K3", - }, - }, - result: []*BiblioRef{ - &BiblioRef{ - SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm", - RefIndex: 2, - RefKey: "K2", - }, - &BiblioRef{ - Key: "0000_3", - SourceReleaseIdent: "0000", - RefIndex: 3, - RefKey: "K3", - MatchStatus: StatusUnmatched.Short(), - MatchReason: ReasonUnknown.Short(), - SourceYear: "0", - }, - }, - }, - { - matched: []*BiblioRef{ - &BiblioRef{ - SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm", - RefIndex: 2, - RefKey: "K2", - }, - }, - refs: []*Ref{ - &Ref{ - ReleaseIdent: "0000", - Biblio: Biblio{ - Title: "Title", - }, - Index: 2, - Key: "K2", - }, - }, - result: []*BiblioRef{ - &BiblioRef{ - SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm", - RefIndex: 2, - RefKey: "K2", - }, - }, - }, - } - for i, c := range cases { - result := matchedRefsExtend(c.matched, c.refs, &statsAugment{}) - for _, v := range result { - v.IndexedTs = "" // we do not want to mock out time, now - } - if !reflect.DeepEqual(result, c.result) { - t.Fatalf("[%d]: got %v, want %v (%v)", - i+1, result, c.result, pretty.Diff(result, c.result)) - } - } -} - -func TestRemoveSelfLinks(t *testing.T) { - var cases = []struct { - brefs []*BiblioRef - result []*BiblioRef - }{ - { - brefs: nil, - result: nil, - }, - { - brefs: []*BiblioRef{}, - result: []*BiblioRef{}, - }, - { - brefs: []*BiblioRef{ - &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"}, - &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"}, - }, - result: []*BiblioRef{ - &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"}, - }, - }, - { - brefs: []*BiblioRef{ - &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"}, - &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"}, - &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"}, - &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"}, - }, - result: []*BiblioRef{ - &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"}, - }, - }, - } - for i, c := range cases { - result := removeSelfLinks(c.brefs) - if !reflect.DeepEqual(result, c.result) { - t.Fatalf("[%d]: got %v, want %v (%v)", - i, result, c.result, 
pretty.Diff(result, c.result)) - } - } -} - -func TestDeduplicateBrefs(t *testing.T) { - var cases = []struct { - brefs []*BiblioRef - result []*BiblioRef - }{ - { - brefs: nil, - result: nil, - }, - { - brefs: []*BiblioRef{}, - result: []*BiblioRef{}, - }, - { - brefs: []*BiblioRef{ - &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, - &BiblioRef{Key: "123", MatchStatus: StatusExact.Short()}, - }, - result: []*BiblioRef{ - &BiblioRef{Key: "123", MatchStatus: StatusExact.Short()}, - }, - }, - { - brefs: []*BiblioRef{ - &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, - &BiblioRef{Key: "123", MatchStatus: StatusUnmatched.Short()}, - }, - result: []*BiblioRef{ - &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, - }, - }, - { - brefs: []*BiblioRef{ - &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, - &BiblioRef{Key: "123", MatchStatus: StatusWeak.Short()}, - }, - result: []*BiblioRef{ - &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, - }, - }, - { - brefs: []*BiblioRef{ - &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, - &BiblioRef{Key: "123", MatchStatus: StatusAmbiguous.Short()}, - }, - result: []*BiblioRef{ - &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()}, - }, - }, - } - for i, c := range cases { - result := deduplicateBrefs(c.brefs) - if !reflect.DeepEqual(result, c.result) { - t.Fatalf("[%d]: got %v, want %v (%v)", - i, result, c.result, pretty.Diff(result, c.result)) - } - } -} - -func TestZippyExact(t *testing.T) { - var cases = []struct { - a, b, out string - err error - }{ - { - a: "testdata/zippy/cE00a.json", - b: "testdata/zippy/cE00b.json", - out: "testdata/zippy/cE00r.json", - err: nil, - }, - { - a: "testdata/zippy/cE01a.json", - b: "testdata/zippy/cE01b.json", - out: "testdata/zippy/cE01r.json", - err: nil, - }, - { - a: "testdata/zippy/cE02a.json", - b: "testdata/zippy/cE02b.json", - out: "testdata/zippy/cE02r.json", - err: nil, - }, - } - for i, c := range cases { - a, b, err := xio.OpenTwo(c.a, c.b) - if err != nil { - t.Errorf("failed to open test files: %v, %v", c.a, c.b) - } - var ( - buf bytes.Buffer - matchResult = MatchResult{Status: StatusExact, Reason: ReasonDOI} - ) - err = ZippyExact(a, b, matchResult, &buf) - if err != c.err { - t.Errorf("[%d] got %v, want %v", i, err, c.err) - } - ok, err := equalsFilename(&buf, c.out) - if err != nil { - t.Errorf("failed to open test file: %v", c.out) - } - if !ok { - filename, err := tempWriteFile(&buf) - if err != nil { - t.Logf("could not write temp file") - } - t.Errorf("[%d] output mismatch (buffer length=%d, content=%v), want %v", i, buf.Len(), filename, c.out) - } - } -} - -// equalsFilename returns true, if the contents of a given buffer matches the -// contents of a file given by filename. -func equalsFilename(buf *bytes.Buffer, filename string) (bool, error) { - b, err := ioutil.ReadFile(filename) - if err != nil { - return false, err - } - bb := buf.Bytes() - if len(bb) == 0 && len(b) == 0 { - return true, nil - } - return reflect.DeepEqual(b, bb), nil -} - -// tempWriteFile writes the content of a buffer to a temporary file and returns -// its path. 
-func tempWriteFile(buf *bytes.Buffer) (string, error) {
-	f, err := ioutil.TempFile("", "skate-test-*")
-	if err != nil {
-		return "", err
-	}
-	if err = atomic.WriteFile(f.Name(), buf.Bytes(), 0755); err != nil {
-		return "", err
-	}
-	return f.Name(), nil
-}
--
cgit v1.2.3


From 24090f8c65c82942272654356117ee0825d60523 Mon Sep 17 00:00:00 2001
From: Martin Czygan
Date: Mon, 5 Jul 2021 22:49:57 +0200
Subject: reduce: reduce allocations

---
 skate/reduce.go | 29 ++++++++++++++++++++++++-----
 1 file changed, 24 insertions(+), 5 deletions(-)

diff --git a/skate/reduce.go b/skate/reduce.go
index ff836e8..a7a6d8a 100644
--- a/skate/reduce.go
+++ b/skate/reduce.go
@@ -10,8 +10,11 @@
 // the readers (and string groups): release, ref, ref-as-release, open library,
 // wikipedia, ...
 //
-// TODO: [ ] pass release stage through all match types
-// TODO: [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks
+// TODO:
+// * [ ] pass release stage through all match types
+// * [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks
+// * [ ] batch, parallelize
+// * [ ] unify flags to "-a", "-b"
 package skate
 
 import (
@@ -35,6 +38,20 @@ var brefPool = sync.Pool{
 	},
 }
 
+var releasePool = sync.Pool{
+	New: func() interface{} {
+		var r Release
+		return &r
+	},
+}
+
+var refPool = sync.Pool{
+	New: func() interface{} {
+		var r Ref
+		return &r
+	},
+}
+
 // groupLogf logs a message alongside a serialized group for debugging.
 func groupLogf(g *zipkey.Group, s string, vs ...interface{}) {
 	log.Printf(s, vs...)
@@ -56,10 +73,12 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer)
 				log.Printf("processed %v groups", i)
 			}
 			var (
-				target *Release
-				ref    *Ref
+				target = releasePool.Get().(*Release)
+				ref    = refPool.Get().(*Ref)
 				err    error
 			)
+			defer releasePool.Put(target)
+			defer refPool.Put(ref)
 			if len(g.G0) == 0 || len(g.G1) == 0 {
 				return nil
 			}
@@ -73,6 +92,7 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer)
 				continue
 			}
 			bref = brefPool.Get().(BiblioRef)
+			defer brefPool.Put(bref)
 			bref.Reset()
 			bref.SourceReleaseIdent = ref.ReleaseIdent
@@ -88,7 +108,6 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer)
 			if err := enc.Encode(bref); err != nil {
 				return err
 			}
-			brefPool.Put(bref)
 		}
 		return nil
 	}
--
cgit v1.2.3


From f33ef23e0889430016cf8363e914b73313758714 Mon Sep 17 00:00:00 2001
From: Martin Czygan
Date: Mon, 5 Jul 2021 23:03:35 +0200
Subject: use canonical variable names

---
 skate/zipkey/zipkey.go | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/skate/zipkey/zipkey.go b/skate/zipkey/zipkey.go
index 3805535..3e8a133 100644
--- a/skate/zipkey/zipkey.go
+++ b/skate/zipkey/zipkey.go
@@ -45,16 +45,16 @@ func New(r0, r1 io.Reader, kf keyFunc, gf groupFunc) *ZipRun {
 
 // Run starts reading from both readers. The process stops if one reader is
 // exhausted or a read from either reader fails.
-func (c *ZipRun) Run() error { +func (z *ZipRun) Run() error { var ( k0, k1, c0, c1 string // key: k0, k1; current line: c0, c1 done bool err error lineKey = func(r *bufio.Reader) (line, key string, err error) { - if line, err = r.ReadString(c.sep); err != nil { + if line, err = r.ReadString(z.sep); err != nil { return } - key, err = c.kf(line) + key, err = z.kf(line) return } ) @@ -65,7 +65,7 @@ func (c *ZipRun) Run() error { switch { case k0 == "" || k0 < k1: for k0 == "" || k0 < k1 { - c0, k0, err = lineKey(c.r0) + c0, k0, err = lineKey(z.r0) if err == io.EOF { return nil } @@ -75,7 +75,7 @@ func (c *ZipRun) Run() error { } case k1 == "" || k0 > k1: for k1 == "" || k0 > k1 { - c1, k1, err = lineKey(c.r1) + c1, k1, err = lineKey(z.r1) if err == io.EOF { return nil } @@ -90,7 +90,7 @@ func (c *ZipRun) Run() error { G1: []string{c1}, } for { - c0, err = c.r0.ReadString(c.sep) + c0, err = z.r0.ReadString(z.sep) if err == io.EOF { done = true break @@ -98,7 +98,7 @@ func (c *ZipRun) Run() error { if err != nil { return err } - k, err := c.kf(c0) + k, err := z.kf(c0) if err != nil { return err } @@ -111,7 +111,7 @@ func (c *ZipRun) Run() error { } } for { - c1, err = c.r1.ReadString(c.sep) + c1, err = z.r1.ReadString(z.sep) if err == io.EOF { done = true break @@ -119,7 +119,7 @@ func (c *ZipRun) Run() error { if err != nil { return err } - k, err := c.kf(c1) + k, err := z.kf(c1) if err != nil { return err } @@ -131,8 +131,8 @@ func (c *ZipRun) Run() error { break } } - if c.gf != nil { - if err := c.gf(g); err != nil { + if z.gf != nil { + if err := z.gf(g); err != nil { return err } } -- cgit v1.2.3 From 3a8af268f74a3abc2389306be030ee42c0fc7120 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 5 Jul 2021 23:24:47 +0200 Subject: add thread safe writer --- skate/xio/util.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/skate/xio/util.go b/skate/xio/util.go index 4fc8905..49f38a3 100644 --- a/skate/xio/util.go +++ b/skate/xio/util.go @@ -6,8 +6,28 @@ import ( "io" "os" "strings" + "sync" ) +// SingleWriter makes any writer thread safe. +type SingleWriter struct { + sync.Mutex + w io.Writer +} + +// NewSingleWriter returns an io.Writer that can be safely accessed by multiple +// goroutines. +func NewSingleWriter(w io.Writer) *SingleWriter { + return &SingleWriter{w: w} +} + +// Write wraps the underlying writer and gives exclusive access. +func (w *SingleWriter) Write(p []byte) (n int, err error) { + w.Lock() + defer w.Unlock() + return w.w.Write(p) +} + // OpenTwo opens two files. The caller needs to check for a single error only. func OpenTwo(f0, f1 string) (g0, g1 *os.File, err error) { if g0, err = os.Open(f0); err != nil { -- cgit v1.2.3 From 269f14a4e3330f50b6527be27bcdd2b68f49fea0 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 5 Jul 2021 23:27:28 +0200 Subject: test-run: batch reduce processing for performance --- skate/reduce.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/skate/reduce.go b/skate/reduce.go index a7a6d8a..58d200c 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -27,6 +27,7 @@ import ( "time" "git.archive.org/martin/cgraph/skate/set" + "git.archive.org/martin/cgraph/skate/xio" "git.archive.org/martin/cgraph/skate/zipkey" json "github.com/segmentio/encoding/json" ) @@ -63,11 +64,11 @@ func groupLogf(g *zipkey.Group, s string, vs ...interface{}) { // match result, e.g. for doi matches. 
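Context for the diff below: once groups run on several worker goroutines, the shared JSON encoder becomes a concurrency hazard, which is what the SingleWriter added above is for. A hypothetical standalone example of the pattern (w and docs are stand-ins); this is safe as long as the encoder emits one Write per Encode, the caveat the later "we need a safe encoder, not just a safe writer" commit wrestles with:

	enc := json.NewEncoder(xio.NewSingleWriter(w)) // serialize all writes
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for doc := range docs {
				// Encode flushes one JSON document plus newline per call,
				// so complete lines never interleave.
				if err := enc.Encode(doc); err != nil {
					return
				}
			}
		}()
	}
	wg.Wait()
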
func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) i = 0 bref BiblioRef - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { i++ if i%10000 == 0 { log.Printf("processed %v groups", i) @@ -110,9 +111,10 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) } } return nil - } + }) ) - zipper := zipkey.New(releases, refs, keyer, grouper) + defer batcher.Close() + zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) return zipper.Run() } -- cgit v1.2.3 From c45a82884f0d86f06fc1eada23ac3fe83be18bc1 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 5 Jul 2021 23:29:18 +0200 Subject: reduce: hard-code batch size for testing --- skate/reduce.go | 1 + 1 file changed, 1 insertion(+) diff --git a/skate/reduce.go b/skate/reduce.go index 58d200c..8325bc0 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -113,6 +113,7 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) return nil }) ) + batcher.Size = 50000 // hard-code for now defer batcher.Close() zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) return zipper.Run() -- cgit v1.2.3 From db954963e097238f9df13e6c7820f19d9d83f0f0 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 5 Jul 2021 23:29:40 +0200 Subject: v0.1.37 --- skate/packaging/debian/skate/DEBIAN/control | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skate/packaging/debian/skate/DEBIAN/control b/skate/packaging/debian/skate/DEBIAN/control index 22e8e51..549cbad 100644 --- a/skate/packaging/debian/skate/DEBIAN/control +++ b/skate/packaging/debian/skate/DEBIAN/control @@ -1,5 +1,5 @@ Package: skate -Version: 0.1.36 +Version: 0.1.37 Section: utils Priority: optional Architecture: amd64 -- cgit v1.2.3 From e9a3ce1f7112a55bffeeb6d7a9dd26584da8276d Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 5 Jul 2021 23:38:38 +0200 Subject: we need a safe encoder, not just a safe writer --- skate/reduce.go | 2 +- skate/xio/util.go | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/skate/reduce.go b/skate/reduce.go index 8325bc0..61e8cd6 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -64,7 +64,7 @@ func groupLogf(g *zipkey.Group, s string, vs ...interface{}) { // match result, e.g. for doi matches. func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error { var ( - enc = json.NewEncoder(xio.NewSingleWriter(w)) + enc = xio.NewSafeEncoder(json.NewEncoder(w)) keyer = makeKeyFunc("\t", 1) i = 0 bref BiblioRef diff --git a/skate/xio/util.go b/skate/xio/util.go index 49f38a3..9f6cc26 100644 --- a/skate/xio/util.go +++ b/skate/xio/util.go @@ -9,6 +9,25 @@ import ( "sync" ) +type Encoder interface { + Encode(interface{}) error +} + +type SafeEncoder struct { + sync.Mutex + enc Encoder +} + +func NewSafeEncoder(enc Encoder) *SafeEncoder { + return &SafeEncoder{enc: enc} +} + +func (s *SafeEncoder) Encode(v interface{}) error { + s.Lock() + defer s.Unlock() + return s.enc.Encode(v) +} + // SingleWriter makes any writer thread safe. 
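The SafeEncoder added above locks around the whole Encode call rather than around individual writes: if an encoder implementation ever flushed a value in more than one Write, per-write locking could still interleave two half-records. Usage is a one-line change (v and w are stand-ins):

	// Each Encode now runs under the mutex, so a full document plus
	// trailing newline is written before another goroutine gets a turn.
	enc := xio.NewSafeEncoder(json.NewEncoder(w))
	if err := enc.Encode(v); err != nil {
		return err
	}
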
type SingleWriter struct { sync.Mutex -- cgit v1.2.3 From c5342e90974c7b857b462b06afdcbd3498a1bcf4 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 5 Jul 2021 23:45:03 +0200 Subject: wip: debug with stdlib json --- skate/reduce.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/skate/reduce.go b/skate/reduce.go index 61e8cd6..c941a67 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -26,6 +26,8 @@ import ( "sync" "time" + stdjson "encoding/json" + "git.archive.org/martin/cgraph/skate/set" "git.archive.org/martin/cgraph/skate/xio" "git.archive.org/martin/cgraph/skate/zipkey" @@ -64,7 +66,7 @@ func groupLogf(g *zipkey.Group, s string, vs ...interface{}) { // match result, e.g. for doi matches. func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error { var ( - enc = xio.NewSafeEncoder(json.NewEncoder(w)) + enc = xio.NewSafeEncoder(stdjson.NewEncoder(w)) keyer = makeKeyFunc("\t", 1) i = 0 bref BiblioRef -- cgit v1.2.3 From 2f6102a0c0fba658ef664f44af5e65b007930033 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Mon, 5 Jul 2021 23:54:50 +0200 Subject: batcher: do not pass struct fields --- skate/zipkey/batch.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index c31909c..9e52f90 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -27,7 +27,7 @@ func NewBatcher(gf groupFunc) *Batcher { } for i := 0; i < batcher.NumWorkers; i++ { batcher.wg.Add(1) - go batcher.worker(batcher.queue, &batcher.wg) + go batcher.worker() } return &batcher } @@ -59,10 +59,10 @@ func (b *Batcher) GroupFunc(g *Group) error { } // worker will wind down after a first error encountered. -func (b *Batcher) worker(queue chan []*Group, wg *sync.WaitGroup) { - defer wg.Done() +func (b *Batcher) worker() { + defer b.wg.Done() OUTER: - for batch := range queue { + for batch := range b.queue { for _, g := range batch { if err := b.gf(g); err != nil { b.err = err -- cgit v1.2.3 From b5eee42e8918ab8e07684b7d15c07c443d995912 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 6 Jul 2021 00:16:44 +0200 Subject: wip: improve reduce performance --- skate/reduce.go | 58 ++++++++------------------------------------------------- 1 file changed, 8 insertions(+), 50 deletions(-) diff --git a/skate/reduce.go b/skate/reduce.go index c941a67..c5da99e 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -23,38 +23,14 @@ import ( "log" "sort" "strings" - "sync" "time" - stdjson "encoding/json" - "git.archive.org/martin/cgraph/skate/set" "git.archive.org/martin/cgraph/skate/xio" "git.archive.org/martin/cgraph/skate/zipkey" json "github.com/segmentio/encoding/json" ) -var brefPool = sync.Pool{ - New: func() interface{} { - var bref BiblioRef - return bref - }, -} - -var releasePool = sync.Pool{ - New: func() interface{} { - var r Release - return &r - }, -} - -var refPool = sync.Pool{ - New: func() interface{} { - var r Ref - return &r - }, -} - // groupLogf logs a message alongsize a serialized group for debugging. func groupLogf(g *zipkey.Group, s string, vs ...interface{}) { log.Printf(s, vs...) @@ -66,22 +42,15 @@ func groupLogf(g *zipkey.Group, s string, vs ...interface{}) { // match result, e.g. for doi matches. 
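A plausible reason the pools never paid off, beyond the bookkeeping: brefPool stored BiblioRef by value, and putting a non-pointer struct into a sync.Pool allocates again each time the value is boxed into an interface{} (staticcheck flags this as SA6002). Had pooling stayed, the conventional shape stores pointers, roughly:

	var brefPool = sync.Pool{
		New: func() interface{} {
			return new(BiblioRef) // pointers avoid the boxing allocation on Put
		},
	}

	bref := brefPool.Get().(*BiblioRef)
	bref.Reset()
	// ... fill fields, encode ...
	brefPool.Put(bref) // put back a pointer, not a struct value
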
func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error { var ( - enc = xio.NewSafeEncoder(stdjson.NewEncoder(w)) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - i = 0 - bref BiblioRef batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { - i++ - if i%10000 == 0 { - log.Printf("processed %v groups", i) - } var ( - target = releasePool.Get().(*Release) - ref = refPool.Get().(*Ref) + target *Release + ref *Ref + bref BiblioRef err error ) - defer releasePool.Put(target) - defer refPool.Put(ref) if len(g.G0) == 0 || len(g.G1) == 0 { return nil } @@ -94,8 +63,6 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) groupLogf(g, "[skip] failed to parse ref: %v", err) continue } - bref = brefPool.Get().(BiblioRef) - defer brefPool.Put(bref) bref.Reset() bref.SourceReleaseIdent = ref.ReleaseIdent bref.SourceWorkIdent = ref.WorkIdent @@ -115,7 +82,7 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) return nil }) ) - batcher.Size = 50000 // hard-code for now + batcher.Size = 10000 // hard-code for now defer batcher.Close() zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) return zipper.Run() @@ -128,7 +95,6 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W enc = json.NewEncoder(w) keyer = makeKeyFunc("\t", 1) i = 0 - bref BiblioRef grouper = func(g *zipkey.Group) error { i++ if i%10000 == 0 { @@ -153,8 +119,7 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W if target.WorkID == "" { continue } - bref = brefPool.Get().(BiblioRef) - bref.Reset() + var bref BiblioRef bref.SourceReleaseIdent = re.Ident bref.SourceWorkIdent = re.WorkID bref.SourceReleaseStage = re.ReleaseStage @@ -168,7 +133,6 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W if err := enc.Encode(bref); err != nil { return err } - brefPool.Put(bref) } return nil } @@ -183,7 +147,6 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error var ( enc = json.NewEncoder(w) keyer = makeKeyFunc("\t", 1) - bref BiblioRef grouper = func(g *zipkey.Group) error { var ( target *Release @@ -200,8 +163,7 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error if wiki, err = parseWiki(Cut(line, 2)); err != nil { return err } - bref = brefPool.Get().(BiblioRef) - bref.Reset() + var bref BiblioRef bref.Key = fmt.Sprintf("%s_%s", slugifyString(wiki.PageTitle), target.Ident) // XXX: what should we use? 
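	// Note: slugifyString is defined elsewhere in the package and not
	// shown in this excerpt; a plausible shape, since the key only needs
	// to be stable and sort-friendly, is a lowercasing whitespace
	// normalizer such as
	// strings.Join(strings.Fields(strings.ToLower(s)), "-")
	// (an assumption, not the actual implementation).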
bref.SourceWikipediaArticle = wiki.PageTitle bref.TargetReleaseIdent = target.Ident @@ -212,7 +174,6 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error if err := enc.Encode(bref); err != nil { return err } - brefPool.Put(bref) } return nil } @@ -311,7 +272,6 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { var ( enc = json.NewEncoder(w) keyer = makeKeyFunc("\t", 1) - bref BiblioRef grouper = func(g *zipkey.Group) error { var ( ref, pivot *Release // ref (reference), pivot (open library) @@ -334,8 +294,7 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { result := Verify(pivot, ref) switch result.Status { case StatusExact, StatusStrong: - bref = brefPool.Get().(BiblioRef) - bref.Reset() + var bref BiblioRef bref.SourceReleaseIdent = ref.Ident bref.SourceWorkIdent = ref.WorkID bref.SourceReleaseStage = ref.ReleaseStage @@ -349,7 +308,6 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { if err := enc.Encode(bref); err != nil { return err } - brefPool.Put(bref) default: } } -- cgit v1.2.3 From 42267fc424b52b05d20362d3f5087d44dd68316a Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 6 Jul 2021 00:30:41 +0200 Subject: add resource usage note --- skate/reduce.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skate/reduce.go b/skate/reduce.go index c5da99e..ad486a1 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -82,7 +82,7 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) return nil }) ) - batcher.Size = 10000 // hard-code for now + batcher.Size = 10000 // hard-code for now; on 24 cores 10K take up over 8G of RAM defer batcher.Close() zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) return zipper.Run() -- cgit v1.2.3 From b67ed957878e672897a1c236ea24605efa567064 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 6 Jul 2021 00:45:56 +0200 Subject: reduce: move to threaded versions --- skate/reduce.go | 55 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/skate/reduce.go b/skate/reduce.go index ad486a1..23c5cc8 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -92,14 +92,9 @@ func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) // match result, e.g. used with release entities converted from open library snapshots. func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - i = 0 - grouper = func(g *zipkey.Group) error { - i++ - if i%10000 == 0 { - log.Printf("processed %v groups", i) - } + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( target, re *Release err error @@ -135,9 +130,11 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W } } return nil - } + }) ) - zipper := zipkey.New(olr, releases, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(olr, releases, keyer, batcher.GroupFunc) return zipper.Run() } @@ -145,9 +142,9 @@ func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.W // fixed match result. 
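The conversions below all follow one mechanical recipe: the sequential grouper becomes the callback of a zipkey.Batcher, the encoder writes through a SingleWriter, and Close flushes the final partial batch. Schematically, with the names used in this codebase:

	enc := json.NewEncoder(xio.NewSingleWriter(w))
	batcher := zipkey.NewBatcher(func(g *zipkey.Group) error {
		// per-group work, now running on NumWorkers goroutines
		return nil
	})
	batcher.Size = 10000
	defer batcher.Close() // flushes the last, possibly short batch
	zipper := zipkey.New(r0, r1, makeKeyFunc("\t", 1), batcher.GroupFunc)
	return zipper.Run()
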
func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( target *Release wiki *MinimalCitations @@ -176,9 +173,11 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error } } return nil - } + }) ) - zipper := zipkey.New(releases, wiki, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(releases, wiki, keyer, batcher.GroupFunc) return zipper.Run() } @@ -187,9 +186,9 @@ func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error // match. func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( re, pivot *Release err error @@ -218,9 +217,11 @@ func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error { } } return nil - } + }) ) - zipper := zipkey.New(releases, refs, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc) return zipper.Run() } @@ -270,9 +271,9 @@ func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error { // release) and writes biblioref. func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { var ( - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { var ( ref, pivot *Release // ref (reference), pivot (open library) err error @@ -312,9 +313,11 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { } } return nil - } + }) ) - zipper := zipkey.New(olr, refs, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(olr, refs, keyer, batcher.GroupFunc) return zipper.Run() } @@ -331,9 +334,9 @@ func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { var ( stats = statsAugment{} - enc = json.NewEncoder(w) + enc = json.NewEncoder(xio.NewSingleWriter(w)) keyer = makeKeyFunc("\t", 1) - grouper = func(g *zipkey.Group) error { + batcher = zipkey.NewBatcher(func(g *zipkey.Group) error { // g.G0 contains matched docs for a given work id, g.G1 all raw // refs, with the same work id. 
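deduplicateBrefs, which appears further down, keeps the best biblioref per key; the test cases earlier in this log pin down the preference order. A ranking helper consistent with those cases might look like this (a sketch; only the relations the tests exercise are certain):

	// rank orders match statuses, lower wins: exact beats strong, and
	// strong beats weak, unmatched and ambiguous.
	func rank(status string) int {
		switch status {
		case StatusExact.Short():
			return 0
		case StatusStrong.Short():
			return 1
		default:
			return 2
		}
	}
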
@@ -370,9 +373,11 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { } } return nil - } + }) ) - zipper := zipkey.New(bref, raw, keyer, grouper) + batcher.Size = 10000 + defer batcher.Close() + zipper := zipkey.New(bref, raw, keyer, batcher.GroupFunc) err := zipper.Run() log.Println(stats) return err -- cgit v1.2.3 From 5701de3937ae3b4f210be957830e623afce83adc Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 6 Jul 2021 01:24:51 +0200 Subject: reduce: remove log line --- skate/reduce.go | 1 - 1 file changed, 1 deletion(-) diff --git a/skate/reduce.go b/skate/reduce.go index 23c5cc8..5b30fdf 100644 --- a/skate/reduce.go +++ b/skate/reduce.go @@ -431,7 +431,6 @@ func deduplicateBrefs(brefs []*BiblioRef) []*BiblioRef { seen.Add(v.Key) } brefs = brefs[:i] - log.Printf("trimmed brefs from %d to %d", len(brefs), i) return brefs } -- cgit v1.2.3 From 15790f0be8af9b9cfe79d5de338d39cd164188c1 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 6 Jul 2021 01:25:45 +0200 Subject: util: cleanup encoder --- skate/xio/util.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/skate/xio/util.go b/skate/xio/util.go index 9f6cc26..49f38a3 100644 --- a/skate/xio/util.go +++ b/skate/xio/util.go @@ -9,25 +9,6 @@ import ( "sync" ) -type Encoder interface { - Encode(interface{}) error -} - -type SafeEncoder struct { - sync.Mutex - enc Encoder -} - -func NewSafeEncoder(enc Encoder) *SafeEncoder { - return &SafeEncoder{enc: enc} -} - -func (s *SafeEncoder) Encode(v interface{}) error { - s.Lock() - defer s.Unlock() - return s.enc.Encode(v) -} - // SingleWriter makes any writer thread safe. type SingleWriter struct { sync.Mutex -- cgit v1.2.3 From 024fd2432aacef1f2a0be634575de4d2355fcd9c Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 6 Jul 2021 23:45:48 +0200 Subject: run a parity derivation --- python/refcat/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/refcat/tasks.py b/python/refcat/tasks.py index 01f879b..ab682a0 100644 --- a/python/refcat/tasks.py +++ b/python/refcat/tasks.py @@ -244,9 +244,9 @@ class Refcat(BaseTask): A base tasks for all refcat related tasks. 
""" BASE = settings.BASE - TAG = '2021-07-01' + TAG = '2021-07-06' - date = luigi.DateParameter(default=datetime.date(2021, 7, 1), + date = luigi.DateParameter(default=datetime.date(2021, 7, 6), description="a versioning help, will be part of filename, change this manually") tmpdir = luigi.Parameter(default=settings.TMPDIR, description="set tempdir", significant=False) n = luigi.IntParameter(default=multiprocessing.cpu_count(), significant=False) -- cgit v1.2.3 From d2e8720b512d2bf4a22ab7c15b71b7eda10024c6 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 6 Jul 2021 23:51:33 +0200 Subject: do not compress sort tmp files --- python/refcat/tasks.py | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/python/refcat/tasks.py b/python/refcat/tasks.py index ab682a0..4ae21fe 100644 --- a/python/refcat/tasks.py +++ b/python/refcat/tasks.py @@ -457,7 +457,7 @@ class URLTabs(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m ru -skip-on-empty 3 | - LC_ALL=C sort -T {tmpdir} -k3,3 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k3,3 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -480,7 +480,7 @@ class URLTabsCleaned(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-cleanup -c url -allow http,https -X -B -S -f 3 | - LC_ALL=C sort -T {tmpdir} -k3,3 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k3,3 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -504,7 +504,7 @@ class URLList(Refcat): zstdcat -T0 {input} | cut -f 3 | skate-cleanup -X -c url -B -S -f 1 | - LC_ALL=C sort -u -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -u -T {tmpdir} -k1,1 -S25% | LC_ALL=C grep -E '^https?://' | zstd -T0 -c > {output} """, @@ -535,7 +535,7 @@ class RefsDOI(Refcat): zstdcat -T0 {input} | skate-map -m ff -x biblio.doi -skip-on-empty 1 | skate-cleanup -S -c doi -f 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -559,7 +559,7 @@ class RefsPMID(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x biblio.pmid -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -583,7 +583,7 @@ class RefsPMCID(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x biblio.pmcid -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -606,7 +606,7 @@ class RefsArxiv(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x biblio.arxiv_id -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -636,7 +636,7 @@ class FatcatDOI(Refcat): zstdcat -T0 {input} | skate-map -m ff -x ext_ids.doi -skip-on-empty 1 | skate-cleanup -S -c doi -f 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -659,7 +659,7 @@ class FatcatPMID(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x ext_ids.pmid -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -682,7 +682,7 @@ class FatcatPMCID(Refcat): output = 
shellout(""" zstdcat -T0 {input} | skate-map -m ff -x ext_ids.pmcid -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -705,7 +705,7 @@ class FatcatArxiv(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x extra.arxiv.base_id -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -736,7 +736,7 @@ class FatcatMapped(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m {mapper} -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, mapper=self.mapper, @@ -782,7 +782,7 @@ class RefsMapped(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m {mapper} -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -943,7 +943,7 @@ class OpenLibraryEditionsByWork(Refcat): zstdcat -T0 {input} | cut -f 5 | skate-map -skip-on-empty 1 -m ff -x 'works.0.key' | - LC_ALL=C sort -T {tmpdir} -S25% -k1,1 --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -S25% -k1,1 | zstd -T0 -c > {output} """, tmpdir=self.tmpdir, @@ -965,7 +965,7 @@ class OpenLibraryWorksSorted(Refcat): output = shellout(""" zstdcat -T0 {input} | cut -f 2,5 | - LC_ALL=C sort -T {tmpdir} -S25% -k1,1 --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -S25% -k1,1 | zstd -T0 -c > {output} """, tmpdir=self.tmpdir, @@ -1047,7 +1047,7 @@ class OpenLibraryEditionsMapped(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m {mapper} -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, n=self.n, @@ -1102,7 +1102,7 @@ class UnmatchedMapped(Refcat): zstdcat -T0 {input} | skate-conv -f ref | skate-map -m rcns -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -S25% -k1,1 --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -S25% -k1,1 | zstd -T0 -c > {output} """, tmpdir=self.tmpdir, @@ -1185,7 +1185,7 @@ class OpenLibraryReleaseMapped(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m {mapper} -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, mapper=self.mapper, @@ -1247,7 +1247,7 @@ class BrefSortedByWorkID(Refcat): output = shellout(""" zstdcat -T0 {bref} | skate-map -B -m ff -x source_work_ident | - LC_ALL=C sort -T {tmpdir} -S25% -k1,1 --compress-program=zstd | zstd -c -T0 > {output} + LC_ALL=C sort -T {tmpdir} -S25% -k1,1 | zstd -c -T0 > {output} """, tmpdir=self.tmpdir, bref=self.input().path) @@ -1271,7 +1271,7 @@ class RefsByWorkID(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m ff -x work_ident | - LC_ALL=C sort -T {tmpdir} -S25% -k1,1 --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -S25% -k1,1 | zstd -c -T0 > {output} """, tmpdir=self.tmpdir, @@ -1376,7 +1376,7 @@ class UnmatchedResolveJournalNamesMapped(Refcat): output = shellout(""" zstdcat -T0 {input} | skate-map -m vcns -skip-on-empty 1 | - LC_ALL=C sort -T {tmpdir} -k1,1 -S25% --compress-program=zstd | + LC_ALL=C sort -T {tmpdir} -k1,1 -S25% | zstd -T0 -c > {output} """, tmpdir=self.tmpdir, -- cgit v1.2.3 From df8d801b0b7227d24e9508cfc2474859ee584d2a Mon 
Sep 17 00:00:00 2001 From: Martin Czygan Date: Wed, 7 Jul 2021 21:27:43 +0200 Subject: add WikipediaDOI --- python/refcat/tasks.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/python/refcat/tasks.py b/python/refcat/tasks.py index 4ae21fe..0db6f86 100644 --- a/python/refcat/tasks.py +++ b/python/refcat/tasks.py @@ -300,6 +300,27 @@ class WikipediaCitationsMinimalDataset(luigi.ExternalTask, Refcat): Dataset contains parquet, but we want JSON here: $ parquet-tools cat --json minimal_dataset.parquet > minimal_dataset.json + + Contains (07/2021) around 29276667 rows. + + Rough id type distribution: + + 2160819 ISBN + 1442176 DOI + 825970 PMID + 353425 ISSN + 279369 PMC + 185742 OCLC + 181375 BIBCODE + 110921 JSTOR + 47601 ARXIV + 15202 LCCN + 12878 MR + 8270 ASIN + 6293 OL + 3790 SSRN + 3013 ZBL + """ def output(self): return luigi.LocalTarget(path=os.path.join(settings.WIKIPEDIA_CITATIONS, "minimal_dataset.json")) @@ -1385,3 +1406,25 @@ class UnmatchedResolveJournalNamesMapped(Refcat): def output(self): return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) + +# Wikipedia related tasks + +class WikipediaDOI(Refcat): + """ + Sorted DOI keys from wikipedia. + """ + def requires(self): + return WikipediaCitationsMinimalDataset() + + def run(self): + output = shellout(""" + skate-wikipedia-doi < {input} | + LC_ALL=C sort -T {tmpdir} -S 20% -k2,2 | + zstd -T0 -c > {output} + """, + tmpdir=self.tmpdir, + input=self.input().path) + luigi.LocalTarget(output).move(self.output().path) + + def output(self): + return luigi.LocalTarget(path=self.path(ext="tsv.zst"), format=Zstd) -- cgit v1.2.3 From bb656d25cdcc1c5793a69fe281be13244e60c4ec Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Wed, 7 Jul 2021 21:27:52 +0200 Subject: skate: no need for alias --- skate/cmd/skate-wikipedia-doi/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skate/cmd/skate-wikipedia-doi/main.go b/skate/cmd/skate-wikipedia-doi/main.go index c4fdb1e..d9ee135 100644 --- a/skate/cmd/skate-wikipedia-doi/main.go +++ b/skate/cmd/skate-wikipedia-doi/main.go @@ -11,7 +11,7 @@ import ( "git.archive.org/martin/cgraph/skate" "git.archive.org/martin/cgraph/skate/parallel" - json "github.com/segmentio/encoding/json" + "github.com/segmentio/encoding/json" ) var ( -- cgit v1.2.3 From 9ea69942a54f1c2e13f058ba35279af3612add1b Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Wed, 7 Jul 2021 22:36:16 +0200 Subject: update docs --- skate/zipkey/batch.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/skate/zipkey/batch.go b/skate/zipkey/batch.go index 9e52f90..ebbd081 100644 --- a/skate/zipkey/batch.go +++ b/skate/zipkey/batch.go @@ -14,7 +14,7 @@ type Batcher struct { queue chan []*Group wg sync.WaitGroup err error - closing bool + closing bool // https://stackoverflow.com/q/16105325/89391 } // NewBatcher set ups a new Batcher. @@ -32,6 +32,8 @@ func NewBatcher(gf groupFunc) *Batcher { return &batcher } +// Close tears down the batcher. If this is not called, you get goroutine leaks +// and will miss the data from the last uncommitted batch. func (b *Batcher) Close() error { b.closing = true g := make([]*Group, len(b.batch)) @@ -43,7 +45,8 @@ func (b *Batcher) Close() error { return b.err } -// GroupFunc implement the groupFunc type. Not thread safe. +// GroupFunc implement the groupFunc type. Not thread safe. Panics if called +// after Close has been called. 
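One subtlety in the docs added above: closing is a plain bool, which is only safe because a single goroutine drives GroupFunc and then Close; with concurrent producers the flag would need a mutex or an atomic, essentially the shutdown problem the linked Stack Overflow question discusses. The intended lifecycle, in order (gf and groups are stand-ins):

	batcher := zipkey.NewBatcher(gf)
	for _, g := range groups {
		if err := batcher.GroupFunc(g); err != nil { // one producer goroutine only
			return err
		}
	}
	return batcher.Close() // afterwards, GroupFunc panics by design
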
func (b *Batcher) GroupFunc(g *Group) error { if b.closing { panic("cannot call GroupFunc after Close") @@ -58,7 +61,9 @@ func (b *Batcher) GroupFunc(g *Group) error { return nil } -// worker will wind down after a first error encountered. +// worker will wind down after any error has been encountered. Multiple threads +// may set the error, but we currently only care whether the error is nil or +// not. func (b *Batcher) worker() { defer b.wg.Done() OUTER: -- cgit v1.2.3
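
Taken together, the pieces above compose the way the tests use them: open two key-sorted inputs, fix a match result, and stream biblioref JSON to a writer. A minimal end-to-end sketch (file names are hypothetical; both inputs must be tab-separated and sorted byte-wise, LC_ALL=C, on the key column, or the merge join will skip groups):

	package main

	import (
		"bytes"
		"log"

		"git.archive.org/martin/cgraph/skate"
		"git.archive.org/martin/cgraph/skate/xio"
	)

	func main() {
		a, b, err := xio.OpenTwo("releases_by_doi.tsv", "refs_by_doi.tsv")
		if err != nil {
			log.Fatal(err)
		}
		var buf bytes.Buffer
		mr := skate.MatchResult{Status: skate.StatusExact, Reason: skate.ReasonDOI}
		if err := skate.ZippyExact(a, b, mr, &buf); err != nil {
			log.Fatal(err)
		}
		log.Printf("wrote %d bytes of biblioref JSON", buf.Len())
	}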