Diffstat (limited to 'skate/reduce.go')
-rw-r--r--	skate/reduce.go	599
1 file changed, 599 insertions, 0 deletions
diff --git a/skate/reduce.go b/skate/reduce.go
new file mode 100644
index 0000000..ff836e8
--- /dev/null
+++ b/skate/reduce.go
@@ -0,0 +1,599 @@
+// This file contains various "reducers", e.g. for merging data from two
+// streams and applying a function to groups of documents that share a key.
+//
+// Note: This is a bit repetitive, but we do not want to introduce any other
+// abstraction for now. Since most of the logic is in the "grouper" functions,
+// we could make them top level values and then assemble the zipkey runner on
+// the fly.
+//
+// The most confusing aspect currently is the variety of schemas hidden within
+// the readers (and string groups): release, ref, ref-as-release, open library,
+// wikipedia, ...
+//
+// TODO: [ ] pass release stage through all match types
+// TODO: [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks
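+//
+// As a rough map of the reducers below (each one takes two streams of
+// tab-separated (key, doc) lines and groups them by key):
+//
+//	ZippyExact                   release x ref
+//	ZippyExactReleases           open library release x release
+//	ZippyExactWiki               release x wikipedia citations
+//	ZippyVerifyRefs              release x ref (as release)
+//	ZippyVerifyRefsOpenLibrary*  open library release x ref (as release)
+//	ZippyBrefAugment             biblioref x raw ref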
+package skate
+
+import (
+ "fmt"
+ "io"
+ "log"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "git.archive.org/martin/cgraph/skate/set"
+ "git.archive.org/martin/cgraph/skate/zipkey"
+ json "github.com/segmentio/encoding/json"
+)
+
+var brefPool = sync.Pool{
+ New: func() interface{} {
+ var bref BiblioRef
+ return bref
+ },
+}
+
+// groupLogf logs a message alongside a serialized group for debugging.
+func groupLogf(g *zipkey.Group, s string, vs ...interface{}) {
+ log.Printf(s, vs...)
+ b, _ := json.MarshalIndent(g, "", " ")
+ log.Println(string(b))
+}
+
+// ZippyExact takes a release and a refs reader (tab-separated key and doc)
+// and assigns a fixed match result, e.g. for DOI matches.
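+//
+// A hypothetical pair of input lines (tab-separated key and JSON document),
+// one from each reader, sharing the same key:
+//
+//	10.123/abc	{... release JSON ...}
+//	10.123/abc	{... ref JSON ...}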
+func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error {
+ var (
+ enc = json.NewEncoder(w)
+ keyer = makeKeyFunc("\t", 1)
+ i = 0
+ bref BiblioRef
+ grouper = func(g *zipkey.Group) error {
+ i++
+ if i%10000 == 0 {
+ log.Printf("processed %v groups", i)
+ }
+ var (
+ target *Release
+ ref *Ref
+ err error
+ )
+ if len(g.G0) == 0 || len(g.G1) == 0 {
+ return nil
+ }
+ if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+ groupLogf(g, "[skip] failed to parse release: %v", err)
+ return nil
+ }
+ for _, line := range g.G1 {
+ if ref, err = parseRef(Cut(line, 2)); err != nil {
+ groupLogf(g, "[skip] failed to parse ref: %v", err)
+ continue
+ }
+ bref = brefPool.Get().(BiblioRef)
+ bref.Reset()
+ bref.SourceReleaseIdent = ref.ReleaseIdent
+ bref.SourceWorkIdent = ref.WorkIdent
+ bref.SourceReleaseStage = ref.ReleaseStage
+ bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear)
+ bref.RefIndex = ref.Index + 1 // we want 1-index (also helps with omitempty)
+ bref.RefKey = ref.Key
+ bref.TargetReleaseIdent = target.Ident
+ bref.TargetWorkIdent = target.WorkID
+ bref.MatchProvenance = ref.RefSource
+ bref.MatchStatus = matchResult.Status.Short()
+ bref.MatchReason = matchResult.Reason.Short()
+ if err := enc.Encode(bref); err != nil {
+ return err
+ }
+ brefPool.Put(bref)
+ }
+ return nil
+ }
+ )
+ zipper := zipkey.New(releases, refs, keyer, grouper)
+ return zipper.Run()
+}
+
+// ZippyExactReleases takes two release readers (key, doc) and assigns a fixed
+// match result, e.g. for release entities converted from Open Library snapshots.
+func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error {
+ var (
+ enc = json.NewEncoder(w)
+ keyer = makeKeyFunc("\t", 1)
+ i = 0
+ bref BiblioRef
+ grouper = func(g *zipkey.Group) error {
+ i++
+ if i%10000 == 0 {
+ log.Printf("processed %v groups", i)
+ }
+ var (
+ target, re *Release
+ err error
+ )
+ if len(g.G0) == 0 || len(g.G1) == 0 {
+ return nil
+ }
+ if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+ groupLogf(g, "[skip] failed to parse release: %v", err)
+ return nil
+ }
+ for _, line := range g.G1 {
+ if re, err = parseRelease(Cut(line, 2)); err != nil {
+ groupLogf(g, "[skip] failed to parse release: %v", err)
+ continue
+ }
+ if target.WorkID == "" {
+ continue
+ }
+ bref = brefPool.Get().(BiblioRef)
+ bref.Reset()
+ bref.SourceReleaseIdent = re.Ident
+ bref.SourceWorkIdent = re.WorkID
+ bref.SourceReleaseStage = re.ReleaseStage
+ bref.SourceYear = fmt.Sprintf("%d", re.ReleaseYear())
+ bref.RefIndex = re.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty)
+ bref.RefKey = re.Extra.Skate.Ref.Key
+ bref.TargetOpenLibraryWork = target.WorkID
+ bref.MatchProvenance = re.Extra.Skate.Ref.Source
+ bref.MatchStatus = matchResult.Status.Short()
+ bref.MatchReason = matchResult.Reason.Short()
+ if err := enc.Encode(bref); err != nil {
+ return err
+ }
+ brefPool.Put(bref)
+ }
+ return nil
+ }
+ )
+ zipper := zipkey.New(olr, releases, keyer, grouper)
+ return zipper.Run()
+}
+
+// ZippyExactWiki takes a release and wiki reader (key, doc) and assigns a
+// fixed match result.
+func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error {
+ var (
+ enc = json.NewEncoder(w)
+ keyer = makeKeyFunc("\t", 1)
+ bref BiblioRef
+ grouper = func(g *zipkey.Group) error {
+ var (
+ target *Release
+ wiki *MinimalCitations
+ err error
+ )
+ if len(g.G0) == 0 || len(g.G1) == 0 {
+ return nil
+ }
+ if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+ return err
+ }
+ for _, line := range g.G1 {
+ if wiki, err = parseWiki(Cut(line, 2)); err != nil {
+ return err
+ }
+ bref = brefPool.Get().(BiblioRef)
+ bref.Reset()
+ bref.Key = fmt.Sprintf("%s_%s", slugifyString(wiki.PageTitle), target.Ident) // XXX: what should we use?
+ bref.SourceWikipediaArticle = wiki.PageTitle
+ bref.TargetReleaseIdent = target.Ident
+ bref.TargetWorkIdent = target.WorkID
+ bref.MatchProvenance = "wikipedia"
+ bref.MatchStatus = mr.Status.Short()
+ bref.MatchReason = mr.Reason.Short()
+ if err := enc.Encode(bref); err != nil {
+ return err
+ }
+ brefPool.Put(bref)
+ }
+ return nil
+ }
+ )
+ zipper := zipkey.New(releases, wiki, keyer, grouper)
+ return zipper.Run()
+}
+
+// ZippyVerifyRefs takes a release and a refs (as release) reader (key, doc),
+// runs fuzzy verification and emits a biblioref document for exact or strong
+// matches.
+func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
+ var (
+ enc = json.NewEncoder(w)
+ keyer = makeKeyFunc("\t", 1)
+ grouper = func(g *zipkey.Group) error {
+ var (
+ re, pivot *Release
+ err error
+ )
+ if len(g.G0) == 0 || len(g.G1) == 0 {
+ return nil
+ }
+ if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+ return err
+ }
+ for _, line := range g.G1 {
+ if re, err = parseRelease(Cut(line, 2)); err != nil {
+ return err
+ }
+ result := Verify(pivot, re)
+ switch result.Status {
+ case StatusExact, StatusStrong:
+ if result.Reason == ReasonDOI {
+ continue
+ }
+ br := generateBiblioRef(re, pivot, result, "fuzzy")
+ if err := enc.Encode(br); err != nil {
+ return err
+ }
+ default:
+ }
+ }
+ return nil
+ }
+ )
+ zipper := zipkey.New(releases, refs, keyer, grouper)
+ return zipper.Run()
+}
+
+// ZippyVerifyRefsOpenLibraryTable takes OL editions (as release) and refs (as
+// release) and emits a match table for manual inspection.
+func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error {
+ var (
+ keyer = makeKeyFunc("\t", 1)
+ grouper = func(g *zipkey.Group) error {
+ var (
+ re, pivot *Release
+ err error
+ )
+ if len(g.G0) == 0 || len(g.G1) == 0 {
+ return nil
+ }
+ // We take a single edition from OL.
+ if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+ return err
+ }
+ for _, line := range g.G1 {
+ if re, err = parseRelease(Cut(line, 2)); err != nil {
+ return err
+ }
+				// The refs have a container name, but no title; here we
+				// compare against titles from Open Library.
+ re.Title = re.ContainerName
+ result := Verify(pivot, re)
+ fmt.Printf("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
+ result.Status.Short(),
+ result.Reason.Short(),
+ pivot.Extra.OpenLibrary.WorkID,
+ FindByPrefix(pivot.Extra.OpenLibrary.SourceRecords, "ia:"),
+ re.Ident,
+ CutSep(g.G0[0], "\t", 1),
+ pivot.Title,
+ re.Title)
+ }
+ return nil
+ }
+ )
+ zipper := zipkey.New(olr, refs, keyer, grouper)
+ return zipper.Run()
+}
+
+// ZippyVerifyRefsOpenLibrary takes OL editions (as release) and refs (as
+// release) and writes biblioref.
+func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
+ var (
+ enc = json.NewEncoder(w)
+ keyer = makeKeyFunc("\t", 1)
+ bref BiblioRef
+ grouper = func(g *zipkey.Group) error {
+ var (
+ ref, pivot *Release // ref (reference), pivot (open library)
+ err error
+ )
+ if len(g.G0) == 0 || len(g.G1) == 0 {
+ return nil
+ }
+ // We take a single edition from OL.
+ if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
+ return err
+ }
+ for _, line := range g.G1 {
+ if ref, err = parseRelease(Cut(line, 2)); err != nil {
+ return err
+ }
+				// The refs have a container name, but no title; here we
+				// compare against titles from Open Library.
+ ref.Title = ref.ContainerName
+ result := Verify(pivot, ref)
+ switch result.Status {
+ case StatusExact, StatusStrong:
+ bref = brefPool.Get().(BiblioRef)
+ bref.Reset()
+ bref.SourceReleaseIdent = ref.Ident
+ bref.SourceWorkIdent = ref.WorkID
+ bref.SourceReleaseStage = ref.ReleaseStage
+ bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear())
+ bref.RefIndex = ref.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty)
+ bref.RefKey = ref.Extra.Skate.Ref.Key
+ bref.TargetOpenLibraryWork = pivot.WorkID
+ bref.MatchProvenance = ref.Extra.Skate.Ref.Source
+ bref.MatchStatus = result.Status.Short()
+ bref.MatchReason = result.Reason.Short()
+ if err := enc.Encode(bref); err != nil {
+ return err
+ }
+ brefPool.Put(bref)
+ default:
+ }
+ }
+ return nil
+ }
+ )
+ zipper := zipkey.New(olr, refs, keyer, grouper)
+ return zipper.Run()
+}
+
+// ZippyBrefAugment takes all matched docs from bref and adds docs from raw
+// refs that have not been matched. It also gets rid of duplicate matches.
+// Note: This operates on two streams: raw refs with about 2.5B records
+// (07/2021) and matches with about 1B; in essence we have to iterate through
+// about 3.5B records, so small tweaks here may be worthwhile.
+//
+// We can identify which docs have been matched by checking the source ident,
+// ref index and key.
+//
+// TODO: This needs to be completed and made fast.
+func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
+ var (
+ stats = statsAugment{}
+ enc = json.NewEncoder(w)
+ keyer = makeKeyFunc("\t", 1)
+ grouper = func(g *zipkey.Group) error {
+			// g.G0 contains the matched docs for a given work id, g.G1 all
+			// raw refs with the same work id.
+
+ // First, iterate over all matches and sort out duplicates, e.g.
+ // docs that have the same source and target id.
+ log.Printf("group K=%s, G0=%d, G1=%d", g.Key, len(g.G0), len(g.G1))
+ matched, err := uniqueMatches(CutBatch(g.G0, 2), &stats)
+ if err != nil {
+ return err
+ }
+ var refs = make([]*Ref, len(g.G1))
+ for i := 0; i < len(refs); i++ {
+ var (
+ data []byte = []byte(Cut(g.G1[i], 2))
+ ref Ref
+ )
+ if err := json.Unmarshal(data, &ref); err != nil {
+ return err
+ }
+ refs[i] = &ref
+ }
+ // TODO: this slows down this process; be a bit smarter about slices.
+ matched = matchedRefsExtend(matched, refs, &stats)
+ // At this point, we may have duplicates by "_id", e.g. source
+ // release ident and ref index (example:
+ // 4kg2dejsgzaf3cszs2lt5hz4by_9, which appears three times, one
+ // exact match, and twice unmatched).
+ matched = deduplicateBrefs(matched)
+ matched = removeSelfLinks(matched)
+ for _, bref := range matched {
+ stats.total++
+ if err := enc.Encode(bref); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ )
+ zipper := zipkey.New(bref, raw, keyer, grouper)
+ err := zipper.Run()
+ log.Println(stats)
+ return err
+}
+
+// removeSelfLinks removes self-referential links. TODO: Those should be caught
+// at the root cause.
+func removeSelfLinks(brefs []*BiblioRef) []*BiblioRef {
+	var i int
+	for _, bref := range brefs {
+		if bref.SourceReleaseIdent == bref.TargetReleaseIdent {
+			continue
+		}
+		brefs[i] = bref
+		i++
+	}
+	return brefs[:i]
+}
+
+// deduplicateBrefs deduplicates by the document id (for elasticsearch), which
+// may help filter out some duplicates but not all.
+func deduplicateBrefs(brefs []*BiblioRef) []*BiblioRef {
+	// Sort by match status, exact first, unmatched last. Rank both sides so
+	// that the comparison is a valid "less" function for sort.Slice.
+	rank := func(status string) int {
+		switch status {
+		case StatusExact.Short():
+			return 0
+		case StatusStrong.Short():
+			return 1
+		case StatusWeak.Short():
+			return 3
+		case StatusAmbiguous.Short():
+			return 4
+		case StatusUnmatched.Short():
+			return 5
+		default:
+			return 2
+		}
+	}
+	sort.Slice(brefs, func(i, j int) bool {
+		return rank(brefs[i].MatchStatus) < rank(brefs[j].MatchStatus)
+	})
+ var (
+ seen = set.New()
+ i int
+ )
+ for _, v := range brefs {
+ if seen.Contains(v.Key) {
+ continue
+ }
+ brefs[i] = v
+ i++
+ seen.Add(v.Key)
+ }
+	log.Printf("trimmed brefs from %d to %d", len(brefs), i)
+	brefs = brefs[:i]
+ return brefs
+}
+
+// matchedRefsExtend takes a set of (unique) biblioref docs and emits that set
+// unchanged, plus the raw references (converted to biblioref) that did not
+// result in a match (as determined e.g. by ref key and index). XXX: We may
+// have duplicate refs as well - how to distinguish them?
+func matchedRefsExtend(matched []*BiblioRef, refs []*Ref, stats *statsAugment) []*BiblioRef {
+ seen := set.New() // store "key + index" of matched items
+ for _, m := range matched {
+ s := m.RefKey + fmt.Sprintf("%d", m.RefIndex)
+ seen.Add(s)
+ }
+ for _, r := range refs {
+ s := r.Key + fmt.Sprintf("%d", r.Index)
+ if seen.Contains(s) {
+ stats.skipMatchedRef++
+ log.Printf("skip-matched-ref [%d]: from %d matches; ident=%v, title=%s, key=%v, index=%d",
+ stats.skipMatchedRef, len(matched), r.ReleaseIdent, r.Biblio.Title, r.Key, r.Index)
+ continue
+ }
+ var bref BiblioRef
+ bref.IndexedTs = time.Now().UTC().Format(time.RFC3339)
+ bref.Key = fmt.Sprintf("%s_%d", r.ReleaseIdent, r.Index)
+ bref.RefIndex = r.Index
+ bref.RefKey = r.Key
+ bref.SourceReleaseIdent = r.ReleaseIdent
+ bref.SourceReleaseStage = r.ReleaseStage
+ bref.SourceWorkIdent = r.WorkIdent
+ bref.SourceYear = fmt.Sprintf("%d", r.ReleaseYear)
+ bref.TargetUnstructured = r.Biblio.Unstructured
+ // Reuse fields for debugging, for now.
+ bref.MatchStatus = StatusUnmatched.Short()
+ bref.MatchReason = ReasonUnknown.Short()
+ matched = append(matched, &bref)
+ }
+ return matched
+}
+
+// uniqueMatches takes a list of serialized bref docs and returns a list of
+// deserialized bref docs containing unique matches only (e.g. filtering out
+// duplicate matches coming from both the exact and the fuzzy run). We also
+// include the "skate-bref-id" post-processing here (though there is surely a
+// better place for that).
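+//
+// A generated "_id" has the form "<source release ident>_<ref index>", e.g.
+// "4kg2dejsgzaf3cszs2lt5hz4by_9".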
+func uniqueMatches(docs []string, stats *statsAugment) (result []*BiblioRef, err error) {
+ var brefs []*BiblioRef
+ for _, doc := range docs {
+ var bref BiblioRef
+ if err := json.Unmarshal([]byte(doc), &bref); err != nil {
+ return nil, err
+ }
+		// Add the elasticsearch "_id" and indexed timestamp on the fly, if
+		// not already set.
+ if bref.Key == "" && bref.SourceReleaseIdent != "" {
+ bref.Key = fmt.Sprintf("%s_%d", bref.SourceReleaseIdent, bref.RefIndex)
+ bref.IndexedTs = time.Now().UTC().Format(time.RFC3339)
+ }
+ brefs = append(brefs, &bref)
+ }
+	// Make sure exact matches come first; compare both sides so that this is
+	// a valid "less" function for sort.Slice.
+	sort.Slice(brefs, func(i, j int) bool {
+		return brefs[i].MatchStatus == StatusExact.Short() &&
+			brefs[j].MatchStatus != StatusExact.Short()
+	})
+ seen := set.New()
+ for _, doc := range brefs {
+ h := doc.LinkHash()
+ if seen.Contains(h) {
+ stats.skipDuplicatedBref++
+ log.Printf("skip-dup-bref [%d]: hash=%v source=%v status=%v reason=%v",
+ stats.skipDuplicatedBref, h, doc.SourceReleaseIdent, doc.MatchStatus, doc.MatchReason)
+ continue
+ }
+ seen.Add(h)
+ result = append(result, doc)
+ }
+ return result, nil
+}
+
+type statsAugment struct {
+ skipDuplicatedBref int64
+ skipMatchedRef int64
+ total int64
+}
+
+func (s statsAugment) String() string {
+ return fmt.Sprintf("total=%d, skipMatchedRef=%d, skipDuplicatedBref=%d",
+ s.total, s.skipMatchedRef, s.skipDuplicatedBref)
+}
+
+// CutBatch runs Cut over a list of lines.
+func CutBatch(lines []string, column int) (result []string) {
+ for _, line := range lines {
+ result = append(result, Cut(line, column))
+ }
+ return result
+}
+
+// Cut returns a specific column (1-indexed) from a tab-separated line,
+// returning the empty string if the column is invalid.
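+// For example, Cut("a\tb\tc", 2) returns "b".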
+func Cut(line string, column int) string {
+ return CutSep(line, "\t", column)
+}
+
+// CutSep is like Cut, but allows specifying a separator; column is 1-indexed.
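+// For example, CutSep("a,b,c", ",", 3) returns "c".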
+func CutSep(line, sep string, column int) string {
+	parts := strings.Split(strings.TrimSpace(line), sep)
+	if len(parts) < column {
+		return ""
+	}
+	return parts[column-1]
+}
+
+// FindByPrefix returns the first element of a slice of strings that matches a
+// prefix, or the empty string if there is no match.
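+// For example, FindByPrefix([]string{"ol:123", "ia:abc"}, "ia:") returns "ia:abc".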
+func FindByPrefix(ss []string, prefix string) string {
+ for _, s := range ss {
+ if strings.HasPrefix(s, prefix) {
+ return s
+ }
+ }
+ return ""
+}
+
+// makeKeyFunc creates a function that can be used as keyFunc, selecting a
+// column from fields separated by sep; column is 1-indexed.
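+//
+// For example, makeKeyFunc("\t", 1) yields a function that maps "K\t{...}" to
+// ("K", nil) and returns an error if the selected column is empty or missing.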
+func makeKeyFunc(sep string, column int) func(string) (string, error) {
+ return func(s string) (string, error) {
+ if k := CutSep(s, sep, column); k != "" {
+ return k, nil
+ }
+ return "", fmt.Errorf("cannot get key from column %d in line (len=%d): %s", column, len(s), s)
+ }
+}
+
+func parseRelease(s string) (r *Release, err error) {
+ err = json.Unmarshal([]byte(s), &r)
+ return
+}
+
+func parseRef(s string) (r *Ref, err error) {
+ err = json.Unmarshal([]byte(s), &r)
+ return
+}
+
+func parseWiki(s string) (r *MinimalCitations, err error) {
+ err = json.Unmarshal([]byte(s), &r)
+ return
+}
+
+func parseBiblioref(s string) (r *BiblioRef, err error) {
+ err = json.Unmarshal([]byte(s), &r)
+ return
+}