Diffstat (limited to 'skate/reduce.go')
-rw-r--r-- | skate/reduce.go | 599 |
1 file changed, 599 insertions, 0 deletions
diff --git a/skate/reduce.go b/skate/reduce.go
new file mode 100644
index 0000000..ff836e8
--- /dev/null
+++ b/skate/reduce.go
@@ -0,0 +1,599 @@
+// This file contains various "reducers", e.g. merging data from two streams and
+// applying a function on groups of documents with a shared key.
+//
+// Note: This is a bit repetitive, but we do not want to introduce any other
+// abstraction for now. Since most of the logic is in the "grouper" functions,
+// we could make them top level values and then assemble the zipkey runner on
+// the fly.
+//
+// The most confusing aspect currently is the variety of schemas hidden within
+// the readers (and string groups): release, ref, ref-as-release, open library,
+// wikipedia, ...
+//
+// TODO: [ ] pass release stage through all match types
+// TODO: [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks
+package skate
+
+import (
+    "fmt"
+    "io"
+    "log"
+    "sort"
+    "strings"
+    "sync"
+    "time"
+
+    "git.archive.org/martin/cgraph/skate/set"
+    "git.archive.org/martin/cgraph/skate/zipkey"
+    json "github.com/segmentio/encoding/json"
+)
+
+var brefPool = sync.Pool{
+    New: func() interface{} {
+        var bref BiblioRef
+        return bref
+    },
+}
+
+// groupLogf logs a message alongside a serialized group for debugging.
+func groupLogf(g *zipkey.Group, s string, vs ...interface{}) {
+    log.Printf(s, vs...)
+    b, _ := json.MarshalIndent(g, "", " ")
+    log.Println(string(b))
+}
+
+// ZippyExact takes a release and refs reader (key, doc) and assigns a fixed
+// match result, e.g. for doi matches.
+func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error {
+    var (
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        i       = 0
+        bref    BiblioRef
+        grouper = func(g *zipkey.Group) error {
+            i++
+            if i%10000 == 0 {
+                log.Printf("processed %v groups", i)
+            }
+            var (
+                target *Release
+                ref    *Ref
+                err    error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                groupLogf(g, "[skip] failed to parse release: %v", err)
+                return nil
+            }
+            for _, line := range g.G1 {
+                if ref, err = parseRef(Cut(line, 2)); err != nil {
+                    groupLogf(g, "[skip] failed to parse ref: %v", err)
+                    continue
+                }
+                bref = brefPool.Get().(BiblioRef)
+                bref.Reset()
+                bref.SourceReleaseIdent = ref.ReleaseIdent
+                bref.SourceWorkIdent = ref.WorkIdent
+                bref.SourceReleaseStage = ref.ReleaseStage
+                bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear)
+                bref.RefIndex = ref.Index + 1 // we want 1-index (also helps with omitempty)
+                bref.RefKey = ref.Key
+                bref.TargetReleaseIdent = target.Ident
+                bref.TargetWorkIdent = target.WorkID
+                bref.MatchProvenance = ref.RefSource
+                bref.MatchStatus = matchResult.Status.Short()
+                bref.MatchReason = matchResult.Reason.Short()
+                if err := enc.Encode(bref); err != nil {
+                    return err
+                }
+                brefPool.Put(bref)
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(releases, refs, keyer, grouper)
+    return zipper.Run()
+}
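A minimal usage sketch (illustrative, not part of this diff): ZippyExact expects
both readers to yield tab-separated lines sorted by key, with the key in column 1
(cf. makeKeyFunc) and the JSON document in column 2 (cf. Cut), and it writes
newline-delimited BiblioRef JSON to w. The file names and the MatchResult value
below are assumptions.

    package main

    import (
        "log"
        "os"

        "git.archive.org/martin/cgraph/skate"
    )

    func main() {
        releases, err := os.Open("releases_sorted_by_doi.tsv") // hypothetical input
        if err != nil {
            log.Fatal(err)
        }
        defer releases.Close()
        refs, err := os.Open("refs_sorted_by_doi.tsv") // hypothetical input
        if err != nil {
            log.Fatal(err)
        }
        defer refs.Close()
        // Every group sharing a key gets the same fixed match result.
        mr := skate.MatchResult{Status: skate.StatusExact, Reason: skate.ReasonDOI}
        if err := skate.ZippyExact(releases, refs, mr, os.Stdout); err != nil {
            log.Fatal(err)
        }
    }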
+
+// ZippyExactReleases takes two release readers (key, doc) and assigns a fixed
+// match result, e.g. used with release entities converted from open library
+// snapshots.
+func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error {
+    var (
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        i       = 0
+        bref    BiblioRef
+        grouper = func(g *zipkey.Group) error {
+            i++
+            if i%10000 == 0 {
+                log.Printf("processed %v groups", i)
+            }
+            var (
+                target, re *Release
+                err        error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                groupLogf(g, "[skip] failed to parse release: %v", err)
+                return nil
+            }
+            for _, line := range g.G1 {
+                if re, err = parseRelease(Cut(line, 2)); err != nil {
+                    groupLogf(g, "[skip] failed to parse release: %v", err)
+                    continue
+                }
+                if target.WorkID == "" {
+                    continue
+                }
+                bref = brefPool.Get().(BiblioRef)
+                bref.Reset()
+                bref.SourceReleaseIdent = re.Ident
+                bref.SourceWorkIdent = re.WorkID
+                bref.SourceReleaseStage = re.ReleaseStage
+                bref.SourceYear = fmt.Sprintf("%d", re.ReleaseYear())
+                bref.RefIndex = re.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty)
+                bref.RefKey = re.Extra.Skate.Ref.Key
+                bref.TargetOpenLibraryWork = target.WorkID
+                bref.MatchProvenance = re.Extra.Skate.Ref.Source
+                bref.MatchStatus = matchResult.Status.Short()
+                bref.MatchReason = matchResult.Reason.Short()
+                if err := enc.Encode(bref); err != nil {
+                    return err
+                }
+                brefPool.Put(bref)
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(olr, releases, keyer, grouper)
+    return zipper.Run()
+}
+
+// ZippyExactWiki takes a release and wiki reader (key, doc) and assigns a
+// fixed match result.
+func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error {
+    var (
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        bref    BiblioRef
+        grouper = func(g *zipkey.Group) error {
+            var (
+                target *Release
+                wiki   *MinimalCitations
+                err    error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                return err
+            }
+            for _, line := range g.G1 {
+                if wiki, err = parseWiki(Cut(line, 2)); err != nil {
+                    return err
+                }
+                bref = brefPool.Get().(BiblioRef)
+                bref.Reset()
+                bref.Key = fmt.Sprintf("%s_%s", slugifyString(wiki.PageTitle), target.Ident) // XXX: what should we use?
+                bref.SourceWikipediaArticle = wiki.PageTitle
+                bref.TargetReleaseIdent = target.Ident
+                bref.TargetWorkIdent = target.WorkID
+                bref.MatchProvenance = "wikipedia"
+                bref.MatchStatus = mr.Status.Short()
+                bref.MatchReason = mr.Reason.Short()
+                if err := enc.Encode(bref); err != nil {
+                    return err
+                }
+                brefPool.Put(bref)
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(releases, wiki, keyer, grouper)
+    return zipper.Run()
+}
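To make the (key, doc) line convention concrete, here is a test-style sketch (not
part of this diff) of the key function that all reducers above build via
makeKeyFunc, defined near the end of this file; the identifier in the sample line
is made up.

    package skate

    import "testing"

    func TestKeyFuncSketch(t *testing.T) {
        keyer := makeKeyFunc("\t", 1)
        // Column 1 is the key, column 2 the JSON document.
        key, err := keyer("q3f5jxhvhnf2xog5xuezv4hkme\t{\"title\": \"A\"}")
        if err != nil || key != "q3f5jxhvhnf2xog5xuezv4hkme" {
            t.Fatalf("got key=%q, err=%v", key, err)
        }
        // Lines without a key yield an error rather than an empty key.
        if _, err := keyer(""); err == nil {
            t.Fatal("expected an error for a line without a key")
        }
    }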
+
+// ZippyVerifyRefs takes a release and refs (as release) reader (key, doc),
+// runs fuzzy verification and emits a biblioref document on exact or strong
+// matches.
+func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
+    var (
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        grouper = func(g *zipkey.Group) error {
+            var (
+                re, pivot *Release
+                err       error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                return err
+            }
+            for _, line := range g.G1 {
+                if re, err = parseRelease(Cut(line, 2)); err != nil {
+                    return err
+                }
+                result := Verify(pivot, re)
+                switch result.Status {
+                case StatusExact, StatusStrong:
+                    if result.Reason == ReasonDOI {
+                        continue
+                    }
+                    br := generateBiblioRef(re, pivot, result, "fuzzy")
+                    if err := enc.Encode(br); err != nil {
+                        return err
+                    }
+                default:
+                }
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(releases, refs, keyer, grouper)
+    return zipper.Run()
+}
+
+// ZippyVerifyRefsOpenLibraryTable takes OL editions (as release) and refs (as
+// release) and emits a match table for manual inspection.
+func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error {
+    var (
+        keyer   = makeKeyFunc("\t", 1)
+        grouper = func(g *zipkey.Group) error {
+            var (
+                re, pivot *Release
+                err       error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            // We take a single edition from OL.
+            if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                return err
+            }
+            for _, line := range g.G1 {
+                if re, err = parseRelease(Cut(line, 2)); err != nil {
+                    return err
+                }
+                // The refs have a container name, but not a title; here we
+                // compare against titles from open library.
+                re.Title = re.ContainerName
+                result := Verify(pivot, re)
+                fmt.Printf("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
+                    result.Status.Short(),
+                    result.Reason.Short(),
+                    pivot.Extra.OpenLibrary.WorkID,
+                    FindByPrefix(pivot.Extra.OpenLibrary.SourceRecords, "ia:"),
+                    re.Ident,
+                    CutSep(g.G0[0], "\t", 1),
+                    pivot.Title,
+                    re.Title)
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(olr, refs, keyer, grouper)
+    return zipper.Run()
+}
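The functions above all follow the same shape: a key function plus a grouper
passed to zipkey.New, where g.G0 holds the lines from the first reader and g.G1
the lines from the second reader that share a key. A stripped-down grouper
(illustrative, not part of this diff) that only counts group sizes:

    package skate

    import (
        "io"
        "log"

        "git.archive.org/martin/cgraph/skate/zipkey"
    )

    // countGroupSizes logs how many lines from each key-sorted input share a key.
    func countGroupSizes(r0, r1 io.Reader) error {
        var (
            keyer   = makeKeyFunc("\t", 1)
            grouper = func(g *zipkey.Group) error {
                log.Printf("key=%s g0=%d g1=%d", g.Key, len(g.G0), len(g.G1))
                return nil
            }
        )
        zipper := zipkey.New(r0, r1, keyer, grouper)
        return zipper.Run()
    }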
+
+// ZippyVerifyRefsOpenLibrary takes OL editions (as release) and refs (as
+// release) and writes biblioref.
+func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
+    var (
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        bref    BiblioRef
+        grouper = func(g *zipkey.Group) error {
+            var (
+                ref, pivot *Release // ref (reference), pivot (open library)
+                err        error
+            )
+            if len(g.G0) == 0 || len(g.G1) == 0 {
+                return nil
+            }
+            // We take a single edition from OL.
+            if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+                return err
+            }
+            for _, line := range g.G1 {
+                if ref, err = parseRelease(Cut(line, 2)); err != nil {
+                    return err
+                }
+                // The refs have a container name, but not a title; here we
+                // compare against titles from open library.
+                ref.Title = ref.ContainerName
+                result := Verify(pivot, ref)
+                switch result.Status {
+                case StatusExact, StatusStrong:
+                    bref = brefPool.Get().(BiblioRef)
+                    bref.Reset()
+                    bref.SourceReleaseIdent = ref.Ident
+                    bref.SourceWorkIdent = ref.WorkID
+                    bref.SourceReleaseStage = ref.ReleaseStage
+                    bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear())
+                    bref.RefIndex = ref.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty)
+                    bref.RefKey = ref.Extra.Skate.Ref.Key
+                    bref.TargetOpenLibraryWork = pivot.WorkID
+                    bref.MatchProvenance = ref.Extra.Skate.Ref.Source
+                    bref.MatchStatus = result.Status.Short()
+                    bref.MatchReason = result.Reason.Short()
+                    if err := enc.Encode(bref); err != nil {
+                        return err
+                    }
+                    brefPool.Put(bref)
+                default:
+                }
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(olr, refs, keyer, grouper)
+    return zipper.Run()
+}
+
+// ZippyBrefAugment takes all matched docs from bref and adds docs from raw
+// refs, which have not been matched. It also gets rid of duplicate matches.
+//
+// Note: This operates on two streams: raw refs with about 2.5B records
+// (07/2021) and matches, which will be about 1B; in essence we have to iterate
+// through about 3.5B records; small tweaks here may be worthwhile.
+//
+// We can identify which docs have been matched by checking the source ident,
+// ref index and key.
+//
+// TODO: This needs to be completed and made fast.
+func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
+    var (
+        stats   = statsAugment{}
+        enc     = json.NewEncoder(w)
+        keyer   = makeKeyFunc("\t", 1)
+        grouper = func(g *zipkey.Group) error {
+            // g.G0 contains matched docs for a given work id, g.G1 all raw
+            // refs with the same work id.
+            //
+            // First, iterate over all matches and sort out duplicates, e.g.
+            // docs that have the same source and target id.
+            log.Printf("group K=%s, G0=%d, G1=%d", g.Key, len(g.G0), len(g.G1))
+            matched, err := uniqueMatches(CutBatch(g.G0, 2), &stats)
+            if err != nil {
+                return err
+            }
+            var refs = make([]*Ref, len(g.G1))
+            for i := 0; i < len(refs); i++ {
+                var (
+                    data []byte = []byte(Cut(g.G1[i], 2))
+                    ref  Ref
+                )
+                if err := json.Unmarshal(data, &ref); err != nil {
+                    return err
+                }
+                refs[i] = &ref
+            }
+            // TODO: this slows down this process; be a bit smarter about slices.
+            matched = matchedRefsExtend(matched, refs, &stats)
+            // At this point, we may have duplicates by "_id", e.g. source
+            // release ident and ref index (example:
+            // 4kg2dejsgzaf3cszs2lt5hz4by_9, which appears three times, one
+            // exact match, and twice unmatched).
+            matched = deduplicateBrefs(matched)
+            matched = removeSelfLinks(matched)
+            for _, bref := range matched {
+                stats.total++
+                if err := enc.Encode(bref); err != nil {
+                    return err
+                }
+            }
+            return nil
+        }
+    )
+    zipper := zipkey.New(bref, raw, keyer, grouper)
+    err := zipper.Run()
+    log.Println(stats)
+    return err
+}
+
+// removeSelfLinks removes self-referential links. TODO: Those should be caught
+// at the root cause.
+func removeSelfLinks(brefs []*BiblioRef) (result []*BiblioRef) {
+    var i int
+    for _, bref := range brefs {
+        if bref.SourceReleaseIdent == bref.TargetReleaseIdent {
+            continue
+        }
+        brefs[i] = bref
+        i++
+    }
+    brefs = brefs[:i]
+    return brefs
+}
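removeSelfLinks above and deduplicateBrefs below both use the same in-place
filtering idiom: survivors are copied towards the front of the slice, which is
then truncated, so no second slice is allocated. A generic illustration (not
part of this diff), shown on a plain string slice:

    package skate

    // keepNonEmpty demonstrates the compaction idiom: copy keepers forward,
    // then cut the slice down to the number of keepers.
    func keepNonEmpty(ss []string) []string {
        var i int
        for _, s := range ss {
            if s == "" {
                continue
            }
            ss[i] = s
            i++
        }
        return ss[:i]
    }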
+
+// deduplicateBrefs deduplicates by the document id (for elasticsearch), which
+// may help filter out some duplicates but not all.
+func deduplicateBrefs(brefs []*BiblioRef) []*BiblioRef {
+    // Sort by match status: exact first, unmatched last.
+    rank := func(status string) int {
+        switch status {
+        case StatusExact.Short():
+            return 0
+        case StatusStrong.Short():
+            return 1
+        case StatusWeak.Short():
+            return 3
+        case StatusAmbiguous.Short():
+            return 4
+        case StatusUnmatched.Short():
+            return 5
+        default:
+            return 2
+        }
+    }
+    sort.Slice(brefs, func(i, j int) bool {
+        return rank(brefs[i].MatchStatus) < rank(brefs[j].MatchStatus)
+    })
+    var (
+        seen = set.New()
+        i    int
+    )
+    for _, v := range brefs {
+        if seen.Contains(v.Key) {
+            continue
+        }
+        brefs[i] = v
+        i++
+        seen.Add(v.Key)
+    }
+    log.Printf("trimmed brefs from %d to %d", len(brefs), i)
+    brefs = brefs[:i]
+    return brefs
+}
+
+// matchedRefsExtend takes a set of (unique) biblioref docs and will emit that
+// set of biblioref docs (unchanged) plus raw references as biblioref, which
+// did not result in a match (determined e.g. by ref key and index). XXX: We
+// may have duplicate refs as well - how to distinguish them?
+func matchedRefsExtend(matched []*BiblioRef, refs []*Ref, stats *statsAugment) []*BiblioRef {
+    seen := set.New() // store "key + index" of matched items
+    for _, m := range matched {
+        s := m.RefKey + fmt.Sprintf("%d", m.RefIndex)
+        seen.Add(s)
+    }
+    for _, r := range refs {
+        s := r.Key + fmt.Sprintf("%d", r.Index)
+        if seen.Contains(s) {
+            stats.skipMatchedRef++
+            log.Printf("skip-matched-ref [%d]: from %d matches; ident=%v, title=%s, key=%v, index=%d",
+                stats.skipMatchedRef, len(matched), r.ReleaseIdent, r.Biblio.Title, r.Key, r.Index)
+            continue
+        }
+        var bref BiblioRef
+        bref.IndexedTs = time.Now().UTC().Format(time.RFC3339)
+        bref.Key = fmt.Sprintf("%s_%d", r.ReleaseIdent, r.Index)
+        bref.RefIndex = r.Index
+        bref.RefKey = r.Key
+        bref.SourceReleaseIdent = r.ReleaseIdent
+        bref.SourceReleaseStage = r.ReleaseStage
+        bref.SourceWorkIdent = r.WorkIdent
+        bref.SourceYear = fmt.Sprintf("%d", r.ReleaseYear)
+        bref.TargetUnstructured = r.Biblio.Unstructured
+        // Reuse fields for debugging, for now.
+        bref.MatchStatus = StatusUnmatched.Short()
+        bref.MatchReason = ReasonUnknown.Short()
+        matched = append(matched, &bref)
+    }
+    return matched
+}
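A test-style sketch (not part of this diff) of the interplay described in
ZippyBrefAugment: when an unmatched ref and an exact match end up with the same
document id, sorting exact matches first means deduplicateBrefs keeps the exact
one. The identifier reuses the example from the comment above.

    package skate

    import "testing"

    func TestDeduplicateBrefsSketch(t *testing.T) {
        brefs := []*BiblioRef{
            {Key: "4kg2dejsgzaf3cszs2lt5hz4by_9", MatchStatus: StatusUnmatched.Short()},
            {Key: "4kg2dejsgzaf3cszs2lt5hz4by_9", MatchStatus: StatusExact.Short()},
        }
        brefs = deduplicateBrefs(brefs)
        if len(brefs) != 1 || brefs[0].MatchStatus != StatusExact.Short() {
            t.Fatalf("expected the exact match to survive, got %v", brefs)
        }
    }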
+
+// uniqueMatches takes a list of bref docs (serialized) and will return a list
+// of deserialized bref docs, containing unique matches only (e.g. filtering
+// out duplicate matches from exact and fuzzy runs). We are including the
+// "skate-bref-id" post-processing here as well (but there is surely a better
+// place for that).
+func uniqueMatches(docs []string, stats *statsAugment) (result []*BiblioRef, err error) {
+    var brefs []*BiblioRef
+    for _, doc := range docs {
+        var bref BiblioRef
+        if err := json.Unmarshal([]byte(doc), &bref); err != nil {
+            return nil, err
+        }
+        // On-the-fly add elasticsearch "_id" and indexed timestamp, if not already set.
+        if bref.Key == "" && bref.SourceReleaseIdent != "" {
+            bref.Key = fmt.Sprintf("%s_%d", bref.SourceReleaseIdent, bref.RefIndex)
+            bref.IndexedTs = time.Now().UTC().Format(time.RFC3339)
+        }
+        brefs = append(brefs, &bref)
+    }
+    // Make sure exact matches come first.
+    sort.Slice(brefs, func(i, j int) bool {
+        return brefs[i].MatchStatus == StatusExact.Short() && brefs[j].MatchStatus != StatusExact.Short()
+    })
+    seen := set.New()
+    for _, doc := range brefs {
+        h := doc.LinkHash()
+        if seen.Contains(h) {
+            stats.skipDuplicatedBref++
+            log.Printf("skip-dup-bref [%d]: hash=%v source=%v status=%v reason=%v",
+                stats.skipDuplicatedBref, h, doc.SourceReleaseIdent, doc.MatchStatus, doc.MatchReason)
+            continue
+        }
+        seen.Add(h)
+        result = append(result, doc)
+    }
+    return result, nil
+}
+
+type statsAugment struct {
+    skipDuplicatedBref int64
+    skipMatchedRef     int64
+    total              int64
+}
+
+func (s statsAugment) String() string {
+    return fmt.Sprintf("total=%d, skipMatchedRef=%d, skipDuplicatedBref=%d",
+        s.total, s.skipMatchedRef, s.skipDuplicatedBref)
+}
+
+// CutBatch runs Cut over a list of lines.
+func CutBatch(lines []string, column int) (result []string) {
+    for _, line := range lines {
+        result = append(result, Cut(line, column))
+    }
+    return result
+}
+
+// Cut returns a specific column (1-indexed) from a tab separated line; it
+// returns an empty string if the column is invalid.
+func Cut(line string, column int) string {
+    return CutSep(line, "\t", column)
+}
+
+// CutSep is like Cut, but allows specifying the separator; column is 1-indexed.
+func CutSep(line, sep string, column int) string {
+    parts := strings.Split(strings.TrimSpace(line), sep)
+    if len(parts) < column {
+        return ""
+    } else {
+        return parts[column-1]
+    }
+}
+
+// FindByPrefix returns the first element of a slice of strings that has the
+// given prefix.
+func FindByPrefix(ss []string, prefix string) string {
+    for _, s := range ss {
+        if strings.HasPrefix(s, prefix) {
+            return s
+        }
+    }
+    return ""
+}
+
+// makeKeyFunc creates a function that can be used as a keyFunc, selecting a
+// column from fields separated by sep; column is 1-indexed.
+func makeKeyFunc(sep string, column int) func(string) (string, error) {
+    return func(s string) (string, error) {
+        if k := CutSep(s, sep, column); k != "" {
+            return k, nil
+        }
+        return "", fmt.Errorf("cannot get key from column %d in line (len=%d): %s", column, len(s), s)
+    }
+}
+
+func parseRelease(s string) (r *Release, err error) {
+    err = json.Unmarshal([]byte(s), &r)
+    return
+}
+
+func parseRef(s string) (r *Ref, err error) {
+    err = json.Unmarshal([]byte(s), &r)
+    return
+}
+
+func parseWiki(s string) (r *MinimalCitations, err error) {
+    err = json.Unmarshal([]byte(s), &r)
+    return
+}
+
+func parseBiblioref(s string) (r *BiblioRef, err error) {
+    err = json.Unmarshal([]byte(s), &r)
+    return
+}
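Finally, a test-style sketch (not part of this diff) of the column helpers: Cut
and CutSep are 1-indexed and return an empty string for out-of-range columns,
and FindByPrefix returns only the first match. The sample values are made up.

    package skate

    import "testing"

    func TestColumnHelpersSketch(t *testing.T) {
        line := "somekey\t{\"title\": \"A\"}"
        if got := Cut(line, 2); got != "{\"title\": \"A\"}" {
            t.Fatalf("unexpected column 2: %q", got)
        }
        // A missing column is not an error, just empty.
        if got := Cut(line, 3); got != "" {
            t.Fatalf("expected empty string for a missing column, got %q", got)
        }
        if got := FindByPrefix([]string{"ol:OL123M", "ia:someitem"}, "ia:"); got != "ia:someitem" {
            t.Fatalf("unexpected prefix match: %q", got)
        }
    }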