From 1cf6d135f274ce79b09a0396367186132bc178f3 Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Sat, 24 Apr 2021 14:18:11 +0200 Subject: wip: mapper with arbitrary number of fields --- skate/cluster.go | 25 +++++++++ skate/cmd/skate-map/main.go | 8 +-- skate/map.go | 120 +++++++++++++++++++++++++++----------------- 3 files changed, 103 insertions(+), 50 deletions(-) (limited to 'skate') diff --git a/skate/cluster.go b/skate/cluster.go index 7fc4e1b..3421a0b 100644 --- a/skate/cluster.go +++ b/skate/cluster.go @@ -1,6 +1,7 @@ package skate import ( + "fmt" "regexp" "strings" @@ -109,6 +110,30 @@ func KeyTitleSandcrawler(p []byte) (ident string, key string, err error) { return ident, sandcrawlerSlugify(key), nil } +// CreateFixedFieldFunc creates an extractor function given a json path. +// Currently only top level key is supported. +func CreateFixedFieldFunc(path string) IdentifierKeyFunc { + f := func(p []byte) (ident string, key string, err error) { + var doc map[string]interface{} + if err = json.Unmarshal(p, &doc); err != nil { + return + } + v, ok := doc[path] + if !ok { + return "", "", nil + } + switch t := v.(type) { + case string: + return "", t, nil + case int, int64, float32, float64: + return "", fmt.Sprintf("%v", t), nil + default: + return "", "", nil + } + } + return f +} + // sandcrawlerSlugify normalizes a string. 
func sandcrawlerSlugify(s string) string { slug := strings.ToLower(strings.TrimSpace(s)) diff --git a/skate/cmd/skate-map/main.go b/skate/cmd/skate-map/main.go index c5fb798..e72cc0c 100644 --- a/skate/cmd/skate-map/main.go +++ b/skate/cmd/skate-map/main.go @@ -25,9 +25,9 @@ var ( func main() { flag.Parse() // XXX: introduce prefixes - availableMappers := map[string]skate.Mapper{ + availableMappers := map[string]skate.FieldMapper{ "id": skate.Identity, - "ff": skate.CreateFixedFieldFunc(*extraValue), + "ff": skate.CreateFixedMapper(*extraValue), "title": skate.MapperTitle, "tnorm": skate.MapperTitleNormalized, "tnysi": skate.MapperTitleNysiis, @@ -36,9 +36,9 @@ func main() { switch { case *mapperName != "": if f, ok := availableMappers[*mapperName]; !ok { - log.Fatal("unknown mapper name: %v", *mapperName) + log.Fatalf("unknown mapper name: %v", *mapperName) } else { - pp := parallel.NewProcessor(os.Stdin, os.Stdout, f) + pp := parallel.NewProcessor(os.Stdin, os.Stdout, f.TSV) pp.NumWorkers = *numWorkers pp.BatchSize = *batchSize pp.Verbose = *verbose diff --git a/skate/map.go b/skate/map.go index ae8b59f..571a297 100644 --- a/skate/map.go +++ b/skate/map.go @@ -2,16 +2,39 @@ package skate import ( "bytes" - "fmt" "reflect" "runtime" + "strconv" "strings" json "github.com/segmentio/encoding/json" ) +var ( + bTab = []byte("\b") + bNewline = []byte("\n") +) + +type TitleDoc struct { + Title string `json:"title"` +} + +// Mapper converts a blob. type Mapper func([]byte) ([]byte, error) +// FieldMapper maps a blob to an arbitrary number of fields, e.g. for (key, +// doc) etc. +type FieldMapper func([]byte) ([][]byte, error) + +// TSV serializes the result of a field mapper as TSV. +func (f FieldMapper) TSV(p []byte) ([]byte, error) { + fields, err := f(p) + if err != nil { + return nil, err + } + return append(bytes.Join(fields, bTab), bNewline...), nil +} + // NameOf returns name of value, e.g. the name of a function. 
func NameOf(f interface{}) string { v := reflect.ValueOf(f) @@ -23,79 +46,84 @@ func NameOf(f interface{}) string { return v.String() } -// Identity mapper. -func Identity(p []byte) ([]byte, error) { - return p, nil +func Identity(p []byte) ([][]byte, error) { + return [][]byte{p}, nil } -// CreateFixedFieldFunc creates an extractor function given a json path. -// Currently only top level key is supported. -func CreateFixedFieldFunc(path string) Mapper { - f := func(p []byte) ([]byte, error) { - var doc map[string]interface{} +func CreateFixedMapper(path string) FieldMapper { + f := func(p []byte) ([][]byte, error) { + var ( + doc map[string]interface{} + v interface{} + ok bool + key []byte + ) if err := json.Unmarshal(p, &doc); err != nil { return nil, err } - v, ok := doc[path] - if !ok { + if v, ok = doc[path]; !ok { return nil, nil } - switch t := v.(type) { + switch w := v.(type) { case string: - return []byte(fmt.Sprintf("%v\t%s", t, p)), nil - case int, int64, float32, float64: - return []byte(fmt.Sprintf("%v\t%s", t, p)), nil + key = []byte(w) + case int: + key = []byte(strconv.Itoa(w)) + case int64: + key = []byte(strconv.Itoa(int(w))) + case float64: + key = []byte(strconv.FormatFloat(w, 'f', 52, 64)) default: return nil, nil } + return [][]byte{key, p}, nil } return f } -func MapperTitle(p []byte) ([]byte, error) { - var doc struct { - Title string - } +func MapperTitle(p []byte) ([][]byte, error) { + var ( + doc TitleDoc + key []byte + ) if err := json.Unmarshal(p, &doc); err != nil { return nil, err + } else { + key = []byte(wsReplacer.Replace(strings.TrimSpace(doc.Title))) } - title := wsReplacer.Replace(strings.TrimSpace(doc.Title)) - return bytes.Join([][]byte{[]byte(title), p}, []byte("\t")), nil + return [][]byte{key, p}, nil } -func MapperTitleNormalized(p []byte) ([]byte, error) { - var doc struct { - Title string - } - if err := json.Unmarshal(p, &doc); err != nil { +func MapperTitleNormalized(p []byte) (fields [][]byte, err error) { + if 
fields, err = MapperTitle(p); err != nil { return nil, err } - title := wsReplacer.Replace(strings.TrimSpace(doc.Title)) - title = strings.ToLower(title) - title = repeatedWs.ReplaceAllString(title, " ") - title = nonWord.ReplaceAllString(title, "") - return bytes.Join([][]byte{[]byte(title), p}, []byte("\t")), nil + key := string(fields[0]) + key = wsReplacer.Replace(strings.TrimSpace(key)) + key = strings.ToLower(key) + key = repeatedWs.ReplaceAllString(key, " ") + key = nonWord.ReplaceAllString(key, "") + fields[0] = []byte(key) + return fields, nil } -func MapperTitleNysiis(p []byte) ([]byte, error) { - var doc struct { - Title string - } - if err := json.Unmarshal(p, &doc); err != nil { +func MapperTitleNysiis(p []byte) (fields [][]byte, err error) { + if fields, err = MapperTitle(p); err != nil { return nil, err } - title := wsReplacer.Replace(strings.TrimSpace(doc.Title)) - title = NYSIIS(title) - return bytes.Join([][]byte{[]byte(title), p}, []byte("\t")), nil + key := string(fields[0]) + key = wsReplacer.Replace(strings.TrimSpace(key)) + key = NYSIIS(key) + fields[0] = []byte(key) + return fields, nil } -func MapperTitleSandcrawler(p []byte) ([]byte, error) { - var doc struct { - Title string - } - if err := json.Unmarshal(p, &doc); err != nil { +func MapperTitleSandcrawler(p []byte) (fields [][]byte, err error) { + if fields, err = MapperTitle(p); err != nil { return nil, err } - title := sandcrawlerSlugify(wsReplacer.Replace(strings.TrimSpace(doc.Title))) - return bytes.Join([][]byte{[]byte(title), p}, []byte("\t")), nil + key := string(fields[0]) + key = sandcrawlerSlugify(wsReplacer.Replace(strings.TrimSpace(key))) + fields[0] = []byte(key) + return fields, nil } -- cgit v1.2.3