package skate

import (
	"bytes"
	"errors"
	"reflect"
	"regexp"
	"runtime"
	"strings"

	"github.com/segmentio/encoding/json"
	"github.com/tidwall/gjson"
	"golang.org/x/text/unicode/norm"
)

var (
	// bTab and bNewline are the field separator and row terminator used by
	// Mapper.AsTSV.
	bTab     = []byte("\t")
	bNewline = []byte("\n")

	// ErrZeroFields is returned by WithPrefix when the wrapped mapper yields
	// no fields at all.
	ErrZeroFields = errors.New("zero fields")

	// ErrMissingFieldName is not referenced in this file; presumably used
	// elsewhere in the package (TODO confirm).
	ErrMissingFieldName = errors.New("missing field name")

	// wsReplacer turns tabs and newlines into single spaces; the mappers use
	// it to keep TSV separators out of extracted keys.
	wsReplacer = strings.NewReplacer("\t", " ", "\n", " ")
	// repeatedWs matches runs of two or more ASCII spaces.
	repeatedWs = regexp.MustCompile(`[ ]{2,}`)
	// nonWord matches runs of characters other than [0-9A-Za-z_]. RE2's \W
	// is ASCII-only, so spaces and all non-ASCII letters match as well.
	nonWord    = regexp.MustCompile(`[\W]+`)

	// SandcrawlerCharMap maps single characters to ASCII replacements (or to
	// the empty string); sandcrawlerSlugify applies it before Unicode
	// normalization. The "c" -> "c" entry is an identity mapping and has no
	// effect.
	SandcrawlerCharMap = map[string]string{
		"\u00c6": "AE",
		"\u00e6": "ae",
		"\u00d0": "D",
		"\u00f0": "d",
		"\u00d8": "O",
		"\u00f8": "o",
		"\u00de": "Th",
		"\u00fe": "th",
		"\u00df": "s",
		"\u0110": "D",
		"\u0111": "d",
		"\u0126": "H",
		"\u0127": "h",
		"\u0131": "i",
		"\u0138": "k",
		"\u0141": "L",
		"\u0142": "l",
		"\u014a": "N",
		"\u014b": "n",
		"\u0152": "Oe",
		"\u0153": "oe",
		"\u0166": "T",
		"\u0167": "t",
		"\u00b5": "u",
		"c": "c",
		"\u0192": "f",
		"\u2202": "",
		"\u0296": "",
		"\u2211": "",
		"\u220f": "",
		"\u02c6": "",
		"\u2603": "",
		"\u02c7": "",
	}

	// SandcrawlerPrefixRemove lists lowercase title prefixes that
	// sandcrawlerSlugify checks for at the start of a slug (intended to be
	// stripped).
	SandcrawlerPrefixRemove = []string{
		"original article: ",
		"original article ",
		"article: ",
		"title: ",
	}

	// SandcrawlerRemoveCharRegex matches every character that
	// sandcrawlerSlugify deletes: whitespace, punctuation (\p{P}), marks
	// (\p{M}), the General Punctuation (U+2000-U+206F) and Supplemental
	// Punctuation (U+2E00-U+2E7F) blocks, and an explicit list of symbols.
	//
	// RE2 has no InCombiningDiacriticalMarks class; we assume those
	// characters are covered by \p{M}. See:
	// https://unicodebook.readthedocs.io/unicode.html,
	// https://stackoverflow.com/q/5697171/89391,
	// https://github.com/google/re2/wiki/Syntax.
	SandcrawlerRemoveCharRegex = regexp.MustCompile("[\\s\\p{P}\\p{M}\u2000-\u206F\u2E00-\u2E7F’\u0060·“”‘’“”«»「」¿–±§_°ʖ©®¤=<>|+$^~≈√∫≤≥÷ƒ∆¬£¢∞¥◊€]")
)

// TitleDoc is a document with a title.
type TitleDoc struct {
	Title string `json:"title"`
}

// ContainerNameDoc is a (ref) document with a container title, stored under
// biblio.container_name.
type ContainerNameDoc struct {
	Biblio struct {
		ContainerName string `json:"container_name"`
	} `json:"biblio"`
}

// PartialRef is a partial view of ref docs that have neither a DOI nor a
// title. E.g. we found 49701699 (NCVY), 36401044 (NCVYU), 29668363 (NCUY),
// and so on. TODO: add concrete examples.
type PartialRef struct { ContainerName string `json:"container_name"` Contribs []struct { // XXX: Need a way to sensibly compare sets of author names. RawName string `json:"raw_name"` } `json:"contribs"` Volume string `json:"volume"` Unstructured string `json:"unstructured"` Year string `json:"release_year"` } // cdxSummary is an ad-hoc CDX summary format, created currently by skate-cdx-lookup. type cdxSummary struct { Line string `json:"line"` NumRows int64 `json:"numRows"` Summary struct { Delta int64 `json:"delta"` Last string `json:"last"` Ok string `json:"ok"` } `json:"summary"` } // Mapper maps a blob to an arbitrary number of fields, e.g. for (key, // doc). We want fields, but we do not want to bake in TSV into each function. type Mapper func([]byte) ([][]byte, error) // AsTSV serializes the result of a field mapper as TSV. This is a slim // adapter, e.g. to parallel.Processor, which expects this function signature. // A newline will be appended, if not there already. // // Anecdotally a parallelized implementation of a mapper can process around 300MiB/s. func (f Mapper) AsTSV(p []byte) ([]byte, error) { var ( fields [][]byte err error b []byte ) if fields, err = f(p); err != nil { return nil, err } if len(fields) == 0 { return nil, nil } b = bytes.Join(fields, bTab) if len(b) > 0 && !bytes.HasSuffix(b, bNewline) { b = append(b, bNewline...) } return b, nil } // WithPrefix is a "mapper middleware", adding a given prefix to the first field. func WithPrefix(f Mapper, prefix string) Mapper { return func(p []byte) ([][]byte, error) { fields, err := f(p) if err != nil { return fields, err } if len(fields) == 0 { return nil, ErrZeroFields } fields[0] = append([]byte(prefix+":"), fields[0]...) return fields, err } } // WithBestEffort will not fail on an error. 
func WithBestEffort(f Mapper) Mapper {
	return func(p []byte) ([][]byte, error) {
		fields, err := f(p)
		if err != nil {
			// Deliberately swallowed: best-effort mode skips bad inputs.
			return nil, nil
		}
		return fields, nil
	}
}

// WithSkipOnEmpty wraps f and skips results, returning (nil, nil), whose
// field at position index is empty. Errors from f are passed through.
//
// The index is used directly as a slice index, i.e. it is zero-based: for
// (key, doc) fields, 0 refers to the key. NOTE(review): an earlier version
// of this comment said "one indexed", which did not match the lookup
// fields[index]; confirm callers pass a zero-based index.
//
// An index outside the range of the returned fields, including a negative
// one, never causes a skip.
func WithSkipOnEmpty(f Mapper, index int) Mapper {
	return func(p []byte) ([][]byte, error) {
		fields, err := f(p)
		if err != nil {
			return nil, err
		}
		if index >= 0 && index < len(fields) && len(fields[index]) == 0 {
			return nil, nil
		}
		return fields, nil
	}
}

// NameOf returns a name for a value. For a function value, it is the
// fully-qualified name reported by runtime.FuncForPC. Otherwise, or if no
// function info is found, it is reflect.Value.String, e.g. "<int Value>".
func NameOf(f interface{}) string {
	v := reflect.ValueOf(f)
	if v.Kind() == reflect.Func {
		if rf := runtime.FuncForPC(v.Pointer()); rf != nil {
			return rf.Name()
		}
	}
	return v.String()
}

// Identity is a Mapper that returns its input unchanged as the only field.
func Identity(p []byte) ([][]byte, error) {
	return [][]byte{p}, nil
}

// CreateFixedMapper returns a Mapper that extracts the value at a fixed JSON
// path, e.g. "biblio.doi", and maps a doc to (value, doc). Paths use gjson
// syntax; a single leading jq-style dot is tolerated and stripped. A
// missing path yields an empty value.
func CreateFixedMapper(field string) Mapper {
	// gjson is not jq and does not use a leading dot; drop one if it was
	// used accidentally.
	field = strings.TrimPrefix(field, ".")
	return func(p []byte) ([][]byte, error) {
		result := gjson.GetBytes(p, field)
		// Tab is the default separator, so a tab or newline anywhere in the
		// value would yield an invalid row. Trim surrounding whitespace and,
		// like MapperTitle, turn inner tabs and newlines into spaces.
		key := []byte(wsReplacer.Replace(strings.TrimSpace(result.String())))
		return [][]byte{key, p}, nil
	}
}

// MapperTitle extracts (title, doc). The title is trimmed and its tabs and
// newlines are replaced by spaces.
func MapperTitle(p []byte) ([][]byte, error) {
	var doc TitleDoc
	if err := json.Unmarshal(p, &doc); err != nil {
		return nil, err
	}
	key := []byte(wsReplacer.Replace(strings.TrimSpace(doc.Title)))
	return [][]byte{key, p}, nil
}

// MapperTitleNormalized extracts (normalized title, doc): the title is
// lowercased and reduced to ASCII letters, digits and underscores.
func MapperTitleNormalized(p []byte) (fields [][]byte, err error) { if fields, err = MapperTitle(p); err != nil { return nil, err } key := string(fields[0]) key = wsReplacer.Replace(strings.TrimSpace(key)) key = strings.ToLower(key) key = repeatedWs.ReplaceAllString(key, " ") key = nonWord.ReplaceAllString(key, "") fields[0] = []byte(key) return fields, nil } // MapperTitleNormalized extracts (title nysiis, doc). func MapperTitleNysiis(p []byte) (fields [][]byte, err error) { if fields, err = MapperTitle(p); err != nil { return nil, err } key := string(fields[0]) key = wsReplacer.Replace(strings.TrimSpace(key)) key = NYSIIS(key) fields[0] = []byte(key) return fields, nil } // MapperTitleSandcrawler extracts (title sandcrawler, doc). func MapperTitleSandcrawler(p []byte) (fields [][]byte, err error) { if fields, err = MapperTitle(p); err != nil { return nil, err } key := string(fields[0]) key = sandcrawlerSlugify(wsReplacer.Replace(strings.TrimSpace(key))) fields[0] = []byte(key) return fields, nil } // MapperContainerName extracts (container_name, doc). func MapperContainerName(p []byte) ([][]byte, error) { var ( doc PartialRef key []byte ) if err := json.Unmarshal(p, &doc); err != nil { return nil, err } else { key = []byte(wsReplacer.Replace(strings.TrimSpace(doc.ContainerName))) } return [][]byte{key, p}, nil } // MapperContainerNameSandcrawler extracts (container_name, doc). func MapperContainerNameSandcrawler(p []byte) (fields [][]byte, err error) { if fields, err = MapperContainerName(p); err != nil { return nil, err } key := string(fields[0]) key = sandcrawlerSlugify(wsReplacer.Replace(strings.TrimSpace(key))) fields[0] = []byte(key) return fields, nil } // MapperURLFromRef extracts the (url, doc). 
func MapperURLFromRef(p []byte) (fields [][]byte, err error) { var ref Ref if err = json.Unmarshal(p, &ref); err != nil { return nil, err } fields = [][]byte{ []byte(ref.Biblio.Url), p, } return fields, nil } // MapperIdentURLFromRef extracts the (work ident, release ident, url, doc). // Previously: parallel -j 16 --block 100M --pipe "jq -rc '[.work_ident, // .release_ident, .biblio.url?] | @tsv'" ... // This implementation seems slightly faster than jq and parallel. func MapperIdentURLFromRef(p []byte) (fields [][]byte, err error) { var ref Ref if err = json.Unmarshal(p, &ref); err != nil { return nil, err } fields = [][]byte{ []byte(ref.WorkIdent), []byte(ref.ReleaseIdent), []byte(ref.Biblio.Url), p, } return fields, nil } // MapperReleaseContainerName extracts a normalized container name. func MapperReleaseContainerName(p []byte) (fields [][]byte, err error) { var ( doc Release key []byte w string ) if err := json.Unmarshal(p, &doc); err != nil { return nil, err } if doc.Container.Name != "" { w = doc.Container.Name } else if doc.ContainerName != "" { w = doc.ContainerName } else { w = doc.Extra.ContainerName } key = []byte(sandcrawlerSlugify(wsReplacer.Replace(strings.TrimSpace(w)))) return [][]byte{key, p}, nil } // MapperReleaseResolvedContainerName extracts slug container name from // resolved container names. func MapperReleaseResolvedContainerName(p []byte) (fields [][]byte, err error) { var ( doc Release key []byte ) if err := json.Unmarshal(p, &doc); err != nil { return nil, err } if doc.Extra.Skate.ResolvedContainerName == "" { return nil, nil } key = []byte(sandcrawlerSlugify(wsReplacer.Replace(strings.TrimSpace(doc.Extra.Skate.ResolvedContainerName)))) return [][]byte{key, p}, nil } func MapperOpenLibraryReleaseNormalizedISBN(p []byte) (fields [][]byte, err error) { var ( doc Release key []byte isbn13 string ) if err := json.Unmarshal(p, &doc); err != nil { return nil, err } // There can be 10 and 13 variants in the data, we always want 13. 
for _, isbn := range doc.ExtIDs.ISBN { parsed := ParseIsbn(isbn) if len(parsed) > 0 { isbn13 = parsed[0] break } } if isbn13 == "" { return nil, nil } else { key = []byte(isbn13) } return [][]byte{key, p}, nil } func MapperCdxSummary(p []byte) (fields [][]byte, err error) { var ( cdx cdxSummary ) if err := json.Unmarshal(p, &cdx); err != nil { return nil, err } return [][]byte{[]byte(cdx.Line), p}, nil } // MapperPartial works on partial documents. func MapperPartial(p []byte) (fields [][]byte, err error) { // TODO: Group by some normlized container name or identifier. return nil, nil } // MapperBrefWork maps bref for comparison with other datasets, // such as COCI (which uses DOI). func MapperBrefWork(p []byte) (fields [][]byte, err error) { var bref BiblioRef if err := json.Unmarshal(p, &bref); err != nil { return nil, err } return [][]byte{[]byte(bref.SourceWorkIdent), p}, nil } // MapperReleaseWork maps release to work ident. func MapperReleaseWork(p []byte) (fields [][]byte, err error) { var release Release if err := json.Unmarshal(p, &release); err != nil { return nil, err } return [][]byte{[]byte(release.WorkID), p}, nil } // sandcrawlerSlugify normalizes a string. func sandcrawlerSlugify(s string) string { slug := strings.ToLower(strings.TrimSpace(s)) for _, prefix := range SandcrawlerPrefixRemove { if strings.HasPrefix(slug, prefix) { slug = slug[:len(prefix)] } } slug = strings.ReplaceAll(slug, "'", "'") for k, v := range SandcrawlerCharMap { slug = strings.ReplaceAll(slug, k, v) } if len(slug) == 0 { return slug } slug = norm.NFKD.String(slug) slug = SandcrawlerRemoveCharRegex.ReplaceAllString(slug, "") return strings.ToLower(slug) }