diff options
Diffstat (limited to 'skate')
-rw-r--r-- | skate/zippy.go | 39 |
1 files changed, 35 insertions, 4 deletions
diff --git a/skate/zippy.go b/skate/zippy.go index 9691f6f..0d92873 100644 --- a/skate/zippy.go +++ b/skate/zippy.go @@ -1,9 +1,10 @@ -// This file contains various "reducers", e.g. working on two data streams and +// This file contains various "reducers", e.g. merging data from two streams and // applying a function on groups of documents with a shared key. // -// Note: This is a bit repetitive, but not want to introduce any other -// abstraction for now. Since most of the logic is in the grouper functions, we -// could make them top level and then assemble the zipkey runner on the fly. +// Note: This is a bit repetitive, but we do not want to introduce any other +// abstraction for now. Since most of the logic is in the "grouper" functions, +// we could make them top level values and then assemble the zipkey runner on +// the fly. // // The most confusing aspect currently is the variety of schemas hidden within // the readers (and string groups): release, ref, ref-as-release, open library, @@ -352,6 +353,7 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { // 4kg2dejsgzaf3cszs2lt5hz4by_9, which appears three times, one // exact match, and twice unmatched). // TODO: remove duplicates + matched = deduplicateBrefs(matched) for _, bref := range matched { stats.total++ if err := enc.Encode(bref); err != nil { @@ -367,6 +369,35 @@ func ZippyBrefAugment(bref, raw io.Reader, w io.Writer) error { return err } +// deduplicateBrefs deduplicates by the document id (for elasticsearch), which +// may help filter out some duplicates but not all. +func deduplicateBrefs(brefs []*BiblioRef) []*BiblioRef { + // Sort by match status, exact first, unmatched last. + sort.Slice(brefs, func(i, j int) bool { + switch { + case brefs[i].MatchStatus == StatusExact.Short(): + return true + case brefs[i].MatchStatus != StatusUnmatched.Short(): + return true + default: + return false + } + }) + var ( + unique []*BiblioRef + seen = set.New() + ) + for _, v := range brefs { + if seen.Contains(v.Key) { + continue + } + unique = append(unique, v) + seen.Add(v.Key) + } + log.Printf("trimmed brefs from %d to %d", len(brefs), len(unique)) + return unique +} + // matchedRefsExtend takes a set of (unique) biblioref docs and will emit that // set of biblioref docs (unchanged) plus raw references as biblioref, which // did not result in a match (determined by e.g. ref key and index). XXX: We |