From 9794996b3bda583ffd1d59b2f831518210033be3 Mon Sep 17 00:00:00 2001
From: Martin Czygan
Date: Mon, 5 Jul 2021 22:20:25 +0200
Subject: use most sensible file name

* we have map, so we should have reduce
---
 skate/reduce.go      | 599 +++++++++++++++++++++++++++++++++++++++++++++++++++
 skate/reduce_test.go | 451 ++++++++++++++++++++++++++++++++++++++
 skate/zippy.go       | 599 ---------------------------------------------------
 skate/zippy_test.go  | 451 --------------------------------------
 4 files changed, 1050 insertions(+), 1050 deletions(-)
 create mode 100644 skate/reduce.go
 create mode 100644 skate/reduce_test.go
 delete mode 100644 skate/zippy.go
 delete mode 100644 skate/zippy_test.go

diff --git a/skate/reduce.go b/skate/reduce.go
new file mode 100644
index 0000000..ff836e8
--- /dev/null
+++ b/skate/reduce.go
@@ -0,0 +1,599 @@
+// This file contains various "reducers", e.g. merging data from two streams and
+// applying a function on groups of documents with a shared key.
+//
+// Note: This is a bit repetitive, but we do not want to introduce any other
+// abstraction for now. Since most of the logic is in the "grouper" functions,
+// we could make them top level values and then assemble the zipkey runner on
+// the fly.
+//
+// The most confusing aspect currently is the variety of schemas hidden within
+// the readers (and string groups): release, ref, ref-as-release, open library,
+// wikipedia, ...
+//
+// TODO: [ ] pass release stage through all match types
+// TODO: [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks
+package skate
+
+import (
+    "fmt"
+    "io"
+    "log"
+    "sort"
+    "strings"
+    "sync"
+    "time"
+
+    "git.archive.org/martin/cgraph/skate/set"
+    "git.archive.org/martin/cgraph/skate/zipkey"
+    json "github.com/segmentio/encoding/json"
+)
+
+var brefPool = sync.Pool{
+    New: func() interface{} {
+        var bref BiblioRef
+        return bref
+    },
+}
+
+// groupLogf logs a message alongside a serialized group for debugging.
+func groupLogf(g *zipkey.Group, s string, vs ...interface{}) {
+    log.Printf(s, vs...)
+    b, _ := json.MarshalIndent(g, "", " ")
+    log.Println(string(b))
+}
+
+// ZippyExact takes a release and refs reader (key, doc) and assigns a fixed
+// match result, e.g. for DOI matches.
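+//
+// Both streams are expected to be sorted by their key column. Each line has
+// the form KEY<TAB>DOC, where DOC is a JSON release or ref document. A rough
+// sketch of two matching input lines (field names abbreviated, values made
+// up):
+//
+//   10.123/abc  {"ident": "r1", "work_id": "w1", ...}       (releases)
+//   10.123/abc  {"release_ident": "s1", "index": 0, ...}    (refs)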
+func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error {
+    var (
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        i       = 0
+        bref    BiblioRef
+        grouper = func(g *zipkey.Group) error {
+            i++
+            if i%10000 == 0 {
+                log.Printf("processed %v groups", i)
+            }
+            var (
+                target *Release
+                ref    *Ref
+                err    error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                groupLogf(g, "[skip] failed to parse release: %v", err)
+                return nil
+            }
+            for _, line := range g.G1 {
+                if ref, err = parseRef(Cut(line, 2)); err != nil {
+                    groupLogf(g, "[skip] failed to parse ref: %v", err)
+                    continue
+                }
+                bref = brefPool.Get().(BiblioRef)
+                bref.Reset()
+                bref.SourceReleaseIdent = ref.ReleaseIdent
+                bref.SourceWorkIdent = ref.WorkIdent
+                bref.SourceReleaseStage = ref.ReleaseStage
+                bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear)
+                bref.RefIndex = ref.Index + 1 // we want 1-indexed values (also helps with omitempty)
+                bref.RefKey = ref.Key
+                bref.TargetReleaseIdent = target.Ident
+                bref.TargetWorkIdent = target.WorkID
+                bref.MatchProvenance = ref.RefSource
+                bref.MatchStatus = matchResult.Status.Short()
+                bref.MatchReason = matchResult.Reason.Short()
+                if err := enc.Encode(bref); err != nil {
+                    return err
+                }
+                brefPool.Put(bref)
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(releases, refs, keyer, grouper)
+    return zipper.Run()
+}
+
+// ZippyExactReleases takes two release readers (key, doc) and assigns a fixed
+// match result, e.g. for release entities converted from Open Library snapshots.
+func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error {
+    var (
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        i       = 0
+        bref    BiblioRef
+        grouper = func(g *zipkey.Group) error {
+            i++
+            if i%10000 == 0 {
+                log.Printf("processed %v groups", i)
+            }
+            var (
+                target, re *Release
+                err        error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                groupLogf(g, "[skip] failed to parse release: %v", err)
+                return nil
+            }
+            for _, line := range g.G1 {
+                if re, err = parseRelease(Cut(line, 2)); err != nil {
+                    groupLogf(g, "[skip] failed to parse release: %v", err)
+                    continue
+                }
+                if target.WorkID == "" {
+                    continue
+                }
+                bref = brefPool.Get().(BiblioRef)
+                bref.Reset()
+                bref.SourceReleaseIdent = re.Ident
+                bref.SourceWorkIdent = re.WorkID
+                bref.SourceReleaseStage = re.ReleaseStage
+                bref.SourceYear = fmt.Sprintf("%d", re.ReleaseYear())
+                bref.RefIndex = re.Extra.Skate.Ref.Index + 1 // we want 1-indexed values (also helps with omitempty)
+                bref.RefKey = re.Extra.Skate.Ref.Key
+                bref.TargetOpenLibraryWork = target.WorkID
+                bref.MatchProvenance = re.Extra.Skate.Ref.Source
+                bref.MatchStatus = matchResult.Status.Short()
+                bref.MatchReason = matchResult.Reason.Short()
+                if err := enc.Encode(bref); err != nil {
+                    return err
+                }
+                brefPool.Put(bref)
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(olr, releases, keyer, grouper)
+    return zipper.Run()
+}
+
+// ZippyExactWiki takes a release and wiki reader (key, doc) and assigns a
+// fixed match result.
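+// The emitted biblioref key combines a slug of the wikipedia page title with
+// the target release ident, e.g. "some_page_title_<ident>" (illustrative).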
+func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error {
+    var (
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        bref    BiblioRef
+        grouper = func(g *zipkey.Group) error {
+            var (
+                target *Release
+                wiki   *MinimalCitations
+                err    error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                return err
+            }
+            for _, line := range g.G1 {
+                if wiki, err = parseWiki(Cut(line, 2)); err != nil {
+                    return err
+                }
+                bref = brefPool.Get().(BiblioRef)
+                bref.Reset()
+                bref.Key = fmt.Sprintf("%s_%s", slugifyString(wiki.PageTitle), target.Ident) // XXX: what should we use?
+                bref.SourceWikipediaArticle = wiki.PageTitle
+                bref.TargetReleaseIdent = target.Ident
+                bref.TargetWorkIdent = target.WorkID
+                bref.MatchProvenance = "wikipedia"
+                bref.MatchStatus = mr.Status.Short()
+                bref.MatchReason = mr.Reason.Short()
+                if err := enc.Encode(bref); err != nil {
+                    return err
+                }
+                brefPool.Put(bref)
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(releases, wiki, keyer, grouper)
+    return zipper.Run()
+}
+
+// ZippyVerifyRefs takes a release and refs (as release) reader (key, doc),
+// runs fuzzy verification and emits a biblioref document for each exact or
+// strong match.
+func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
+    var (
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        grouper = func(g *zipkey.Group) error {
+            var (
+                re, pivot *Release
+                err       error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                return err
+            }
+            for _, line := range g.G1 {
+                if re, err = parseRelease(Cut(line, 2)); err != nil {
+                    return err
+                }
+                result := Verify(pivot, re)
+                switch result.Status {
+                case StatusExact, StatusStrong:
+                    if result.Reason == ReasonDOI {
+                        continue
+                    }
+                    br := generateBiblioRef(re, pivot, result, "fuzzy")
+                    if err := enc.Encode(br); err != nil {
+                        return err
+                    }
+                default:
+                }
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(releases, refs, keyer, grouper)
+    return zipper.Run()
+}
+
+// ZippyVerifyRefsOpenLibraryTable takes OL editions (as release) and refs (as
+// release) and emits a match table for manual inspection.
+func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error {
+    var (
+        keyer   = makeKeyFunc("\t", 1)
+        grouper = func(g *zipkey.Group) error {
+            var (
+                re, pivot *Release
+                err       error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            // We take a single edition from OL.
+            if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                return err
+            }
+            for _, line := range g.G1 {
+                if re, err = parseRelease(Cut(line, 2)); err != nil {
+                    return err
+                }
+                // The refs have a container name but not a title, so here we
+                // compare against titles from Open Library.
+                re.Title = re.ContainerName
+                result := Verify(pivot, re)
+                fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
+                    result.Status.Short(),
+                    result.Reason.Short(),
+                    pivot.Extra.OpenLibrary.WorkID,
+                    FindByPrefix(pivot.Extra.OpenLibrary.SourceRecords, "ia:"),
+                    re.Ident,
+                    CutSep(g.G0[0], "\t", 1),
+                    pivot.Title,
+                    re.Title)
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(olr, refs, keyer, grouper)
+    return zipper.Run()
+}
+
+// ZippyVerifyRefsOpenLibrary takes OL editions (as release) and refs (as
+// release) and writes biblioref documents.
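+// Like the table variant above, it compares the ref container name against
+// edition titles from Open Library and keeps exact and strong matches only.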
+func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
+    var (
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        bref    BiblioRef
+        grouper = func(g *zipkey.Group) error {
+            var (
+                ref, pivot *Release // ref (reference), pivot (open library)
+                err        error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            // We take a single edition from OL.
+            if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                return err
+            }
+            for _, line := range g.G1 {
+                if ref, err = parseRelease(Cut(line, 2)); err != nil {
+                    return err
+                }
+                // The refs have a container name but not a title, so here we
+                // compare against titles from Open Library.
+                ref.Title = ref.ContainerName
+                result := Verify(pivot, ref)
+                switch result.Status {
+                case StatusExact, StatusStrong:
+                    bref = brefPool.Get().(BiblioRef)
+                    bref.Reset()
+                    bref.SourceReleaseIdent = ref.Ident
+                    bref.SourceWorkIdent = ref.WorkID
+                    bref.SourceReleaseStage = ref.ReleaseStage
+                    bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear())
+                    bref.RefIndex = ref.Extra.Skate.Ref.Index + 1 // we want 1-indexed values (also helps with omitempty)
+                    bref.RefKey = ref.Extra.Skate.Ref.Key
+                    bref.TargetOpenLibraryWork = pivot.WorkID
+                    bref.MatchProvenance = ref.Extra.Skate.Ref.Source
+                    bref.MatchStatus = result.Status.Short()
+                    bref.MatchReason = result.Reason.Short()
+                    if err := enc.Encode(bref); err != nil {
+                        return err
+                    }
+                    brefPool.Put(bref)
+                default:
+                }
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(olr, refs, keyer, grouper)
+    return zipper.Run()
+}
+
+// ZippyBrefAugment takes all matched docs from bref and adds docs from raw
+// refs that have not been matched. It also gets rid of duplicate matches.
+// Note: This operates on two streams: raw refs, about 2.5B records (07/2021),
+// and matches, about 1B; in essence we have to iterate through about 3.5B
+// records, so small tweaks here may be worthwhile.
+//
+// We can identify which docs have been matched by checking the source ident,
+// ref index and key.
+//
+// TODO: This needs to be completed and made fast.
+func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
+    var (
+        stats   = statsAugment{}
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        grouper = func(g *zipkey.Group) error {
+            // g.G0 contains matched docs for a given work id, g.G1 all raw
+            // refs with the same work id.
+
+            // First, iterate over all matches and sort out duplicates, e.g.
+            // docs that have the same source and target id.
+            log.Printf("group K=%s, G0=%d, G1=%d", g.Key, len(g.G0), len(g.G1))
+            matched, err := uniqueMatches(CutBatch(g.G0, 2), &stats)
+            if err != nil {
+                return err
+            }
+            var refs = make([]*Ref, len(g.G1))
+            for i := 0; i < len(refs); i++ {
+                var (
+                    data []byte = []byte(Cut(g.G1[i], 2))
+                    ref  Ref
+                )
+                if err := json.Unmarshal(data, &ref); err != nil {
+                    return err
+                }
+                refs[i] = &ref
+            }
+            // TODO: this slows down this process; be a bit smarter about slices.
+            matched = matchedRefsExtend(matched, refs, &stats)
+            // At this point, we may have duplicates by "_id", e.g. source
+            // release ident and ref index (example:
+            // 4kg2dejsgzaf3cszs2lt5hz4by_9, which appears three times: once
+            // as an exact match and twice unmatched).
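+            // Deduplicate by document "_id" first, then drop self-referential
+            // links, where source and target release ident coincide.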
+            matched = deduplicateBrefs(matched)
+            matched = removeSelfLinks(matched)
+            for _, bref := range matched {
+                stats.total++
+                if err := enc.Encode(bref); err != nil {
+                    return err
+                }
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(bref, raw, keyer, grouper)
+    err := zipper.Run()
+    log.Println(stats)
+    return err
+}
+
+// removeSelfLinks removes self-referential links. TODO: Those should be caught
+// at the root cause.
+func removeSelfLinks(brefs []*BiblioRef) []*BiblioRef {
+    var i int
+    for _, bref := range brefs {
+        if bref.SourceReleaseIdent == bref.TargetReleaseIdent {
+            continue
+        }
+        brefs[i] = bref
+        i++
+    }
+    return brefs[:i]
+}
+
+// deduplicateBrefs deduplicates by the document id (for elasticsearch), which
+// may help filter out some duplicates but not all.
+func deduplicateBrefs(brefs []*BiblioRef) []*BiblioRef {
+    // Sort by match status, exact first, unmatched last. We rank statuses and
+    // use a stable sort, so the comparison is a proper ordering.
+    rank := func(b *BiblioRef) int {
+        switch b.MatchStatus {
+        case StatusExact.Short():
+            return 0
+        case StatusStrong.Short():
+            return 1
+        case StatusWeak.Short():
+            return 3
+        case StatusAmbiguous.Short():
+            return 4
+        case StatusUnmatched.Short():
+            return 5
+        default:
+            return 2
+        }
+    }
+    sort.SliceStable(brefs, func(i, j int) bool {
+        return rank(brefs[i]) < rank(brefs[j])
+    })
+    var (
+        seen = set.New()
+        i    int
+    )
+    for _, v := range brefs {
+        if seen.Contains(v.Key) {
+            continue
+        }
+        brefs[i] = v
+        i++
+        seen.Add(v.Key)
+    }
+    log.Printf("trimmed brefs from %d to %d", len(brefs), i)
+    brefs = brefs[:i]
+    return brefs
+}
+
+// matchedRefsExtend takes a set of (unique) biblioref docs and emits that set
+// unchanged, plus those raw references (as biblioref) that did not result in
+// a match (determined by ref key and index). XXX: We may have duplicate refs
+// as well - how to distinguish them?
+func matchedRefsExtend(matched []*BiblioRef, refs []*Ref, stats *statsAugment) []*BiblioRef {
+    seen := set.New() // store "key + index" of matched items
+    for _, m := range matched {
+        s := m.RefKey + fmt.Sprintf("%d", m.RefIndex)
+        seen.Add(s)
+    }
+    for _, r := range refs {
+        s := r.Key + fmt.Sprintf("%d", r.Index)
+        if seen.Contains(s) {
+            stats.skipMatchedRef++
+            log.Printf("skip-matched-ref [%d]: from %d matches; ident=%v, title=%s, key=%v, index=%d",
+                stats.skipMatchedRef, len(matched), r.ReleaseIdent, r.Biblio.Title, r.Key, r.Index)
+            continue
+        }
+        var bref BiblioRef
+        bref.IndexedTs = time.Now().UTC().Format(time.RFC3339)
+        bref.Key = fmt.Sprintf("%s_%d", r.ReleaseIdent, r.Index)
+        bref.RefIndex = r.Index
+        bref.RefKey = r.Key
+        bref.SourceReleaseIdent = r.ReleaseIdent
+        bref.SourceReleaseStage = r.ReleaseStage
+        bref.SourceWorkIdent = r.WorkIdent
+        bref.SourceYear = fmt.Sprintf("%d", r.ReleaseYear)
+        bref.TargetUnstructured = r.Biblio.Unstructured
+        // Reuse fields for debugging, for now.
+        bref.MatchStatus = StatusUnmatched.Short()
+        bref.MatchReason = ReasonUnknown.Short()
+        matched = append(matched, &bref)
+    }
+    return matched
+}
+
+// uniqueMatches takes a list of serialized bref docs and returns a list of
+// deserialized bref docs, containing unique matches only (e.g. filtering out
+// duplicate matches from exact and fuzzy runs). We are including
+// "skate-bref-id" post-processing here as well (but there is surely a better
+// place for that).
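+// The synthesized "_id" follows "<source_release_ident>_<ref_index>", e.g.
+// "4kg2dejsgzaf3cszs2lt5hz4by_9".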
+func uniqueMatches(docs []string, stats *statsAugment) (result []*BiblioRef, err error) {
+    var brefs []*BiblioRef
+    for _, doc := range docs {
+        var bref BiblioRef
+        if err := json.Unmarshal([]byte(doc), &bref); err != nil {
+            return nil, err
+        }
+        // On-the-fly add elasticsearch "_id" and indexed timestamp, if not already set.
+        if bref.Key == "" && bref.SourceReleaseIdent != "" {
+            bref.Key = fmt.Sprintf("%s_%d", bref.SourceReleaseIdent, bref.RefIndex)
+            bref.IndexedTs = time.Now().UTC().Format(time.RFC3339)
+        }
+        brefs = append(brefs, &bref)
+    }
+    // Make sure exact matches come first; compare both elements and keep the
+    // sort stable, so the ordering is well-defined.
+    sort.SliceStable(brefs, func(i, j int) bool {
+        return brefs[i].MatchStatus == StatusExact.Short() &&
+            brefs[j].MatchStatus != StatusExact.Short()
+    })
+    seen := set.New()
+    for _, doc := range brefs {
+        h := doc.LinkHash()
+        if seen.Contains(h) {
+            stats.skipDuplicatedBref++
+            log.Printf("skip-dup-bref [%d]: hash=%v source=%v status=%v reason=%v",
+                stats.skipDuplicatedBref, h, doc.SourceReleaseIdent, doc.MatchStatus, doc.MatchReason)
+            continue
+        }
+        seen.Add(h)
+        result = append(result, doc)
+    }
+    return result, nil
+}
+
+type statsAugment struct {
+    skipDuplicatedBref int64
+    skipMatchedRef     int64
+    total              int64
+}
+
+func (s statsAugment) String() string {
+    return fmt.Sprintf("total=%d, skipMatchedRef=%d, skipDuplicatedBref=%d",
+        s.total, s.skipMatchedRef, s.skipDuplicatedBref)
+}
+
+// CutBatch runs Cut over a list of lines.
+func CutBatch(lines []string, column int) (result []string) {
+    for _, line := range lines {
+        result = append(result, Cut(line, column))
+    }
+    return result
+}
+
+// Cut returns a specific column (1-indexed) from a tab-separated line, or the
+// empty string if the column is invalid.
+func Cut(line string, column int) string {
+    return CutSep(line, "\t", column)
+}
+
+// CutSep is like Cut, but allows specifying a separator; column is 1-indexed.
+func CutSep(line, sep string, column int) string {
+    parts := strings.Split(strings.TrimSpace(line), sep)
+    if len(parts) < column {
+        return ""
+    }
+    return parts[column-1]
+}
+
+// FindByPrefix returns the first element of a slice of strings that matches a
+// prefix.
+func FindByPrefix(ss []string, prefix string) string {
+    for _, s := range ss {
+        if strings.HasPrefix(s, prefix) {
+            return s
+        }
+    }
+    return ""
+}
+
+// makeKeyFunc creates a function that can be used as keyFunc, selecting a
+// column from fields separated by sep; column is 1-indexed.
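+//
+// For example, keyer := makeKeyFunc("\t", 1) returns "k" for a line like
+// "k\tpayload", and an error for lines with an empty key column.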
+func makeKeyFunc(sep string, column int) func(string) (string, error) {
+    return func(s string) (string, error) {
+        if k := CutSep(s, sep, column); k != "" {
+            return k, nil
+        }
+        return "", fmt.Errorf("cannot get key from column %d in line (len=%d): %s", column, len(s), s)
+    }
+}
+
+func parseRelease(s string) (r *Release, err error) {
+    err = json.Unmarshal([]byte(s), &r)
+    return
+}
+
+func parseRef(s string) (r *Ref, err error) {
+    err = json.Unmarshal([]byte(s), &r)
+    return
+}
+
+func parseWiki(s string) (r *MinimalCitations, err error) {
+    err = json.Unmarshal([]byte(s), &r)
+    return
+}
+
+func parseBiblioref(s string) (r *BiblioRef, err error) {
+    err = json.Unmarshal([]byte(s), &r)
+    return
+}
diff --git a/skate/reduce_test.go b/skate/reduce_test.go
new file mode 100644
index 0000000..501d8cd
--- /dev/null
+++ b/skate/reduce_test.go
@@ -0,0 +1,451 @@
+package skate
+
+import (
+    "bytes"
+    "io/ioutil"
+    "reflect"
+    "testing"
+
+    "git.archive.org/martin/cgraph/skate/atomic"
+    "git.archive.org/martin/cgraph/skate/xio"
+    "github.com/kr/pretty"
+)
+
+func TestLineColumn(t *testing.T) {
+    var cases = []struct {
+        line   string
+        sep    string
+        column int
+        result string
+    }{
+        {"", "", 2, ""},
+        {"1 2 3", " ", 1, "1"},
+        {"1 2 3", " ", 2, "2"},
+        {"1 2 3", " ", 3, "3"},
+        {"1 2 3", " ", 4, ""},
+        {"1 2 3", "\t", 1, "1 2 3"},
+    }
+    for _, c := range cases {
+        result := CutSep(c.line, c.sep, c.column)
+        if result != c.result {
+            t.Fatalf("got %v, want %v", result, c.result)
+        }
+    }
+}
+
+func TestCutBatch(t *testing.T) {
+    var cases = []struct {
+        lines  []string
+        column int
+        result []string
+    }{
+        {
+            []string{},
+            1,
+            nil,
+        },
+        {
+            []string{},
+            9,
+            nil,
+        },
+        {
+            []string{"1\t2\n", "3\t4\n"},
+            2,
+            []string{"2", "4"},
+        },
+    }
+    for _, c := range cases {
+        result := CutBatch(c.lines, c.column)
+        if !reflect.DeepEqual(result, c.result) {
+            t.Fatalf("got %v (%d), want %v (%d)",
+                result, len(result), c.result, len(c.result))
+        }
+    }
+}
+
+func TestUniqueMatches(t *testing.T) {
+    var cases = []struct {
+        about  string
+        docs   []string
+        result []*BiblioRef
+        err    error
+    }{
+        {
+            about:  "missing fields are ignored",
+            docs:   []string{`{}`},
+            result: []*BiblioRef{&BiblioRef{}},
+            err:    nil,
+        },
+        {
+            about: "a single doc is passed on",
+            docs: []string{`{
+                "_id": "s1_0",
+                "source_release_ident": "s1",
+                "target_release_ident": "t1"}`},
+            result: []*BiblioRef{&BiblioRef{
+                Key:                "s1_0",
+                SourceReleaseIdent: "s1",
+                TargetReleaseIdent: "t1",
+            }},
+            err: nil,
+        },
+        {
+            about: "we want to keep the exact match, if available",
+            docs: []string{`
+                {"_id": "s1_0",
+                "source_release_ident": "s1",
+                "target_release_ident": "t1",
+                "match_status": "fuzzy"}`,
+                `{"_id": "s1_1",
+                "source_release_ident": "s1",
+                "target_release_ident": "t1",
+                "match_status": "exact"}`,
+            },
+            result: []*BiblioRef{&BiblioRef{
+                Key:                "s1_1",
+                SourceReleaseIdent: "s1",
+                TargetReleaseIdent: "t1",
+                MatchStatus:        "exact",
+            }},
+            err: nil,
+        },
+        {
+            about: "if both are exact, the stable sort keeps the first one",
+            docs: []string{`
+                {"_id": "s1_0",
+                "source_release_ident": "s1",
+                "target_release_ident": "t1",
+                "match_status": "exact",
+                "match_reason": "a"}`,
+                `{"_id": "s1_1",
+                "source_release_ident": "s1",
+                "target_release_ident": "t1",
+                "match_status": "exact",
+                "match_reason": "b"}`,
+            },
+            result: []*BiblioRef{&BiblioRef{
+                Key:                "s1_0",
+                SourceReleaseIdent: "s1",
+                TargetReleaseIdent: "t1",
+                MatchStatus:        "exact",
+                MatchReason:        "a",
+            }},
+            err: nil,
+        },
+        {
+            about: "regression; a buggy sort?",
+            docs: []string{`
+                {"_id": "s1_0",
+                "source_release_ident": "s1",
+                "target_release_ident": "t1",
+                "match_status": "exact",
+                "match_reason": "a"}`,
+                `{"_id": "s1_1",
+                "source_release_ident": "s1",
+                "target_release_ident": "t1",
+                "match_status": "fuzzy",
+                "match_reason": "b"}`,
+            },
+            result: []*BiblioRef{&BiblioRef{
+                Key:                "s1_0",
+                SourceReleaseIdent: "s1",
+                TargetReleaseIdent: "t1",
+                MatchStatus:        "exact",
+                MatchReason:        "a",
+            }},
+            err: nil,
+        },
+    }
+    for _, c := range cases {
+        result, err := uniqueMatches(c.docs, &statsAugment{})
+        if err != c.err {
+            t.Fatalf("got %v, want %v (%s)", err, c.err, c.about)
+        }
+        if !reflect.DeepEqual(result, c.result) {
+            t.Fatalf("got %#v, want %#v (%s)",
+                pretty.Sprint(result),
+                pretty.Sprint(c.result), c.about)
+        }
+    }
+}
+
+func TestMatchedRefsExtend(t *testing.T) {
+    var cases = []struct {
+        matched []*BiblioRef
+        refs    []*Ref
+        result  []*BiblioRef
+    }{
+        {
+            matched: []*BiblioRef{},
+            refs:    []*Ref{},
+            result:  []*BiblioRef{},
+        },
+        {
+            matched: []*BiblioRef{
+                &BiblioRef{
+                    RefIndex: 2,
+                    RefKey:   "K2",
+                },
+            },
+            refs: []*Ref{},
+            result: []*BiblioRef{
+                &BiblioRef{
+                    RefIndex: 2,
+                    RefKey:   "K2",
+                },
+            },
+        },
+        {
+            matched: []*BiblioRef{
+                &BiblioRef{
+                    SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm",
+                    RefIndex:           2,
+                    RefKey:             "K2",
+                },
+            },
+            refs: []*Ref{
+                &Ref{
+                    ReleaseIdent: "0000",
+                    Biblio: Biblio{
+                        Title: "Title",
+                    },
+                    Index: 3,
+                    Key:   "K3",
+                },
+            },
+            result: []*BiblioRef{
+                &BiblioRef{
+                    SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm",
+                    RefIndex:           2,
+                    RefKey:             "K2",
+                },
+                &BiblioRef{
+                    Key:                "0000_3",
+                    SourceReleaseIdent: "0000",
+                    RefIndex:           3,
+                    RefKey:             "K3",
+                    MatchStatus:        StatusUnmatched.Short(),
+                    MatchReason:        ReasonUnknown.Short(),
+                    SourceYear:         "0",
+                },
+            },
+        },
+        {
+            matched: []*BiblioRef{
+                &BiblioRef{
+                    SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm",
+                    RefIndex:           2,
+                    RefKey:             "K2",
+                },
+            },
+            refs: []*Ref{
+                &Ref{
+                    ReleaseIdent: "0000",
+                    Biblio: Biblio{
+                        Title: "Title",
+                    },
+                    Index: 2,
+                    Key:   "K2",
+                },
+            },
+            result: []*BiblioRef{
+                &BiblioRef{
+                    SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm",
+                    RefIndex:           2,
+                    RefKey:             "K2",
+                },
+            },
+        },
+    }
+    for i, c := range cases {
+        result := matchedRefsExtend(c.matched, c.refs, &statsAugment{})
+        for _, v := range result {
+            v.IndexedTs = "" // we do not want to mock time for now
+        }
+        if !reflect.DeepEqual(result, c.result) {
+            t.Fatalf("[%d]: got %v, want %v (%v)",
+                i+1, result, c.result, pretty.Diff(result, c.result))
+        }
+    }
+}
+
+func TestRemoveSelfLinks(t *testing.T) {
+    var cases = []struct {
+        brefs  []*BiblioRef
+        result []*BiblioRef
+    }{
+        {
+            brefs:  nil,
+            result: nil,
+        },
+        {
+            brefs:  []*BiblioRef{},
+            result: []*BiblioRef{},
+        },
+        {
+            brefs: []*BiblioRef{
+                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"},
+                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"},
+            },
+            result: []*BiblioRef{
+                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"},
+            },
+        },
+        {
+            brefs: []*BiblioRef{
+                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"},
+                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"},
+                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"},
+                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"},
+            },
+            result: []*BiblioRef{
+                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"},
+            },
+        },
+    }
+    for i, c := range cases {
+        result := removeSelfLinks(c.brefs)
+        if !reflect.DeepEqual(result, c.result) {
+            t.Fatalf("[%d]: got %v, want %v (%v)",
+                i, result, c.result,
+                pretty.Diff(result, c.result))
+        }
+    }
+}
+
+func TestDeduplicateBrefs(t *testing.T) {
+    var cases = []struct {
+        brefs  []*BiblioRef
+        result []*BiblioRef
+    }{
+        {
+            brefs:  nil,
+            result: nil,
+        },
+        {
+            brefs:  []*BiblioRef{},
+            result: []*BiblioRef{},
+        },
+        {
+            brefs: []*BiblioRef{
+                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
+                &BiblioRef{Key: "123", MatchStatus: StatusExact.Short()},
+            },
+            result: []*BiblioRef{
+                &BiblioRef{Key: "123", MatchStatus: StatusExact.Short()},
+            },
+        },
+        {
+            brefs: []*BiblioRef{
+                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
+                &BiblioRef{Key: "123", MatchStatus: StatusUnmatched.Short()},
+            },
+            result: []*BiblioRef{
+                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
+            },
+        },
+        {
+            brefs: []*BiblioRef{
+                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
+                &BiblioRef{Key: "123", MatchStatus: StatusWeak.Short()},
+            },
+            result: []*BiblioRef{
+                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
+            },
+        },
+        {
+            brefs: []*BiblioRef{
+                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
+                &BiblioRef{Key: "123", MatchStatus: StatusAmbiguous.Short()},
+            },
+            result: []*BiblioRef{
+                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
+            },
+        },
+    }
+    for i, c := range cases {
+        result := deduplicateBrefs(c.brefs)
+        if !reflect.DeepEqual(result, c.result) {
+            t.Fatalf("[%d]: got %v, want %v (%v)",
+                i, result, c.result, pretty.Diff(result, c.result))
+        }
+    }
+}
+
+func TestZippyExact(t *testing.T) {
+    var cases = []struct {
+        a, b, out string
+        err       error
+    }{
+        {
+            a:   "testdata/zippy/cE00a.json",
+            b:   "testdata/zippy/cE00b.json",
+            out: "testdata/zippy/cE00r.json",
+            err: nil,
+        },
+        {
+            a:   "testdata/zippy/cE01a.json",
+            b:   "testdata/zippy/cE01b.json",
+            out: "testdata/zippy/cE01r.json",
+            err: nil,
+        },
+        {
+            a:   "testdata/zippy/cE02a.json",
+            b:   "testdata/zippy/cE02b.json",
+            out: "testdata/zippy/cE02r.json",
+            err: nil,
+        },
+    }
+    for i, c := range cases {
+        a, b, err := xio.OpenTwo(c.a, c.b)
+        if err != nil {
+            t.Fatalf("failed to open test files: %v, %v", c.a, c.b)
+        }
+        var (
+            buf         bytes.Buffer
+            matchResult = MatchResult{Status: StatusExact, Reason: ReasonDOI}
+        )
+        err = ZippyExact(a, b, matchResult, &buf)
+        if err != c.err {
+            t.Errorf("[%d] got %v, want %v", i, err, c.err)
+        }
+        ok, err := equalsFilename(&buf, c.out)
+        if err != nil {
+            t.Errorf("failed to open test file: %v", c.out)
+        }
+        if !ok {
+            filename, err := tempWriteFile(&buf)
+            if err != nil {
+                t.Logf("could not write temp file")
+            }
+            t.Errorf("[%d] output mismatch (buffer length=%d, content=%v), want %v", i, buf.Len(), filename, c.out)
+        }
+    }
+}
+
+// equalsFilename returns true if the contents of a given buffer match the
+// contents of the file given by filename.
+func equalsFilename(buf *bytes.Buffer, filename string) (bool, error) {
+    b, err := ioutil.ReadFile(filename)
+    if err != nil {
+        return false, err
+    }
+    bb := buf.Bytes()
+    if len(bb) == 0 && len(b) == 0 {
+        return true, nil
+    }
+    return reflect.DeepEqual(b, bb), nil
+}
+
+// tempWriteFile writes the content of a buffer to a temporary file and returns
+// its path.
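+// It is used by tests to preserve the output of failing cases for inspection.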
+func tempWriteFile(buf *bytes.Buffer) (string, error) {
+    f, err := ioutil.TempFile("", "skate-test-*")
+    if err != nil {
+        return "", err
+    }
+    if err = atomic.WriteFile(f.Name(), buf.Bytes(), 0755); err != nil {
+        return "", err
+    }
+    return f.Name(), nil
+}
diff --git a/skate/zippy.go b/skate/zippy.go
deleted file mode 100644
index ff836e8..0000000
--- a/skate/zippy.go
+++ /dev/null
@@ -1,599 +0,0 @@
-// This file contains various "reducers", e.g. merging data from two streams and
-// applying a function on groups of documents with a shared key.
-//
-// Note: This is a bit repetitive, but we do not want to introduce any other
-// abstraction for now. Since most of the logic is in the "grouper" functions,
-// we could make them top level values and then assemble the zipkey runner on
-// the fly.
-//
-// The most confusing aspect currently is the variety of schemas hidden within
-// the readers (and string groups): release, ref, ref-as-release, open library,
-// wikipedia, ...
-//
-// TODO: [ ] pass release stage through all match types
-// TODO: [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks
-package skate
-
-import (
-    "fmt"
-    "io"
-    "log"
-    "sort"
-    "strings"
-    "sync"
-    "time"
-
-    "git.archive.org/martin/cgraph/skate/set"
-    "git.archive.org/martin/cgraph/skate/zipkey"
-    json "github.com/segmentio/encoding/json"
-)
-
-var brefPool = sync.Pool{
-    New: func() interface{} {
-        var bref BiblioRef
-        return bref
-    },
-}
-
-// groupLogf logs a message alongsize a serialized group for debugging.
-func groupLogf(g *zipkey.Group, s string, vs ...interface{}) {
-    log.Printf(s, vs...)
-    b, _ := json.MarshalIndent(g, "", " ")
-    log.Println(string(b))
-}
-
-// ZippyExact takes a release and refs reader (key, doc) and assigns a fixed
-// match result, e.g. for doi matches.
-func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error {
-    var (
-        enc     = json.NewEncoder(w)
-        keyer   = makeKeyFunc("\t", 1)
-        i       = 0
-        bref    BiblioRef
-        grouper = func(g *zipkey.Group) error {
-            i++
-            if i%10000 == 0 {
-                log.Printf("processed %v groups", i)
-            }
-            var (
-                target *Release
-                ref    *Ref
-                err    error
-            )
-            if len(g.G0) == 0 || len(g.G1) == 0 {
-                return nil
-            }
-            if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
-                groupLogf(g, "[skip] failed to parse release: %v", err)
-                return nil
-            }
-            for _, line := range g.G1 {
-                if ref, err = parseRef(Cut(line, 2)); err != nil {
-                    groupLogf(g, "[skip] failed to parse ref: %v", err)
-                    continue
-                }
-                bref = brefPool.Get().(BiblioRef)
-                bref.Reset()
-                bref.SourceReleaseIdent = ref.ReleaseIdent
-                bref.SourceWorkIdent = ref.WorkIdent
-                bref.SourceReleaseStage = ref.ReleaseStage
-                bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear)
-                bref.RefIndex = ref.Index + 1 // we want 1-index (also helps with omitempty)
-                bref.RefKey = ref.Key
-                bref.TargetReleaseIdent = target.Ident
-                bref.TargetWorkIdent = target.WorkID
-                bref.MatchProvenance = ref.RefSource
-                bref.MatchStatus = matchResult.Status.Short()
-                bref.MatchReason = matchResult.Reason.Short()
-                if err := enc.Encode(bref); err != nil {
-                    return err
-                }
-                brefPool.Put(bref)
-            }
-            return nil
-        }
-    )
-    zipper := zipkey.New(releases, refs, keyer, grouper)
-    return zipper.Run()
-}
-
-// ZippyExactReleases takes two release readers (key, doc) and assigns a fixed
-// match result, e.g. used with release entities converted from open library snapshots.
-func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error {
-    var (
-        enc     = json.NewEncoder(w)
-        keyer   = makeKeyFunc("\t", 1)
-        i       = 0
-        bref    BiblioRef
-        grouper = func(g *zipkey.Group) error {
-            i++
-            if i%10000 == 0 {
-                log.Printf("processed %v groups", i)
-            }
-            var (
-                target, re *Release
-                err        error
-            )
-            if len(g.G0) == 0 || len(g.G1) == 0 {
-                return nil
-            }
-            if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
-                groupLogf(g, "[skip] failed to parse release: %v", err)
-                return nil
-            }
-            for _, line := range g.G1 {
-                if re, err = parseRelease(Cut(line, 2)); err != nil {
-                    groupLogf(g, "[skip] failed to parse release: %v", err)
-                    continue
-                }
-                if target.WorkID == "" {
-                    continue
-                }
-                bref = brefPool.Get().(BiblioRef)
-                bref.Reset()
-                bref.SourceReleaseIdent = re.Ident
-                bref.SourceWorkIdent = re.WorkID
-                bref.SourceReleaseStage = re.ReleaseStage
-                bref.SourceYear = fmt.Sprintf("%d", re.ReleaseYear())
-                bref.RefIndex = re.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty)
-                bref.RefKey = re.Extra.Skate.Ref.Key
-                bref.TargetOpenLibraryWork = target.WorkID
-                bref.MatchProvenance = re.Extra.Skate.Ref.Source
-                bref.MatchStatus = matchResult.Status.Short()
-                bref.MatchReason = matchResult.Reason.Short()
-                if err := enc.Encode(bref); err != nil {
-                    return err
-                }
-                brefPool.Put(bref)
-            }
-            return nil
-        }
-    )
-    zipper := zipkey.New(olr, releases, keyer, grouper)
-    return zipper.Run()
-}
-
-// ZippyExactWiki takes a release and wiki reader (key, doc) and assigns a
-// fixed match result.
-func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error {
-    var (
-        enc     = json.NewEncoder(w)
-        keyer   = makeKeyFunc("\t", 1)
-        bref    BiblioRef
-        grouper = func(g *zipkey.Group) error {
-            var (
-                target *Release
-                wiki   *MinimalCitations
-                err    error
-            )
-            if len(g.G0) == 0 || len(g.G1) == 0 {
-                return nil
-            }
-            if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
-                return err
-            }
-            for _, line := range g.G1 {
-                if wiki, err = parseWiki(Cut(line, 2)); err != nil {
-                    return err
-                }
-                bref = brefPool.Get().(BiblioRef)
-                bref.Reset()
-                bref.Key = fmt.Sprintf("%s_%s", slugifyString(wiki.PageTitle), target.Ident) // XXX: what should we use?
-                bref.SourceWikipediaArticle = wiki.PageTitle
-                bref.TargetReleaseIdent = target.Ident
-                bref.TargetWorkIdent = target.WorkID
-                bref.MatchProvenance = "wikipedia"
-                bref.MatchStatus = mr.Status.Short()
-                bref.MatchReason = mr.Reason.Short()
-                if err := enc.Encode(bref); err != nil {
-                    return err
-                }
-                brefPool.Put(bref)
-            }
-            return nil
-        }
-    )
-    zipper := zipkey.New(releases, wiki, keyer, grouper)
-    return zipper.Run()
-}
-
-// ZippyVerifyRefs takes a release and refs (as release) reader (key, doc), run
-// fuzzy verification and will emit a biblioref document, if exact or strong
-// match.
-func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
-    var (
-        enc     = json.NewEncoder(w)
-        keyer   = makeKeyFunc("\t", 1)
-        grouper = func(g *zipkey.Group) error {
-            var (
-                re, pivot *Release
-                err       error
-            )
-            if len(g.G0) == 0 || len(g.G1) == 0 {
-                return nil
-            }
-            if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
-                return err
-            }
-            for _, line := range g.G1 {
-                if re, err = parseRelease(Cut(line, 2)); err != nil {
-                    return err
-                }
-                result := Verify(pivot, re)
-                switch result.Status {
-                case StatusExact, StatusStrong:
-                    if result.Reason == ReasonDOI {
-                        continue
-                    }
-                    br := generateBiblioRef(re, pivot, result, "fuzzy")
-                    if err := enc.Encode(br); err != nil {
-                        return err
-                    }
-                default:
-                }
-            }
-            return nil
-        }
-    )
-    zipper := zipkey.New(releases, refs, keyer, grouper)
-    return zipper.Run()
-}
-
-// ZippyVerifyRefsOpenLibraryTable takes OL editions (as release) and refs (as
-// release) and emits a match table for manual inspection.
-func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error {
-    var (
-        keyer   = makeKeyFunc("\t", 1)
-        grouper = func(g *zipkey.Group) error {
-            var (
-                re, pivot *Release
-                err       error
-            )
-            if len(g.G0) == 0 || len(g.G1) == 0 {
-                return nil
-            }
-            // We take a single edition from OL.
-            if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
-                return err
-            }
-            for _, line := range g.G1 {
-                if re, err = parseRelease(Cut(line, 2)); err != nil {
-                    return err
-                }
-                // The refs have a container name, but not a title, but here we
-                // compare against titles from open library.
-                re.Title = re.ContainerName
-                result := Verify(pivot, re)
-                fmt.Printf("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
-                    result.Status.Short(),
-                    result.Reason.Short(),
-                    pivot.Extra.OpenLibrary.WorkID,
-                    FindByPrefix(pivot.Extra.OpenLibrary.SourceRecords, "ia:"),
-                    re.Ident,
-                    CutSep(g.G0[0], "\t", 1),
-                    pivot.Title,
-                    re.Title)
-            }
-            return nil
-        }
-    )
-    zipper := zipkey.New(olr, refs, keyer, grouper)
-    return zipper.Run()
-}
-
-// ZippyVerifyRefsOpenLibrary takes OL editions (as release) and refs (as
-// release) and writes biblioref.
-func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
-    var (
-        enc     = json.NewEncoder(w)
-        keyer   = makeKeyFunc("\t", 1)
-        bref    BiblioRef
-        grouper = func(g *zipkey.Group) error {
-            var (
-                ref, pivot *Release // ref (reference), pivot (open library)
-                err        error
-            )
-            if len(g.G0) == 0 || len(g.G1) == 0 {
-                return nil
-            }
-            // We take a single edition from OL.
-            if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
-                return err
-            }
-            for _, line := range g.G1 {
-                if ref, err = parseRelease(Cut(line, 2)); err != nil {
-                    return err
-                }
-                // The refs have a container name, but not a title, but here we
-                // compare against titles from open library.
-                ref.Title = ref.ContainerName
-                result := Verify(pivot, ref)
-                switch result.Status {
-                case StatusExact, StatusStrong:
-                    bref = brefPool.Get().(BiblioRef)
-                    bref.Reset()
-                    bref.SourceReleaseIdent = ref.Ident
-                    bref.SourceWorkIdent = ref.WorkID
-                    bref.SourceReleaseStage = ref.ReleaseStage
-                    bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear())
-                    bref.RefIndex = ref.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty)
-                    bref.RefKey = ref.Extra.Skate.Ref.Key
-                    bref.TargetOpenLibraryWork = pivot.WorkID
-                    bref.MatchProvenance = ref.Extra.Skate.Ref.Source
-                    bref.MatchStatus = result.Status.Short()
-                    bref.MatchReason = result.Reason.Short()
-                    if err := enc.Encode(bref); err != nil {
-                        return err
-                    }
-                    brefPool.Put(bref)
-                default:
-                }
-            }
-            return nil
-        }
-    )
-    zipper := zipkey.New(olr, refs, keyer, grouper)
-    return zipper.Run()
-}
-
-// ZippyBrefAugment takes all matched docs from bref and adds docs from raw
-// refs, which have not been matched. It also gets rid of duplicate matches.
-// Note: This operates on two streams: raw refs with about 2.5B (07/2021) and
-// matches, which will be about 1B; in essence we have to iterate through about
-// 3.5B records; small tweak here may be worthwhile.
-//
-// We can identify, which docs have been matched by checking the source ident,
-// ref index and key.
-//
-// TODO: This needs to be completed and made fast.
-func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
-    var (
-        stats   = statsAugment{}
-        enc     = json.NewEncoder(w)
-        keyer   = makeKeyFunc("\t", 1)
-        grouper = func(g *zipkey.Group) error {
-            // g.G0 contains matched docs for a given work id, g.G1 all raw
-            // refs, with the same work id.
-
-            // First, iterate over all matches and sort out duplicates, e.g.
-            // docs that have the same source and target id.
-            log.Printf("group K=%s, G0=%d, G1=%d", g.Key, len(g.G0), len(g.G1))
-            matched, err := uniqueMatches(CutBatch(g.G0, 2), &stats)
-            if err != nil {
-                return err
-            }
-            var refs = make([]*Ref, len(g.G1))
-            for i := 0; i < len(refs); i++ {
-                var (
-                    data []byte = []byte(Cut(g.G1[i], 2))
-                    ref  Ref
-                )
-                if err := json.Unmarshal(data, &ref); err != nil {
-                    return err
-                }
-                refs[i] = &ref
-            }
-            // TODO: this slows down this process; be a bit smarter about slices.
-            matched = matchedRefsExtend(matched, refs, &stats)
-            // At this point, we may have duplicates by "_id", e.g. source
-            // release ident and ref index (example:
-            // 4kg2dejsgzaf3cszs2lt5hz4by_9, which appears three times, one
-            // exact match, and twice unmatched).
-            matched = deduplicateBrefs(matched)
-            matched = removeSelfLinks(matched)
-            for _, bref := range matched {
-                stats.total++
-                if err := enc.Encode(bref); err != nil {
-                    return err
-                }
-            }
-            return nil
-        }
-    )
-    zipper := zipkey.New(bref, raw, keyer, grouper)
-    err := zipper.Run()
-    log.Println(stats)
-    return err
-}
-
-// removeSelfLinks removes self-referential links. TODO: Those should be caught
-// at the root cause.
-func removeSelfLinks(brefs []*BiblioRef) (result []*BiblioRef) {
-    var i int
-    for _, bref := range brefs {
-        if bref.SourceReleaseIdent == bref.TargetReleaseIdent {
-            continue
-        }
-        brefs[i] = bref
-        i++
-    }
-    brefs = brefs[:i]
-    return brefs
-}
-
-// deduplicateBrefs deduplicates by the document id (for elasticsearch), which
-// may help filter out some duplicates but not all.
-func deduplicateBrefs(brefs []*BiblioRef) []*BiblioRef {
-    // Sort by match status, exact first, unmatched last.
-    sort.Slice(brefs, func(i, j int) bool {
-        switch {
-        case brefs[i].MatchStatus == StatusExact.Short():
-            return true
-        case brefs[i].MatchStatus == StatusStrong.Short():
-            return true
-        case brefs[i].MatchStatus == StatusWeak.Short():
-            return false
-        case brefs[i].MatchStatus == StatusAmbiguous.Short():
-            return false
-        case brefs[i].MatchStatus != StatusUnmatched.Short():
-            return true
-        default:
-            return false
-        }
-    })
-    var (
-        seen = set.New()
-        i    int
-    )
-    for _, v := range brefs {
-        if seen.Contains(v.Key) {
-            continue
-        }
-        brefs[i] = v
-        i++
-        seen.Add(v.Key)
-    }
-    brefs = brefs[:i]
-    log.Printf("trimmed brefs from %d to %d", len(brefs), i)
-    return brefs
-}
-
-// matchedRefsExtend takes a set of (unique) biblioref docs and will emit that
-// set of biblioref docs (unchanged) plus raw references as biblioref, which
-// did not result in a match (determined by e.g. ref key and index). XXX: We
-// may have duplicate refs as well - how to distinguish them?
-func matchedRefsExtend(matched []*BiblioRef, refs []*Ref, stats *statsAugment) []*BiblioRef {
-    seen := set.New() // store "key + index" of matched items
-    for _, m := range matched {
-        s := m.RefKey + fmt.Sprintf("%d", m.RefIndex)
-        seen.Add(s)
-    }
-    for _, r := range refs {
-        s := r.Key + fmt.Sprintf("%d", r.Index)
-        if seen.Contains(s) {
-            stats.skipMatchedRef++
-            log.Printf("skip-matched-ref [%d]: from %d matches; ident=%v, title=%s, key=%v, index=%d",
-                stats.skipMatchedRef, len(matched), r.ReleaseIdent, r.Biblio.Title, r.Key, r.Index)
-            continue
-        }
-        var bref BiblioRef
-        bref.IndexedTs = time.Now().UTC().Format(time.RFC3339)
-        bref.Key = fmt.Sprintf("%s_%d", r.ReleaseIdent, r.Index)
-        bref.RefIndex = r.Index
-        bref.RefKey = r.Key
-        bref.SourceReleaseIdent = r.ReleaseIdent
-        bref.SourceReleaseStage = r.ReleaseStage
-        bref.SourceWorkIdent = r.WorkIdent
-        bref.SourceYear = fmt.Sprintf("%d", r.ReleaseYear)
-        bref.TargetUnstructured = r.Biblio.Unstructured
-        // Reuse fields for debugging, for now.
-        bref.MatchStatus = StatusUnmatched.Short()
-        bref.MatchReason = ReasonUnknown.Short()
-        matched = append(matched, &bref)
-    }
-    return matched
-}
-
-// uniqueMatches takes a list of bref docs (unserialized) and will return a
-// list of deserialized bref docs, containing unique matches only (e.g. filter
-// out duplicate matches, e.g. from exact and fuzzy). We are including
-// "skate-bref-id" post-processing here as well (but there is surely a better
-// place for that).
-func uniqueMatches(docs []string, stats *statsAugment) (result []*BiblioRef, err error) {
-    var brefs []*BiblioRef
-    for _, doc := range docs {
-        var bref BiblioRef
-        if err := json.Unmarshal([]byte(doc), &bref); err != nil {
-            return nil, err
-        }
-        // On-the-fly add elasticsearch "_id" and indexed timestamp, if not already set.
-        if bref.Key == "" && bref.SourceReleaseIdent != "" {
-            bref.Key = fmt.Sprintf("%s_%d", bref.SourceReleaseIdent, bref.RefIndex)
-            bref.IndexedTs = time.Now().UTC().Format(time.RFC3339)
-        }
-        brefs = append(brefs, &bref)
-    }
-    // Make sure exact matches come first. XXX: bug?
-    sort.Slice(brefs, func(i, j int) bool {
-        return brefs[i].MatchStatus == StatusExact.Short()
-    })
-    seen := set.New()
-    for _, doc := range brefs {
-        h := doc.LinkHash()
-        if seen.Contains(h) {
-            stats.skipDuplicatedBref++
-            log.Printf("skip-dup-bref [%d]: hash=%v source=%v status=%v reason=%v",
-                stats.skipDuplicatedBref, h, doc.SourceReleaseIdent, doc.MatchStatus, doc.MatchReason)
-            continue
-        }
-        seen.Add(h)
-        result = append(result, doc)
-    }
-    return result, nil
-}
-
-type statsAugment struct {
-    skipDuplicatedBref int64
-    skipMatchedRef     int64
-    total              int64
-}
-
-func (s statsAugment) String() string {
-    return fmt.Sprintf("total=%d, skipMatchedRef=%d, skipDuplicatedBref=%d",
-        s.total, s.skipMatchedRef, s.skipDuplicatedBref)
-}
-
-// CutBatch runs Cut over a list of lines.
-func CutBatch(lines []string, column int) (result []string) {
-    for _, line := range lines {
-        result = append(result, Cut(line, column))
-    }
-    return result
-}
-
-// Cut returns a specific column (1-indexed) from a line, returns empty string
-// if column is invalid.
-func Cut(line string, column int) string {
-    return CutSep(line, "\t", column)
-}
-
-// CutSep allows to specify a separator, column is 1-indexed.
-func CutSep(line, sep string, column int) string {
-    parts := strings.Split(strings.TrimSpace(line), sep)
-    if len(parts) < column {
-        return ""
-    } else {
-        return parts[column-1]
-    }
-}
-
-// FindByPrefix return the first element for a slice of strings, which matches a prefix.
-func FindByPrefix(ss []string, prefix string) string {
-    for _, s := range ss {
-        if strings.HasPrefix(s, prefix) {
-            return s
-        }
-    }
-    return ""
-}
-
-// makeKeyFunc creates a function that can be used as keyFunc, selecting a
-// column from fields separated by sep; column is 1-indexed.
-func makeKeyFunc(sep string, column int) func(string) (string, error) {
-    return func(s string) (string, error) {
-        if k := CutSep(s, sep, column); k != "" {
-            return k, nil
-        }
-        return "", fmt.Errorf("cannot get key from column %d in line (len=%d): %s", column, len(s), s)
-    }
-}
-
-func parseRelease(s string) (r *Release, err error) {
-    err = json.Unmarshal([]byte(s), &r)
-    return
-}
-
-func parseRef(s string) (r *Ref, err error) {
-    err = json.Unmarshal([]byte(s), &r)
-    return
-}
-
-func parseWiki(s string) (r *MinimalCitations, err error) {
-    err = json.Unmarshal([]byte(s), &r)
-    return
-}
-
-func parseBiblioref(s string) (r *BiblioRef, err error) {
-    err = json.Unmarshal([]byte(s), &r)
-    return
-}
diff --git a/skate/zippy_test.go b/skate/zippy_test.go
deleted file mode 100644
index 501d8cd..0000000
--- a/skate/zippy_test.go
+++ /dev/null
@@ -1,451 +0,0 @@
-package skate
-
-import (
-    "bytes"
-    "io/ioutil"
-    "reflect"
-    "testing"
-
-    "git.archive.org/martin/cgraph/skate/atomic"
-    "git.archive.org/martin/cgraph/skate/xio"
-    "github.com/kr/pretty"
-)
-
-func TestLineColumn(t *testing.T) {
-    var cases = []struct {
-        line   string
-        sep    string
-        column int
-        result string
-    }{
-        {"", "", 2, ""},
-        {"1 2 3", " ", 1, "1"},
-        {"1 2 3", " ", 2, "2"},
-        {"1 2 3", " ", 3, "3"},
-        {"1 2 3", " ", 4, ""},
-        {"1 2 3", "\t", 1, "1 2 3"},
-    }
-    for _, c := range cases {
-        result := CutSep(c.line, c.sep, c.column)
-        if result != c.result {
-            t.Fatalf("got %v, want %v", result, c.result)
-        }
-    }
-}
-
-func TestCutBatch(t *testing.T) {
-    var cases = []struct {
-        lines  []string
-        column int
-        result []string
-    }{
-        {
-            []string{},
-            1,
-            nil,
-        },
-        {
-            []string{},
-            9,
-            nil,
-        },
-        {
-            []string{"1\t2\n", "3\t4\n"},
-            2,
-            []string{"2", "4"},
-        },
-    }
-    for _, c := range cases {
-        result := CutBatch(c.lines, c.column)
-        if !reflect.DeepEqual(result, c.result) {
-            t.Fatalf("got %v (%d), want %v (%d)",
-                result, len(result), c.result, len(c.result))
-        }
-    }
-}
-
-func TestUniqueMatches(t *testing.T) {
-    var cases = []struct {
-        about  string
-        docs   []string
-        result []*BiblioRef
-        err    error
-    }{
-        {
-            about:  "missing fields are ignored",
-            docs:   []string{`{}`},
-            result: []*BiblioRef{&BiblioRef{}},
-            err:    nil,
-        },
-        {
-            about: "a single doc is passed on",
-            docs: []string{`{
-                "_id": "s1_0",
-                "source_release_ident": "s1",
-                "target_release_ident": "t1"}`},
-            result: []*BiblioRef{&BiblioRef{
-                Key:                "s1_0",
-                SourceReleaseIdent: "s1",
-                TargetReleaseIdent: "t1",
-            }},
-            err: nil,
-        },
-        {
-            about: "we want to keep the exact match, if available",
-            docs: []string{`
-                {"_id": "s1_0",
-                "source_release_ident": "s1",
-                "target_release_ident": "t1",
-                "match_status": "fuzzy"}`,
-                `{"_id": "s1_1",
-                "source_release_ident": "s1",
-                "target_release_ident": "t1",
-                "match_status": "exact"}`,
-            },
-            result: []*BiblioRef{&BiblioRef{
-                Key:                "s1_1",
-                SourceReleaseIdent: "s1",
-                TargetReleaseIdent: "t1",
-                MatchStatus:        "exact",
-            }},
-            err: nil,
-        },
-        {
-            about: "if both are exact, we just take (any) one",
-            docs: []string{`
-                {"_id": "s1_0",
-                "source_release_ident": "s1",
-                "target_release_ident": "t1",
-                "match_status": "exact",
-                "match_reason": "a"}`,
-                `{"_id": "s1_1",
-                "source_release_ident": "s1",
-                "target_release_ident": "t1",
-                "match_status": "exact",
-                "match_reason": "b"}`,
-            },
-            result: []*BiblioRef{&BiblioRef{
-                Key:                "s1_1",
-                SourceReleaseIdent: "s1",
-                TargetReleaseIdent: "t1",
-                MatchStatus:        "exact",
-                MatchReason:        "b",
-            }},
-            err: nil,
-        },
-        {
-            about: "regression; a buggy sort?",
-            docs: []string{`
-                {"_id": "s1_0",
-                "source_release_ident": "s1",
-                "target_release_ident": "t1",
-                "match_status": "exact",
-                "match_reason": "a"}`,
-                `{"_id": "s1_1",
-                "source_release_ident": "s1",
-                "target_release_ident": "t1",
-                "match_status": "fuzzy",
-                "match_reason": "b"}`,
-            },
-            result: []*BiblioRef{&BiblioRef{
-                Key:                "s1_0",
-                SourceReleaseIdent: "s1",
-                TargetReleaseIdent: "t1",
-                MatchStatus:        "exact",
-                MatchReason:        "a",
-            }},
-            err: nil,
-        },
-    }
-    for _, c := range cases {
-        result, err := uniqueMatches(c.docs, &statsAugment{})
-        if err != c.err {
-            t.Fatalf("got %v, want %v (%s)", err, c.err, c.about)
-        }
-        if !reflect.DeepEqual(result, c.result) {
-            t.Fatalf("got %#v, want %#v (%s)",
-                pretty.Sprint(result),
-                pretty.Sprint(c.result), c.about)
-        }
-    }
-}
-
-func TestMatchedRefsExtend(t *testing.T) {
-    var cases = []struct {
-        matched []*BiblioRef
-        refs    []*Ref
-        result  []*BiblioRef
-    }{
-        {
-            matched: []*BiblioRef{},
-            refs:    []*Ref{},
-            result:  []*BiblioRef{},
-        },
-        {
-            matched: []*BiblioRef{
-                &BiblioRef{
-                    RefIndex: 2,
-                    RefKey:   "K2",
-                },
-            },
-            refs: []*Ref{},
-            result: []*BiblioRef{
-                &BiblioRef{
-                    RefIndex: 2,
-                    RefKey:   "K2",
-                },
-            },
-        },
-        {
-            matched: []*BiblioRef{
-                &BiblioRef{
-                    SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm",
-                    RefIndex:           2,
-                    RefKey:             "K2",
-                },
-            },
-            refs: []*Ref{
-                &Ref{
-                    ReleaseIdent: "0000",
-                    Biblio: Biblio{
-                        Title: "Title",
-                    },
-                    Index: 3,
-                    Key:   "K3",
-                },
-            },
-            result: []*BiblioRef{
-                &BiblioRef{
-                    SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm",
-                    RefIndex:           2,
-                    RefKey:             "K2",
-                },
-                &BiblioRef{
-                    Key:                "0000_3",
-                    SourceReleaseIdent: "0000",
-                    RefIndex:           3,
-                    RefKey:             "K3",
-                    MatchStatus:        StatusUnmatched.Short(),
-                    MatchReason:        ReasonUnknown.Short(),
-                    SourceYear:         "0",
-                },
-            },
-        },
-        {
-            matched: []*BiblioRef{
-                &BiblioRef{
-                    SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm",
-                    RefIndex:           2,
-                    RefKey:             "K2",
-                },
-            },
-            refs: []*Ref{
-                &Ref{
-                    ReleaseIdent: "0000",
-                    Biblio: Biblio{
-                        Title: "Title",
-                    },
-                    Index: 2,
-                    Key:   "K2",
-                },
-            },
-            result: []*BiblioRef{
-                &BiblioRef{
-                    SourceReleaseIdent: "pud5shsflfgrth77lmlernavjm",
-                    RefIndex:           2,
-                    RefKey:             "K2",
-                },
-            },
-        },
-    }
-    for i, c := range cases {
-        result := matchedRefsExtend(c.matched, c.refs, &statsAugment{})
-        for _, v := range result {
-            v.IndexedTs = "" // we do not want to mock out time, now
-        }
-        if !reflect.DeepEqual(result, c.result) {
-            t.Fatalf("[%d]: got %v, want %v (%v)",
-                i+1, result, c.result, pretty.Diff(result, c.result))
-        }
-    }
-}
-
-func TestRemoveSelfLinks(t *testing.T) {
-    var cases = []struct {
-        brefs  []*BiblioRef
-        result []*BiblioRef
-    }{
-        {
-            brefs:  nil,
-            result: nil,
-        },
-        {
-            brefs:  []*BiblioRef{},
-            result: []*BiblioRef{},
-        },
-        {
-            brefs: []*BiblioRef{
-                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"},
-                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"},
-            },
-            result: []*BiblioRef{
-                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"},
-            },
-        },
-        {
-            brefs: []*BiblioRef{
-                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"},
-                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"},
-                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "a"},
-                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"},
-            },
-            result: []*BiblioRef{
-                &BiblioRef{SourceReleaseIdent: "a", TargetReleaseIdent: "b"},
-            },
-        },
-    }
-    for i, c := range cases {
-        result := removeSelfLinks(c.brefs)
-        if !reflect.DeepEqual(result, c.result) {
-            t.Fatalf("[%d]: got %v, want %v (%v)",
-                i, result, c.result,
-                pretty.Diff(result, c.result))
-        }
-    }
-}
-
-func TestDeduplicateBrefs(t *testing.T) {
-    var cases = []struct {
-        brefs  []*BiblioRef
-        result []*BiblioRef
-    }{
-        {
-            brefs:  nil,
-            result: nil,
-        },
-        {
-            brefs:  []*BiblioRef{},
-            result: []*BiblioRef{},
-        },
-        {
-            brefs: []*BiblioRef{
-                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
-                &BiblioRef{Key: "123", MatchStatus: StatusExact.Short()},
-            },
-            result: []*BiblioRef{
-                &BiblioRef{Key: "123", MatchStatus: StatusExact.Short()},
-            },
-        },
-        {
-            brefs: []*BiblioRef{
-                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
-                &BiblioRef{Key: "123", MatchStatus: StatusUnmatched.Short()},
-            },
-            result: []*BiblioRef{
-                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
-            },
-        },
-        {
-            brefs: []*BiblioRef{
-                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
-                &BiblioRef{Key: "123", MatchStatus: StatusWeak.Short()},
-            },
-            result: []*BiblioRef{
-                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
-            },
-        },
-        {
-            brefs: []*BiblioRef{
-                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
-                &BiblioRef{Key: "123", MatchStatus: StatusAmbiguous.Short()},
-            },
-            result: []*BiblioRef{
-                &BiblioRef{Key: "123", MatchStatus: StatusStrong.Short()},
-            },
-        },
-    }
-    for i, c := range cases {
-        result := deduplicateBrefs(c.brefs)
-        if !reflect.DeepEqual(result, c.result) {
-            t.Fatalf("[%d]: got %v, want %v (%v)",
-                i, result, c.result, pretty.Diff(result, c.result))
-        }
-    }
-}
-
-func TestZippyExact(t *testing.T) {
-    var cases = []struct {
-        a, b, out string
-        err       error
-    }{
-        {
-            a:   "testdata/zippy/cE00a.json",
-            b:   "testdata/zippy/cE00b.json",
-            out: "testdata/zippy/cE00r.json",
-            err: nil,
-        },
-        {
-            a:   "testdata/zippy/cE01a.json",
-            b:   "testdata/zippy/cE01b.json",
-            out: "testdata/zippy/cE01r.json",
-            err: nil,
-        },
-        {
-            a:   "testdata/zippy/cE02a.json",
-            b:   "testdata/zippy/cE02b.json",
-            out: "testdata/zippy/cE02r.json",
-            err: nil,
-        },
-    }
-    for i, c := range cases {
-        a, b, err := xio.OpenTwo(c.a, c.b)
-        if err != nil {
-            t.Errorf("failed to open test files: %v, %v", c.a, c.b)
-        }
-        var (
-            buf         bytes.Buffer
-            matchResult = MatchResult{Status: StatusExact, Reason: ReasonDOI}
-        )
-        err = ZippyExact(a, b, matchResult, &buf)
-        if err != c.err {
-            t.Errorf("[%d] got %v, want %v", i, err, c.err)
-        }
-        ok, err := equalsFilename(&buf, c.out)
-        if err != nil {
-            t.Errorf("failed to open test file: %v", c.out)
-        }
-        if !ok {
-            filename, err := tempWriteFile(&buf)
-            if err != nil {
-                t.Logf("could not write temp file")
-            }
-            t.Errorf("[%d] output mismatch (buffer length=%d, content=%v), want %v", i, buf.Len(), filename, c.out)
-        }
-    }
-}
-
-// equalsFilename returns true, if the contents of a given buffer matches the
-// contents of a file given by filename.
-func equalsFilename(buf *bytes.Buffer, filename string) (bool, error) {
-    b, err := ioutil.ReadFile(filename)
-    if err != nil {
-        return false, err
-    }
-    bb := buf.Bytes()
-    if len(bb) == 0 && len(b) == 0 {
-        return true, nil
-    }
-    return reflect.DeepEqual(b, bb), nil
-}
-
-// tempWriteFile writes the content of a buffer to a temporary file and returns
-// its path.
-func tempWriteFile(buf *bytes.Buffer) (string, error) {
-    f, err := ioutil.TempFile("", "skate-test-*")
-    if err != nil {
-        return "", err
-    }
-    if err = atomic.WriteFile(f.Name(), buf.Bytes(), 0755); err != nil {
-        return "", err
-    }
-    return f.Name(), nil
-}
--
cgit v1.2.3