// This file contains various "reducers", which e.g. merge data from two
// streams and apply a function on groups of documents with a shared key.
//
// Note: This is a bit repetitive, but we do not want to introduce any other
// abstraction for now. Since most of the logic is in the "grouper" functions,
// we could make them top level values and then assemble the zipkey runner on
// the fly.
//
// The most confusing aspect currently is the variety of schemas hidden within
// the readers (and string groups): release, ref, biblioref, csl,
// ref-as-release, open library, wikipedia, ...
//
// We sometimes call the biblioref schema just bref, for short.
//
// TODO:
// * [ ] pass release stage through all match types
// * [ ] switch to faster logging, e.g. zerolog, https://github.com/rs/zerolog#benchmarks
// * [x] batch, parallelize
// * [ ] unify flags to "-a", "-b"
//
// A couple more ideas for improvement:
//
// * each reducer could be its own type instead of a function, in order to
//   allow for customizations and options, e.g.
//
//     type ReduceExact struct {
//         ReleasesReader io.Reader
//         RefsReader     io.Reader
//         W              io.Writer
//         MatchResult    MatchResult
//     }
//
//     func (r *ReduceExact) Run() error { ... }
package skate

import (
	"crypto/sha1"
	"encoding/base32"
	"fmt"
	"io"
	"log"
	"sort"
	"strings"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/segmentio/encoding/json"
	"gitlab.com/internetarchive/refcat/skate/set"
	"gitlab.com/internetarchive/refcat/skate/xio"
	"gitlab.com/internetarchive/refcat/skate/zipkey"
)

var (
	// T is a mockable clock, here on package level (TODO: more encapsulation).
	T clock.Clock = clock.New()

	// A few sane dates to accept.
	minDate = 1500
	maxDate = time.Now().AddDate(5, 0, 0).Year()
)

// groupLogf logs a message alongside a serialized group for debugging.
func groupLogf(g *zipkey.Group, s string, vs ...interface{}) {
	log.Printf(s, vs...)
	b, _ := json.MarshalIndent(g, "", " ")
	log.Println(string(b))
}

// ZippyExact takes a release and refs reader (key, doc) and assigns a fixed
// match result, e.g. for doi matches.
func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error {
	var (
		ts      = T.Now().UTC().Format(time.RFC3339)
		enc     = json.NewEncoder(xio.NewSyncWriter(w))
		keyer   = makeKeyFunc("\t", 1)
		grouper = func(g *zipkey.Group) error {
			var (
				target *Release
				ref    *Ref
				bref   BiblioRef
				err    error
			)
			if len(g.G0) == 0 || len(g.G1) == 0 {
				return nil
			}
			if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
				groupLogf(g, "[skip] failed to parse release: %v", err)
				return nil
			}
			for _, line := range g.G1 {
				if ref, err = parseRef(Cut(line, 2)); err != nil {
					log.Printf("[skip] failed to parse ref from line: '%s'", line)
					continue
				}
				bref.Reset()
				bref.Key = fmt.Sprintf("%s_%d", ref.ReleaseIdent, ref.Index)
				bref.IndexedTs = ts
				bref.SourceReleaseIdent = ref.ReleaseIdent
				bref.SourceWorkIdent = ref.WorkIdent
				bref.SourceReleaseStage = ref.ReleaseStage
				bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear)
				bref.RefIndex = ref.Index
				bref.RefKey = ref.Key
				bref.TargetReleaseIdent = target.Ident
				bref.TargetWorkIdent = target.WorkID
				bref.MatchProvenance = ref.RefSource
				bref.MatchStatus = matchResult.Status.Short()
				bref.MatchReason = matchResult.Reason.Short()
				if err := enc.Encode(bref); err != nil {
					return err
				}
			}
			return nil
		}
		batcher = zipkey.NewBatcher(grouper)
	)
	defer batcher.Close()
	zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc)
	return zipper.Run()
}
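// A minimal input sketch for the exact reducers: both readers carry
// tab-separated (key, doc) lines, sorted by key. The JSON field names below
// are illustrative only, not the exact Release/Ref tags, and this assumes
// MatchResult can be constructed from its Status and Reason fields:
//
//	releases := strings.NewReader("10.123/abc\t{\"ident\": \"r1\"}\n")
//	refs := strings.NewReader("10.123/abc\t{\"release_ident\": \"r2\"}\n")
//	mr := MatchResult{Status: StatusExact, Reason: ReasonDOI}
//	err := ZippyExact(releases, refs, mr, os.Stdout)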
// ZippyExactReleases takes two release readers (key, doc) and assigns a fixed
// match result, e.g. used with release entities converted from open library
// snapshots.
func ZippyExactReleases(olr, releases io.Reader, matchResult MatchResult, w io.Writer) error {
	var (
		enc     = json.NewEncoder(xio.NewSyncWriter(w))
		keyer   = makeKeyFunc("\t", 1)
		grouper = func(g *zipkey.Group) error {
			var (
				target, re *Release
				err        error
			)
			if len(g.G0) == 0 || len(g.G1) == 0 {
				return nil
			}
			if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
				groupLogf(g, "[skip] failed to parse release: %v", err)
				return nil
			}
			for _, line := range g.G1 {
				if re, err = parseRelease(Cut(line, 2)); err != nil {
					groupLogf(g, "[skip] failed to parse release: %v", err)
					continue
				}
				if target.WorkID == "" {
					continue
				}
				var bref BiblioRef
				bref.SourceReleaseIdent = re.Ident
				bref.SourceWorkIdent = re.WorkID
				bref.SourceReleaseStage = re.ReleaseStage
				bref.SourceYear = fmt.Sprintf("%d", re.ReleaseYear())
				bref.RefIndex = re.Extra.Skate.Ref.Index
				bref.RefKey = re.Extra.Skate.Ref.Key
				bref.TargetOpenLibraryWork = target.WorkID
				bref.MatchProvenance = re.Extra.Skate.Ref.Source
				bref.MatchStatus = matchResult.Status.Short()
				bref.MatchReason = matchResult.Reason.Short()
				if err := enc.Encode(bref); err != nil {
					return err
				}
			}
			return nil
		}
		batcher = zipkey.NewBatcher(grouper)
	)
	defer batcher.Close()
	zipper := zipkey.New(olr, releases, keyer, batcher.GroupFunc)
	return zipper.Run()
}

// ZippyExactWiki takes a release and wiki reader (key, doc) and assigns a
// fixed match result.
func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error {
	var (
		enc = json.NewEncoder(xio.NewSyncWriter(w))
		// TODO: Use slug version of title. Also consider a generic schema
		// (e.g. one that would look similar for OL, WB, WP, ...).
		b32enc  = base32.StdEncoding.WithPadding(base32.NoPadding)
		keyer   = makeKeyFunc("\t", 1)
		grouper = func(g *zipkey.Group) error {
			var (
				target                 *Release
				wiki                   *MinimalCitations
				key, lang, encodedPage string
				err                    error
			)
			if len(g.G0) == 0 || len(g.G1) == 0 {
				return nil
			}
			if target, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
				return err
			}
			// Sort out a few duplicates, e.g.
			// lfqxs3tv_obj3cjr5wrhjffnmgze5jn7a4a,
			// z2kc233qnfxwszbaojswgzlqorxxe_f7mn45dvyvespbv2pxgyt674k4, ...
			seen := set.New()
			for _, line := range g.G1 {
				if wiki, err = parseWiki(Cut(line, 3)); err != nil {
					return err
				}
				var bref BiblioRef
				// We use lowercase base32 w/o padding of the original
				// PageTitle as a component of the id. XXX: ok for now?
				if wiki.Language == "" {
					lang = "en" // XXX: We currently only use the "en" subset.
				} else {
					lang = wiki.Language
				}
				encodedPage = strings.ToLower(b32enc.EncodeToString([]byte(lang + ":" + wiki.PageTitle)))
				key = fmt.Sprintf("wikipedia_%s_%s", encodedPage, target.Ident)
				if seen.Contains(key) {
					continue
				}
				seen.Add(key)
				bref.Key = key
				bref.SourceWikipediaArticle = fmt.Sprintf("%s:%s", lang, wiki.PageTitle)
				bref.TargetReleaseIdent = target.Ident
				bref.TargetWorkIdent = target.WorkID
				bref.MatchProvenance = "wikipedia"
				bref.MatchStatus = mr.Status.Short()
				bref.MatchReason = mr.Reason.Short()
				if err := enc.Encode(bref); err != nil {
					return err
				}
			}
			return nil
		}
		batcher = zipkey.NewBatcher(grouper)
	)
	defer batcher.Close()
	zipper := zipkey.New(releases, wiki, keyer, batcher.GroupFunc)
	return zipper.Run()
}
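// Key construction sketch for ZippyExactWiki: an article "Example" in the
// "en" subset citing release "abc123" (values made up) yields a stable,
// URL-safe document id:
//
//	b32 := base32.StdEncoding.WithPadding(base32.NoPadding)
//	page := strings.ToLower(b32.EncodeToString([]byte("en:Example")))
//	key := fmt.Sprintf("wikipedia_%s_%s", page, "abc123") // "wikipedia_<b32>_abc123"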
// ZippyVerifyRefs takes a release and refs (as release) reader (key, doc),
// runs fuzzy verification and emits a biblioref document for each exact or
// strong match.
func ZippyVerifyRefs(releases, refs io.Reader, w io.Writer) error {
	var (
		enc     = json.NewEncoder(xio.NewSyncWriter(w))
		keyer   = makeKeyFunc("\t", 1)
		grouper = func(g *zipkey.Group) error {
			var (
				re, pivot *Release
				err       error
			)
			if len(g.G0) == 0 || len(g.G1) == 0 {
				return nil
			}
			if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
				return err
			}
			for _, line := range g.G1 {
				if re, err = parseRelease(Cut(line, 2)); err != nil {
					return err
				}
				result := Verify(pivot, re)
				switch result.Status {
				case StatusExact, StatusStrong:
					if result.Reason == ReasonDOI {
						continue
					}
					// XXX: what should be the provenance?
					br := generateBiblioRef(re, pivot, result, "fuzzy")
					if err := enc.Encode(br); err != nil {
						return err
					}
				default:
				}
			}
			return nil
		}
		batcher = zipkey.NewBatcher(grouper)
	)
	defer batcher.Close()
	zipper := zipkey.New(releases, refs, keyer, batcher.GroupFunc)
	return zipper.Run()
}

// ZippyVerifyRefsOpenLibraryTable takes OL editions (as release) and refs (as
// release) and emits a match table for manual inspection. This is mainly for
// debugging.
func ZippyVerifyRefsOpenLibraryTable(olr, refs io.Reader, w io.Writer) error {
	var (
		keyer   = makeKeyFunc("\t", 1)
		grouper = func(g *zipkey.Group) error {
			var (
				re, pivot *Release
				err       error
			)
			if len(g.G0) == 0 || len(g.G1) == 0 {
				return nil
			}
			// We take a single edition from OL.
			if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
				return err
			}
			for _, line := range g.G1 {
				if re, err = parseRelease(Cut(line, 2)); err != nil {
					return err
				}
				// The refs have a container name, but no title; here we
				// compare against titles from open library.
				re.Title = re.ContainerName
				result := Verify(pivot, re)
				fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
					result.Status.Short(),
					result.Reason.Short(),
					pivot.Extra.OpenLibrary.WorkID,
					FindByPrefix(pivot.Extra.OpenLibrary.SourceRecords, "ia:"),
					re.Ident,
					CutSep(g.G0[0], "\t", 1),
					pivot.Title,
					re.Title)
			}
			return nil
		}
	)
	zipper := zipkey.New(olr, refs, keyer, grouper)
	return zipper.Run()
}
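// Verification sketch: Verify compares a pivot release against a candidate
// and returns a MatchResult; the fuzzy reducers in this file only keep the
// strongest outcomes:
//
//	result := Verify(pivot, re)
//	switch result.Status {
//	case StatusExact, StatusStrong:
//		// emit a biblioref document
//	default:
//		// drop weak or ambiguous candidates
//	}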
// ZippyVerifyRefsOpenLibrary takes OL editions (as release) and refs (as
// release) and writes biblioref.
func ZippyVerifyRefsOpenLibrary(olr, refs io.Reader, w io.Writer) error {
	var (
		enc     = json.NewEncoder(xio.NewSyncWriter(w))
		keyer   = makeKeyFunc("\t", 1)
		grouper = func(g *zipkey.Group) error {
			// TODO: For openlibrary and wayback matches, pass through either
			// the unstructured ref string or CSL JSON.
			var (
				ref, pivot *Release // ref (reference), pivot (open library)
				err        error
			)
			if len(g.G0) == 0 || len(g.G1) == 0 {
				return nil
			}
			// We take a single edition from OL.
			if pivot, err = parseRelease(Cut(g.G0[0], 2)); err != nil {
				return err
			}
			for _, line := range g.G1 {
				if ref, err = parseRelease(Cut(line, 2)); err != nil {
					return err
				}
				// The refs have a container name, but no title; here we
				// compare against titles from open library.
				ref.Title = ref.ContainerName
				result := Verify(pivot, ref)
				switch result.Status {
				case StatusExact, StatusStrong:
					openLibraryWorkID := cleanOpenLibraryIdentifier(pivot.WorkID)
					if openLibraryWorkID == "" {
						continue
					}
					var bref BiblioRef
					bref.SourceReleaseIdent = ref.Ident
					bref.SourceWorkIdent = ref.WorkID
					bref.SourceReleaseStage = ref.ReleaseStage
					bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear())
					bref.RefIndex = ref.Extra.Skate.Ref.Index
					bref.RefKey = ref.Extra.Skate.Ref.Key
					bref.TargetOpenLibraryWork = openLibraryWorkID
					bref.MatchProvenance = ref.Extra.Skate.Ref.Source
					bref.MatchStatus = result.Status.Short()
					bref.MatchReason = result.Reason.Short()
					bref.TargetUnstructured = ReleaseToUnstructured(pivot)
					if err := enc.Encode(bref); err != nil {
						return err
					}
				default:
				}
			}
			return nil
		}
		batcher = zipkey.NewBatcher(grouper)
	)
	defer batcher.Close()
	zipper := zipkey.New(olr, refs, keyer, batcher.GroupFunc)
	return zipper.Run()
}

// ZippyWayback takes a (url, ref) reader and a (url, cdx) reader and writes a
// bref document for each match.
func ZippyWayback(refs, cdx io.Reader, w io.Writer) error {
	var (
		ts      = T.Now().UTC().Format(time.RFC3339)
		enc     = json.NewEncoder(xio.NewSyncWriter(w))
		keyer   = makeKeyFunc("\t", 1)
		grouper = func(g *zipkey.Group) error {
			var (
				ref *Ref
				cdx *cdxSummary
				err error
				h   = sha1.New()
			)
			if len(g.G0) == 0 || len(g.G1) == 0 {
				return nil
			}
			// We take a single item from refs.
			if ref, err = parseRef(Cut(g.G0[0], 2)); err != nil {
				return err
			}
			if cdx, err = parseCdxSummary(Cut(g.G1[0], 2)); err != nil {
				return err
			}
			var bref BiblioRef
			// TODO: this is a temporary way to generate an id.
			_, _ = h.Write([]byte(cdx.Line))
			hashedURL := fmt.Sprintf("%x", h.Sum(nil))
			bref.Key = fmt.Sprintf("web_%s_%s", ref.ReleaseIdent, hashedURL)
			bref.IndexedTs = ts
			bref.SourceReleaseIdent = ref.ReleaseIdent
			bref.SourceWorkIdent = ref.WorkIdent
			bref.SourceReleaseStage = ref.ReleaseStage
			// TODO: track down the zero-year docs.
			if ref.ReleaseYear > 1800 {
				bref.SourceYear = fmt.Sprintf("%d", ref.ReleaseYear)
			}
			bref.RefIndex = ref.Index
			bref.RefKey = ref.Key
			if cdx.NumRows == 0 || cdx.Summary.Ok == "" {
				bref.TargetURL = cdx.Line
			} else {
				// TODO: It would be better to only add a wayback link if the
				// live web fails; that would require a full check of the
				// URLs on the live web.
				bref.TargetURL = fmt.Sprintf("https://web.archive.org/web/%s/%s", cdx.Summary.Ok, cdx.Line)
			}
			bref.MatchProvenance = ref.RefSource
			bref.MatchStatus = StatusExact.Short()
			bref.MatchReason = ReasonURLMatch.Short()
			if err := enc.Encode(bref); err != nil {
				return err
			}
			return nil
		}
		batcher = zipkey.NewBatcher(grouper)
	)
	defer batcher.Close()
	zipper := zipkey.New(refs, cdx, keyer, batcher.GroupFunc)
	return zipper.Run()
}
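// Target URL sketch for ZippyWayback: with an "ok" snapshot timestamp in the
// cdx summary, the target becomes a wayback link; otherwise the raw URL is
// kept as-is (timestamp and URL below are made up):
//
//	fmt.Sprintf("https://web.archive.org/web/%s/%s",
//		"20210712000000", "https://example.com/a.pdf")
//	// => https://web.archive.org/web/20210712000000/https://example.com/a.pdf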
// ZippyBrefAugment takes all matched docs from bref and adds docs from raw
// refs which have not been matched. It also gets rid of duplicate matches.
// Note: This operates on two streams: raw refs with about 2.5B records
// (07/2021) and matches, which will be about 1B; in essence we have to
// iterate through about 3.5B records, so small tweaks here may be worthwhile.
//
// We can identify which docs have been matched by checking the source ident,
// ref index and key.
//
// TODO: This needs to be completed and made fast.
func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error {
	var (
		stats   = statsAugment{}
		enc     = json.NewEncoder(xio.NewSyncWriter(w))
		keyer   = makeKeyFunc("\t", 1)
		grouper = func(g *zipkey.Group) error {
			// g.G0 contains the matched docs for a given work id, g.G1 all
			// raw refs with the same work id.
			//
			// First, iterate over all matches and sort out duplicates, e.g.
			// docs that have the same source and target id.
			log.Printf("group K=%s, G0=%d (bref), G1=%d (ref)", g.Key, len(g.G0), len(g.G1))
			matched, err := uniqueMatches(CutBatch(g.G0, 2), &stats)
			if err != nil {
				return err
			}
			var refs = make([]*Ref, len(g.G1))
			for i := 0; i < len(refs); i++ {
				var (
					data []byte = []byte(Cut(g.G1[i], 2))
					ref  Ref
				)
				if err := json.Unmarshal(data, &ref); err != nil {
					return err
				}
				refs[i] = &ref
			}
			// TODO: this slows down the process; be a bit smarter about slices.
			matched = matchedRefsExtend(matched, refs, &stats)
			// At this point, we may have duplicates by "_id", e.g. source
			// release ident and ref index (example:
			// 4kg2dejsgzaf3cszs2lt5hz4by_9, which appears three times: once
			// as an exact match, and twice unmatched).
			matched = deduplicateBrefs(matched)
			matched = removeSelfLinks(matched)
			for _, bref := range matched {
				stats.total++
				if err := enc.Encode(bref); err != nil {
					return err
				}
			}
			return nil
		}
		batcher = zipkey.NewBatcher(grouper)
	)
	defer batcher.Close()
	defer func() {
		log.Println(stats)
	}()
	zipper := zipkey.New(bref, raw, keyer, batcher.GroupFunc)
	return zipper.Run()
}

// removeSelfLinks removes self-referential links. TODO: Those should be
// caught at the root cause.
func removeSelfLinks(brefs []*BiblioRef) []*BiblioRef {
	var i int
	for _, bref := range brefs {
		if bref.SourceReleaseIdent == bref.TargetReleaseIdent {
			continue
		}
		brefs[i] = bref
		i++
	}
	return brefs[:i]
}

// deduplicateBrefs deduplicates by the document id (for elasticsearch), which
// may help filter out some duplicates, but not all.
func deduplicateBrefs(brefs []*BiblioRef) []*BiblioRef {
	// Sort by match status: exact first, unmatched last; statuses are ranked
	// explicitly, so the comparator is a valid strict weak ordering.
	rank := func(b *BiblioRef) int {
		switch b.MatchStatus {
		case StatusExact.Short():
			return 0
		case StatusStrong.Short():
			return 1
		case StatusWeak.Short():
			return 4
		case StatusAmbiguous.Short():
			return 5
		case StatusUnmatched.Short():
			return 6
		default:
			return 2 // any other matched status sorts before weak matches
		}
	}
	sort.Slice(brefs, func(i, j int) bool {
		return rank(brefs[i]) < rank(brefs[j])
	})
	var (
		seen = set.New()
		i    int
	)
	for _, v := range brefs {
		// XXX: Is this enough, or do we get rid of too many things here?
		if seen.Contains(v.Key) {
			log.Printf("key: %s", v.Key)
			continue
		}
		brefs[i] = v
		i++
		seen.Add(v.Key)
	}
	return brefs[:i]
}
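// Both removeSelfLinks and deduplicateBrefs above use the standard in-place
// filter idiom, keeping survivors without allocating a new slice (keep is a
// hypothetical predicate):
//
//	i := 0
//	for _, v := range xs {
//		if keep(v) {
//			xs[i] = v
//			i++
//		}
//	}
//	xs = xs[:i]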
// matchedRefsExtend takes a set of (unique) biblioref docs and emits that set
// of biblioref docs (unchanged), plus raw references as biblioref, which did
// not result in a match (determined by e.g. ref key and index). XXX: We may
// have duplicate refs as well - how to distinguish them?
func matchedRefsExtend(matched []*BiblioRef, refs []*Ref, stats *statsAugment) []*BiblioRef {
	var (
		seen = set.New() // store "key + index" of matched items
		ts   = T.Now().UTC().Format(time.RFC3339)
	)
	for _, m := range matched {
		s := m.RefKey + fmt.Sprintf("%d", m.RefIndex)
		seen.Add(s)
	}
	for _, r := range refs {
		s := r.Key + fmt.Sprintf("%d", r.Index)
		if seen.Contains(s) {
			stats.skipMatchedRef++
			log.Printf("skip-matched-ref [%d]: from %d matches; ident=%v, title=%s, key=%v, index=%d",
				stats.skipMatchedRef, len(matched), r.ReleaseIdent, r.Biblio.Title, r.Key, r.Index)
			continue
		}
		// Assemble bref from unmatched ref.
		var bref BiblioRef
		bref.IndexedTs = ts
		bref.Key = fmt.Sprintf("%s_%d", r.ReleaseIdent, r.Index)
		bref.RefIndex = r.Index
		bref.RefKey = r.Key
		bref.SourceReleaseIdent = r.ReleaseIdent
		bref.SourceReleaseStage = r.ReleaseStage
		bref.SourceWorkIdent = r.WorkIdent
		bref.SourceYear = fmt.Sprintf("%d", r.ReleaseYear)
		if r.Biblio.Unstructured != "" {
			bref.TargetUnstructured = r.Biblio.Unstructured
		} else {
			// Unmatched and no unstructured field: generate CSL.
			var (
				authors []CSLAuthor
				isbn    string
				issued  *CSLDate
			)
			for _, rawName := range r.Biblio.ContribRawNames {
				authors = append(authors, CSLAuthor{RawName: rawName})
			}
			if len(r.Biblio.Extra.ISBN) > 0 {
				isbn = r.Biblio.Extra.ISBN[0]
			}
			// TODO: do we need to update this "max year" number frequently?
			if r.Biblio.Year > minDate && r.Biblio.Year <= maxDate {
				issued = &CSLDate{Parts: [][]int{{int(r.Biblio.Year)}}}
			} else {
				issued = &CSLDate{}
			}
			bref.TargetCSL = &CSL{
				Author:         authors,
				ContainerTitle: r.Biblio.ContainerName,
				DOI:            r.Biblio.DOI,
				ISBN:           isbn,
				Issue:          r.Biblio.Issue,
				PMCID:          r.Biblio.PMCID,
				PMID:           r.Biblio.PMID,
				Page:           r.Biblio.Pages,
				Publisher:      r.Biblio.Publisher,
				Title:          r.Biblio.Title,
				URL:            r.Biblio.Url,
				Volume:         r.Biblio.Volume,
				Issued:         issued,
			}
		}
		// Reuse fields for debugging, for now.
		bref.MatchProvenance = r.RefSource
		bref.MatchStatus = StatusUnmatched.Short()
		bref.MatchReason = ReasonUnknown.Short()
		matched = append(matched, &bref)
	}
	return matched
}

// uniqueMatches takes a list of bref docs (as strings) and returns a list of
// deserialized bref docs containing unique matches only (e.g. filtering out
// duplicate matches from exact and fuzzy runs). We include the
// "skate-bref-id" post-processing here as well (though there is surely a
// better place for that).
func uniqueMatches(docs []string, stats *statsAugment) (result []*BiblioRef, err error) {
	var (
		brefs []*BiblioRef
		ts    = T.Now().UTC().Format(time.RFC3339)
	)
	for _, doc := range docs {
		var bref BiblioRef
		if err := json.Unmarshal([]byte(doc), &bref); err != nil {
			return nil, err
		}
		// On the fly, add the elasticsearch "_id" and indexed timestamp, if
		// not already set.
		if bref.Key == "" && bref.SourceReleaseIdent != "" {
			bref.Key = fmt.Sprintf("%s_%d", bref.SourceReleaseIdent, bref.RefIndex)
			bref.IndexedTs = ts
		}
		brefs = append(brefs, &bref)
	}
	// Make sure exact matches come first.
	sort.Slice(brefs, func(i, j int) bool {
		return brefs[i].MatchStatus == StatusExact.Short() &&
			brefs[j].MatchStatus != StatusExact.Short()
	})
	seen := set.New()
	for _, doc := range brefs {
		h := doc.LinkHash()
		if seen.Contains(h) {
			stats.skipDuplicatedBref++
			log.Printf("skip-dup-bref [%d]: hash=%v source=%v status=%v reason=%v",
				stats.skipDuplicatedBref, h, doc.SourceReleaseIdent, doc.MatchStatus, doc.MatchReason)
			continue
		}
		seen.Add(h)
		result = append(result, doc)
	}
	return result, nil
}

type statsAugment struct {
	skipDuplicatedBref int64
	skipMatchedRef     int64
	total              int64
}

func (s statsAugment) String() string {
	return fmt.Sprintf("total=%d, skipMatchedRef=%d, skipDuplicatedBref=%d",
		s.total, s.skipMatchedRef, s.skipDuplicatedBref)
}

// CutBatch runs Cut over a list of lines.
func CutBatch(lines []string, column int) (result []string) {
	for _, line := range lines {
		result = append(result, Cut(line, column))
	}
	return result
}

// Cut returns a specific column (1-indexed) from a tab-separated line, or the
// empty string if the column is invalid.
func Cut(line string, column int) string {
	return CutSep(line, "\t", column)
}
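// Usage sketch for the column helpers (columns are 1-indexed):
//
//	Cut("a\tb\tc", 2)         // "b"
//	CutSep("a,b,c\n", ",", 3) // "c", newline trimmed on the last column
//	Cut("a\tb", 5)            // "" (column out of range)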
// CutSep allows specifying a separator; column is 1-indexed.
func CutSep(line, sep string, column int) string {
	// XXX: This will cut the tab separator, if there is no other value.
	parts := strings.Split(line, sep)
	switch {
	case len(parts) < column:
		return ""
	case len(parts) == column:
		return strings.TrimSuffix(parts[column-1], "\n")
	default:
		return parts[column-1]
	}
}

// FindByPrefix returns the first element of a slice of strings that matches a
// prefix, or the empty string if there is none.
func FindByPrefix(ss []string, prefix string) string {
	for _, s := range ss {
		if strings.HasPrefix(s, prefix) {
			return s
		}
	}
	return ""
}

// makeKeyFunc creates a function that can be used as a keyFunc, selecting a
// column from fields separated by sep; column is 1-indexed.
func makeKeyFunc(sep string, column int) func(string) (string, error) {
	return func(s string) (string, error) {
		if k := CutSep(s, sep, column); k != "" {
			return k, nil
		}
		return "", fmt.Errorf("cannot get key from column %d in line (len=%d): %s", column, len(s), s)
	}
}

func parseRelease(s string) (r *Release, err error) {
	err = json.Unmarshal([]byte(s), &r)
	return
}

func parseRef(s string) (r *Ref, err error) {
	err = json.Unmarshal([]byte(s), &r)
	return
}

func parseWiki(s string) (r *MinimalCitations, err error) {
	err = json.Unmarshal([]byte(s), &r)
	return
}

func parseBiblioRef(s string) (r *BiblioRef, err error) {
	err = json.Unmarshal([]byte(s), &r)
	return
}

func parseCdxSummary(s string) (r *cdxSummary, err error) {
	err = json.Unmarshal([]byte(s), &r)
	return
}

// cleanOpenLibraryIdentifier turns OL ids like "/books/OL31189321M" into
// "OL31189321M" and returns the empty string for unexpected values.
func cleanOpenLibraryIdentifier(s string) string {
	// XXX: This can be made faster; iterate from the end of the string and
	// use a slice.
	if s = strings.TrimSpace(s); len(s) == 0 {
		return ""
	}
	var (
		parts = strings.Split(s, "/")
		last  = parts[len(parts)-1]
	)
	if strings.HasPrefix(last, "OL") {
		return last
	}
	log.Printf("warning: unexpected OL id: %s", s)
	return ""
}
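// Behavior sketch for cleanOpenLibraryIdentifier (values illustrative):
//
//	cleanOpenLibraryIdentifier("/books/OL31189321M") // "OL31189321M"
//	cleanOpenLibraryIdentifier("OL123W")             // "OL123W"
//	cleanOpenLibraryIdentifier("/junk/123")          // "" (plus a log warning)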