package skate import ( "bytes" "errors" "reflect" "runtime" "strings" json "github.com/segmentio/encoding/json" "github.com/tidwall/gjson" ) var ( bTab = []byte("\t") bNewline = []byte("\n") ErrZeroFields = errors.New("zero fields") ErrMissingFieldName = errors.New("missing field name") ) // TitleDoc is a document with a title. type TitleDoc struct { Title string `json:"title"` } // ContainerNameDoc is a (ref) document with a container title. type ContainerNameDoc struct { Biblio struct { ContainerName string `json:"container_name"` } `json:"biblio"` } // PartialRef for ref docs, that do not have DOI or title. E.g. we found 49701699 // (NCVY), 36401044 (NCVYU), 29668363 (NCUY), and so on. Some examples: XXX. type PartialRef struct { ContainerName string `json:"container_name"` Contribs []struct { // XXX: Need a way to sensibly compare sets of author names. RawName string `json:"raw_name"` } `json:"contribs"` Volume string `json:"volume"` Unstructured string `json:"unstructured"` Year string `json:"release_year"` } // Mapper maps a blob to an arbitrary number of fields, e.g. for (key, // doc). We want fields, but we do not want to bake in TSV into each function. type Mapper func([]byte) ([][]byte, error) // AsTSV serializes the result of a field mapper as TSV. This is a slim // adapter, e.g. to parallel.Processor, which expects this function signature. // A newline will be appended, if not there already. func (f Mapper) AsTSV(p []byte) ([]byte, error) { var ( fields [][]byte err error b []byte ) if fields, err = f(p); err != nil { return nil, err } if len(fields) == 0 { return nil, nil } b = bytes.Join(fields, bTab) if len(b) > 0 && !bytes.HasSuffix(b, bNewline) { b = append(b, bNewline...) } return b, nil } // WithPrefix is a "mapper middleware", adding a given prefix to the first field. func WithPrefix(f Mapper, prefix string) Mapper { return func(p []byte) ([][]byte, error) { fields, err := f(p) if err != nil { return fields, err } if len(fields) == 0 { return nil, ErrZeroFields } fields[0] = append([]byte(prefix+":"), fields[0]...) return fields, err } } // WithBestEffort will not fail on an error. func WithBestEffort(f Mapper) Mapper { return func(p []byte) ([][]byte, error) { if fields, err := f(p); err != nil { return nil, nil } else { return fields, err } } } // WithSkipOnEmpty ignores results where the value at a given field is empty. // One indexed. func WithSkipOnEmpty(f Mapper, index int) Mapper { return func(p []byte) ([][]byte, error) { fields, err := f(p) if err != nil { return nil, err } if index < len(fields) && len(fields[index]) == 0 { return nil, nil } return fields, err } } // NameOf returns name of value, e.g. the name of a function. func NameOf(f interface{}) string { v := reflect.ValueOf(f) if v.Kind() == reflect.Func { if rf := runtime.FuncForPC(v.Pointer()); rf != nil { return rf.Name() } } return v.String() } // Identifier returns just the input again. func Identity(p []byte) ([][]byte, error) { return [][]byte{p}, nil } // CreateFixedMapper extract the value from a given fixed json key, e.g. // "biblio.doi" and the like. Returns a function that maps doc to (value, doc). func CreateFixedMapper(field string) Mapper { if len(field) > 0 && field[0] == '.' { // gjson is not jq, we do not use a leading dot, so remove it, if // accidentally used here field = field[1:] } f := func(p []byte) ([][]byte, error) { result := gjson.GetBytes(p, field) // A subtle bug can emerge here: By default we use tab as separator. If // the value extracted ends with the separator (e.g. tab), then we get // an invalid row. Hence, trim all space. key := []byte(strings.TrimSpace(result.String())) return [][]byte{key, p}, nil } return f } // MapperTitle extracts (title, doc). func MapperTitle(p []byte) ([][]byte, error) { var ( doc TitleDoc key []byte ) if err := json.Unmarshal(p, &doc); err != nil { return nil, err } else { key = []byte(wsReplacer.Replace(strings.TrimSpace(doc.Title))) } return [][]byte{key, p}, nil } // MapperTitleNormalized extracts (title normalized, doc). func MapperTitleNormalized(p []byte) (fields [][]byte, err error) { if fields, err = MapperTitle(p); err != nil { return nil, err } key := string(fields[0]) key = wsReplacer.Replace(strings.TrimSpace(key)) key = strings.ToLower(key) key = repeatedWs.ReplaceAllString(key, " ") key = nonWord.ReplaceAllString(key, "") fields[0] = []byte(key) return fields, nil } // MapperTitleNormalized extracts (title nysiis, doc). func MapperTitleNysiis(p []byte) (fields [][]byte, err error) { if fields, err = MapperTitle(p); err != nil { return nil, err } key := string(fields[0]) key = wsReplacer.Replace(strings.TrimSpace(key)) key = NYSIIS(key) fields[0] = []byte(key) return fields, nil } // MapperTitleSandcrawler extracts (title sandcrawler, doc). func MapperTitleSandcrawler(p []byte) (fields [][]byte, err error) { if fields, err = MapperTitle(p); err != nil { return nil, err } key := string(fields[0]) key = sandcrawlerSlugify(wsReplacer.Replace(strings.TrimSpace(key))) fields[0] = []byte(key) return fields, nil } // MapperContainerName extracts (container_name, doc). func MapperContainerName(p []byte) ([][]byte, error) { var ( doc PartialRef key []byte ) if err := json.Unmarshal(p, &doc); err != nil { return nil, err } else { key = []byte(wsReplacer.Replace(strings.TrimSpace(doc.ContainerName))) } return [][]byte{key, p}, nil } // MapperContainerNameSandcrawler extracts (container_name, doc). func MapperContainerNameSandcrawler(p []byte) (fields [][]byte, err error) { if fields, err = MapperContainerName(p); err != nil { return nil, err } key := string(fields[0]) key = sandcrawlerSlugify(wsReplacer.Replace(strings.TrimSpace(key))) fields[0] = []byte(key) return fields, nil } // MapperURLFromRef extracts the (work ident, release ident, url, doc). // Previously: parallel -j 16 --block 100M --pipe "jq -rc '[.work_ident, // .release_ident, .biblio.url?] | @tsv'" ... // This implementation seems slightly faster than jq and parallel. func MapperURLFromRef(p []byte) (fields [][]byte, err error) { var ref Ref if err = json.Unmarshal(p, &ref); err != nil { return nil, err } fields = [][]byte{ []byte(ref.WorkIdent), []byte(ref.ReleaseIdent), []byte(ref.Biblio.Url), p, } return fields, nil } // MapperReleaseContainerName extracts a normalized container name. func MapperReleaseContainerName(p []byte) (fields [][]byte, err error) { var ( doc Release key []byte w string ) if err := json.Unmarshal(p, &doc); err != nil { return nil, err } if doc.Container.Name != "" { w = doc.Container.Name } else if doc.ContainerName != "" { w = doc.ContainerName } else { w = doc.Extra.ContainerName } key = []byte(sandcrawlerSlugify(wsReplacer.Replace(strings.TrimSpace(w)))) return [][]byte{key, p}, nil } // MapperReleaseResolvedContainerName extracts slug container name from // resolved container names. func MapperReleaseResolvedContainerName(p []byte) (fields [][]byte, err error) { var ( doc Release key []byte ) if err := json.Unmarshal(p, &doc); err != nil { return nil, err } if doc.Extra.Skate.ResolvedContainerName == "" { return nil, nil } key = []byte(sandcrawlerSlugify(wsReplacer.Replace(strings.TrimSpace(doc.Extra.Skate.ResolvedContainerName)))) return [][]byte{key, p}, nil } func MapperOpenLibraryReleaseNormalizedISBN(p []byte) (fields [][]byte, err error) { var ( doc Release key []byte isbn13 string ) if err := json.Unmarshal(p, &doc); err != nil { return nil, err } // There can be 10 and 13 variants in the data, we always want 13. for _, isbn := range doc.ExtIDs.ISBN { if len(isbn) == 13 { isbn13 = isbn break } } if isbn13 == "" { // This is rarer, more expensive. for _, isbn := range doc.ExtIDs.ISBN { parsed := ParseIsbn(isbn) if len(parsed) > 0 { isbn13 = parsed[0] break } } } if isbn13 == "" { return nil, nil } else { key = []byte(isbn13) } return [][]byte{key, p}, nil } // MapperPartial works on partial documents. func MapperPartial(p []byte) (fields [][]byte, err error) { // TODO: Group by some normlized container name or identifier. return nil, nil }