// This file contains various "reducers", e.g. working on two data streams and // applying a function on groups of documents with a shared key. // // Note: This is a bit repetitive, but not want to introduce any other // abstraction for now. Since most of the logic is in the grouper functions, we // could make them top level and then assemble the zipkey runner on the fly. // // The most confusing aspect currently is the variety of schemas hidden within // the readers (and string groups): release, ref, ref-as-release, open library, // wikipedia, ... package skate import ( "fmt" "io" "log" "sort" "strings" "git.archive.org/martin/cgraph/skate/set" "git.archive.org/martin/cgraph/skate/zipkey" json "github.com/segmentio/encoding/json" ) // groupLogf logs a message alongsize a serialized group for debugging. func groupLogf(g *zipkey.Group, s string, vs ...interface{}) { log.Printf(s, vs...) b, _ := json.MarshalIndent(g, "", " ") log.Println(string(b)) } // ZippyExact takes a release and refs reader (key, doc) and assigns a fixed // match result. func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error { var ( enc = json.NewEncoder(w) keyer = makeKeyFunc("\t", 1) i = 0 grouper = func(g *zipkey.Group) error { i++ if i%10000 == 0 { log.Printf("processed %v groups", i) } var ( target *Release ref *Ref err error ) if len(g.G0) == 0 || len(g.G1) == 0 { return nil } if target, err = stringToRelease(Cut(g.G0[0], 2)); err != nil { groupLogf(g, "[skip] failed to parse release: %v", err) return nil } for _, line := range g.G1 { if ref, err = stringToRef(Cut(line, 2)); err != nil { groupLogf(g, "[skip] failed to parse ref: %v", err) continue } var bref BiblioRef bref.SourceReleaseIdent = ref.ReleaseIdent bref.SourceWorkIdent = ref.WorkIdent bref.SourceReleaseStage = ref.ReleaseStage bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear) bref.RefIndex = ref.Index + 1 // we want 1-index (also helps with omitempty) bref.RefKey = ref.Key bref.TargetReleaseIdent = target.Ident bref.TargetWorkIdent = target.WorkID bref.MatchProvenance = ref.RefSource bref.MatchStatus = matchResult.Status.Short() bref.MatchReason = matchResult.Reason.Short() if err := enc.Encode(bref); err != nil { return err } } return nil } ) zipper := zipkey.New(releases, refs, keyer, grouper) return zipper.Run() } // ZippyExactReleases takes two release readers (key, doc) and assigns a fixed // match result. func ZippyExactReleases(olReader, reReader io.Reader, matchResult MatchResult, w io.Writer) error { var ( enc = json.NewEncoder(w) keyer = makeKeyFunc("\t", 1) i = 0 grouper = func(g *zipkey.Group) error { i++ if i%10000 == 0 { log.Printf("processed %v groups", i) } var ( target, re *Release err error ) if len(g.G0) == 0 || len(g.G1) == 0 { return nil } if target, err = stringToRelease(Cut(g.G0[0], 2)); err != nil { groupLogf(g, "[skip] failed to parse release: %v", err) return nil } for _, line := range g.G1 { if re, err = stringToRelease(Cut(line, 2)); err != nil { groupLogf(g, "[skip] failed to parse release: %v", err) continue } if target.WorkID == "" { continue } var bref BiblioRef bref.SourceReleaseIdent = re.Ident bref.SourceWorkIdent = re.WorkID bref.SourceReleaseStage = re.ReleaseStage bref.SourceYear = fmt.Sprintf("%d", re.ReleaseYear()) bref.RefIndex = re.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty) bref.RefKey = re.Extra.Skate.Ref.Key bref.TargetOpenLibraryWork = target.WorkID bref.MatchProvenance = re.Extra.Skate.Ref.Source bref.MatchStatus = matchResult.Status.Short() bref.MatchReason = matchResult.Reason.Short() if err := enc.Encode(bref); err != nil { return err } } return nil } ) zipper := zipkey.New(olReader, reReader, keyer, grouper) return zipper.Run() } // ZippyExactWiki takes a release and wiki reader (key, doc) and assigns a // fixed match result. func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error { var ( enc = json.NewEncoder(w) keyer = makeKeyFunc("\t", 1) grouper = func(g *zipkey.Group) error { var ( target *Release wiki *MinimalCitations err error ) if len(g.G0) == 0 || len(g.G1) == 0 { return nil } if target, err = stringToRelease(Cut(g.G0[0], 2)); err != nil { return err } for _, line := range g.G1 { if wiki, err = stringToWiki(Cut(line, 2)); err != nil { return err } var bref BiblioRef bref.Key = fmt.Sprintf("%s_%s", slugifyString(wiki.PageTitle), target.Ident) // XXX: what should we use? bref.SourceWikipediaArticle = wiki.PageTitle bref.TargetReleaseIdent = target.Ident bref.TargetWorkIdent = target.WorkID bref.MatchProvenance = "wikipedia" bref.MatchStatus = mr.Status.Short() bref.MatchReason = mr.Reason.Short() if err := enc.Encode(bref); err != nil { return err } } return nil } ) zipper := zipkey.New(releases, wiki, keyer, grouper) return zipper.Run() } // ZippyVerifyRefs takes a release and refs reader (key, doc), run fuzzy // verification and will emit a biblioref document, if exact or strong match. func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error { var ( enc = json.NewEncoder(w) keyer = makeKeyFunc("\t", 1) grouper = func(g *zipkey.Group) error { var ( re, pivot *Release err error ) if len(g.G0) == 0 || len(g.G1) == 0 { return nil } if pivot, err = stringToRelease(Cut(g.G0[0], 2)); err != nil { return err } for _, line := range g.G1 { if re, err = stringToRelease(Cut(line, 2)); err != nil { return err } result := Verify(pivot, re) switch result.Status { case StatusExact, StatusStrong: if result.Reason == ReasonDOI { continue } br := generateBiblioRef(re, pivot, result, "fuzzy") if err := enc.Encode(br); err != nil { return err } default: // XXX: We want to add unmatched pieces as well; here? We // probably want to do a single final pass to complete the // dataset. } } return nil } ) zipper := zipkey.New(releases, refs, keyer, grouper) return zipper.Run() } // ZippyVerifyRefsOpenLibraryTable takes OL editions (as release) and refs (as // release) and emits a match table for manual inspection. func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error { var ( keyer = makeKeyFunc("\t", 1) grouper = func(g *zipkey.Group) error { var ( re, pivot *Release err error ) if len(g.G0) == 0 || len(g.G1) == 0 { return nil } // We take a single edition from OL. if pivot, err = stringToRelease(Cut(g.G0[0], 2)); err != nil { return err } for _, line := range g.G1 { if re, err = stringToRelease(Cut(line, 2)); err != nil { return err } // The refs have a container name, but not a title, but here we // compare against titles from open library. re.Title = re.ContainerName result := Verify(pivot, re) fmt.Printf("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", result.Status.Short(), result.Reason.Short(), pivot.Extra.OpenLibrary.WorkID, FindByPrefix(pivot.Extra.OpenLibrary.SourceRecords, "ia:"), re.Ident, CutSep(g.G0[0], "\t", 1), pivot.Title, re.Title) } return nil } ) zipper := zipkey.New(olr, refs, keyer, grouper) return zipper.Run() } // ZippyVerifyRefsOpenLibrary takes OL editions (as release) and refs (as // release) and emits a match table for manual inspection. func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error { var ( enc = json.NewEncoder(w) keyer = makeKeyFunc("\t", 1) grouper = func(g *zipkey.Group) error { var ( ref, pivot *Release // ref (reference), pivot (open library) err error ) if len(g.G0) == 0 || len(g.G1) == 0 { return nil } // We take a single edition from OL. if pivot, err = stringToRelease(Cut(g.G0[0], 2)); err != nil { return err } for _, line := range g.G1 { if ref, err = stringToRelease(Cut(line, 2)); err != nil { return err } // The refs have a container name, but not a title, but here we // compare against titles from open library. ref.Title = ref.ContainerName result := Verify(pivot, ref) switch result.Status { case StatusExact, StatusStrong: var bref BiblioRef bref.SourceReleaseIdent = ref.Ident bref.SourceWorkIdent = ref.WorkID bref.SourceReleaseStage = ref.ReleaseStage bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear()) bref.RefIndex = ref.Extra.Skate.Ref.Index + 1 // we want 1-index (also helps with omitempty) bref.RefKey = ref.Extra.Skate.Ref.Key bref.TargetOpenLibraryWork = pivot.WorkID bref.MatchProvenance = ref.Extra.Skate.Ref.Source bref.MatchStatus = result.Status.Short() bref.MatchReason = result.Reason.Short() if err := enc.Encode(bref); err != nil { return err } default: } } return nil } ) zipper := zipkey.New(olr, refs, keyer, grouper) return zipper.Run() } // ZippyBrefAugment takes all matched elements from bref and adds docs from raw // refs, which have not been matched. It also gets rid of duplicate matches. // // We can identify, which docs have been matched by checking the ref key and index. func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { var ( enc = json.NewEncoder(w) keyer = makeKeyFunc("\t", 1) grouper = func(g *zipkey.Group) error { // g.G0 contains a matched docs for a given work id, g.G1 all raw // refs, with the same work id. // First, iterate over all matches and sort out duplicates, e.g. // docs that have the same source and target id. matched, err := uniqueMatches(g.G0) if err != nil { return err } var refs = make([]*Ref, len(g.G1)) for i := 0; i < len(refs); i++ { var ref Ref if err := json.Unmarshal([]byte(g.G1[i]), &ref); err != nil { return err } refs[i] = &ref } matchedRefsExtend(matched, refs) for _, bref := range matched { if err := enc.Encode(bref); err != nil { return err } } // We want to find all items in g.G1, which are not in unique. This // is a set like operation, but we want a custom comparator. return nil } ) zipper := zipkey.New(bref, raw, keyer, grouper) return zipper.Run() } // matchedRefsExtend takes a set of (unique) biblioref docs and will emit that // set of biblioref docs (unchanged) plus raw references as biblioref, which // did not result in a match (determined by ref key and index). func matchedRefsExtend(matched []*BiblioRef, refs []*Ref) { s := set.New() // store key + index of matched items for _, m := range matched { s.Add(m.Key + fmt.Sprintf("%d", m.RefIndex)) } for _, r := range refs { if s.Contains(r.Key + fmt.Sprintf("%d", r.Index)) { continue } var bref BiblioRef bref.Key = fmt.Sprintf("%s_%d", r.ReleaseIdent, r.Index) bref.RefIndex = r.Index bref.RefKey = r.Key bref.SourceReleaseIdent = r.ReleaseIdent bref.SourceReleaseStage = r.ReleaseStage bref.SourceWorkIdent = r.WorkIdent bref.SourceYear = fmt.Sprintf("%d", r.ReleaseYear) bref.TargetUnstructured = r.Biblio.Unstructured // Reuse fields for debugging, for now. bref.MatchStatus = StatusUnmatched.Short() bref.MatchReason = ReasonUnknown.Short() matched = append(matched, &bref) } return } // uniqueMatches takes a list of bref docs (unserialized) and will return a // list of serialized bref docs, containing unique matches. func uniqueMatches(docs []string) (result []*BiblioRef, err error) { var ( brefs []*BiblioRef bref BiblioRef ) for _, doc := range docs { if err := json.Unmarshal([]byte(doc), &bref); err != nil { return nil, err } brefs = append(brefs, &bref) } // Make sure we exact matches come first. sort.Slice(brefs, func(i, j int) bool { return brefs[i].MatchStatus != StatusExact.Short() }) // We consider a match unique, if source and target match. hash := func(bref *BiblioRef) string { return bref.SourceReleaseIdent + bref.TargetReleaseIdent } seen := set.New() for _, doc := range brefs { v := hash(doc) if seen.Contains(v) { continue } seen.Add(v) result = append(result, doc) } return result, nil } // Cut returns a specific column (1-indexed, like CutSep) from a tabular // file, returns empty string if column is invalid. func Cut(line string, column int) string { return CutSep(line, "\t", column) } // CutSep allows to specify a separator. func CutSep(line, sep string, column int) string { parts := strings.Split(strings.TrimSpace(line), sep) if len(parts) < column { return "" } else { return parts[column-1] } } // FindByPrefix return the first element for a slice of strings, which matches a prefix. func FindByPrefix(ss []string, prefix string) string { for _, s := range ss { if strings.HasPrefix(s, prefix) { return s } } return "" } // makeKeyFunc creates a function that can be used as keyFunc, selecting a // column from fields separated by sep; column is 1-indexed. func makeKeyFunc(sep string, column int) func(string) (string, error) { return func(s string) (string, error) { if k := CutSep(s, sep, column); k == "" { return k, fmt.Errorf("cannot get key from column %d in line (len=%d): %s", column, len(s), s) } else { return k, nil } } } func stringToRelease(s string) (r *Release, err error) { err = json.Unmarshal([]byte(s), &r) return } func stringToRef(s string) (r *Ref, err error) { err = json.Unmarshal([]byte(s), &r) return } func stringToWiki(s string) (r *MinimalCitations, err error) { err = json.Unmarshal([]byte(s), &r) return } func stringToBiblioref(s string) (r *BiblioRef, err error) { err = json.Unmarshal([]byte(s), &r) return }