From 87ff9eb5dd33719c3947034ea4695c3e27ba6f9d Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Tue, 27 Apr 2021 10:09:02 +0200 Subject: add prefix wrapper --- skate/cmd/skate-map/main.go | 12 ++++++------ skate/map.go | 29 +++++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 8 deletions(-) (limited to 'skate') diff --git a/skate/cmd/skate-map/main.go b/skate/cmd/skate-map/main.go index ffe1017..94cf606 100644 --- a/skate/cmd/skate-map/main.go +++ b/skate/cmd/skate-map/main.go @@ -27,18 +27,18 @@ func main() { // XXX: introduce prefixes availableMappers := map[string]skate.Mapper{ "id": skate.Identity, - "ff": skate.CreateFixedMapper(*extraValue), - "title": skate.MapperTitle, - "tnorm": skate.MapperTitleNormalized, - "tnysi": skate.MapperTitleNysiis, - "tsand": skate.MapperTitleSandcrawler, + "field": skate.WithPrefix(skate.CreateFixedMapper(*extraValue), "field"), + "title": skate.WithPrefix(skate.MapperTitle, "title"), + "tnorm": skate.WithPrefix(skate.MapperTitleNormalized, "tnorm"), + "tnysi": skate.WithPrefix(skate.MapperTitleNysiis, "tnysi"), + "tsand": skate.WithPrefix(skate.MapperTitleSandcrawler, "tsand"), } switch { case *mapperName != "": if f, ok := availableMappers[*mapperName]; !ok { log.Fatalf("unknown mapper name: %v", *mapperName) } else { - pp := parallel.NewProcessor(os.Stdin, os.Stdout, f.TSV) + pp := parallel.NewProcessor(os.Stdin, os.Stdout, f.AsTSV) pp.NumWorkers = *numWorkers pp.BatchSize = *batchSize pp.Verbose = *verbose diff --git a/skate/map.go b/skate/map.go index 8d8094b..b06bf96 100644 --- a/skate/map.go +++ b/skate/map.go @@ -2,6 +2,7 @@ package skate import ( "bytes" + "errors" "reflect" "runtime" "strconv" @@ -13,8 +14,12 @@ import ( var ( bTab = []byte("\t") bNewline = []byte("\n") + + ErrZeroFields = errors.New("zero fields") + ErrMissingFieldName = errors.New("missing field name") ) +// TitleDoc is a document with a title. 
type TitleDoc struct { Title string `json:"title"` } @@ -35,9 +40,9 @@ type PartialDoc struct { // doc). We want fields, but we do not want to bake in TSV into each function. type Mapper func([]byte) ([][]byte, error) -// TSV serialized the result of a field mapper as TSV. This is a slim adapter, +// AsTSV serializes the result of a field mapper as TSV. This is a slim adapter, // e.g. to parallel.Processor, which expects this function signature. -func (f Mapper) TSV(p []byte) ([]byte, error) { +func (f Mapper) AsTSV(p []byte) ([]byte, error) { fields, err := f(p) if err != nil { return nil, err @@ -45,6 +50,21 @@ func (f Mapper) TSV(p []byte) ([]byte, error) { return append(bytes.Join(fields, bTab), bNewline...), nil } +// WithPrefix adds a given prefix to the first element. +func WithPrefix(f Mapper, prefix string) Mapper { + return func(p []byte) ([][]byte, error) { + fields, err := f(p) + if err != nil { + return fields, err + } + if len(fields) == 0 { + return nil, ErrZeroFields + } + fields[0] = append([]byte(prefix+":"), fields[0]...) + return fields, err + } +} + // NameOf returns name of value, e.g. the name of a function. func NameOf(f interface{}) string { v := reflect.ValueOf(f) @@ -64,6 +84,11 @@ func Identity(p []byte) ([][]byte, error) { // CreateFixedMapper extract the value from a given fixed top level json key. // Returns a function that maps doc to (v, doc). func CreateFixedMapper(field string) Mapper { + if field == "" { + return func(p []byte) ([][]byte, error) { + return nil, ErrMissingFieldName + } + } f := func(p []byte) ([][]byte, error) { var ( doc map[string]interface{} -- cgit v1.2.3