diff options
-rw-r--r-- | skate/cluster.go | 25 | ||||
-rw-r--r-- | skate/cmd/skate-map/main.go | 58 | ||||
-rw-r--r-- | skate/map.go | 101 |
3 files changed, 159 insertions, 25 deletions
diff --git a/skate/cluster.go b/skate/cluster.go index 3421a0b..7fc4e1b 100644 --- a/skate/cluster.go +++ b/skate/cluster.go @@ -1,7 +1,6 @@ package skate import ( - "fmt" "regexp" "strings" @@ -110,30 +109,6 @@ func KeyTitleSandcrawler(p []byte) (ident string, key string, err error) { return ident, sandcrawlerSlugify(key), nil } -// CreateFixedFieldFunc creates an extractor function given a json path. -// Currently only top level key is supported. -func CreateFixedFieldFunc(path string) IdentifierKeyFunc { - f := func(p []byte) (ident string, key string, err error) { - var doc map[string]interface{} - if err = json.Unmarshal(p, &doc); err != nil { - return - } - v, ok := doc[path] - if !ok { - return "", "", nil - } - switch t := v.(type) { - case string: - return "", t, nil - case int, int64, float32, float64: - return "", fmt.Sprintf("%v", t), nil - default: - return "", "", nil - } - } - return f -} - // sandcrawlerSlugify normalizes a string. func sandcrawlerSlugify(s string) string { slug := strings.ToLower(strings.TrimSpace(s)) diff --git a/skate/cmd/skate-map/main.go b/skate/cmd/skate-map/main.go new file mode 100644 index 0000000..c5fb798 --- /dev/null +++ b/skate/cmd/skate-map/main.go @@ -0,0 +1,58 @@ +// skate-map runs a given map function over input data. We mostly want to +// extract a key from a json document. +package main + +import ( + "flag" + "fmt" + "log" + "os" + "runtime" + "text/tabwriter" + + "git.archive.org/martin/cgraph/skate" + "git.archive.org/martin/cgraph/skate/parallel" +) + +var ( + mapperName = flag.String("m", "", "mapper to run") + numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers") + batchSize = flag.Int("b", 50000, "batch size") + verbose = flag.Bool("verbose", false, "show progress") + extraValue = flag.String("x", "", "extra value to pass to configurable mappers") +) + +func main() { + flag.Parse() + // XXX: introduce prefixes + availableMappers := map[string]skate.Mapper{ + "id": skate.Identity, + "ff": skate.CreateFixedFieldFunc(*extraValue), + "title": skate.MapperTitle, + "tnorm": skate.MapperTitleNormalized, + "tnysi": skate.MapperTitleNysiis, + "tsand": skate.MapperTitleSandcrawler, + } + switch { + case *mapperName != "": + if f, ok := availableMappers[*mapperName]; !ok { + log.Fatal("unknown mapper name: %v", *mapperName) + } else { + pp := parallel.NewProcessor(os.Stdin, os.Stdout, f) + pp.NumWorkers = *numWorkers + pp.BatchSize = *batchSize + pp.Verbose = *verbose + if err := pp.Run(); err != nil { + log.Fatal(err) + } + } + default: + fmt.Println("skate-map available mappers") + fmt.Println() + w := tabwriter.NewWriter(os.Stdout, 0, 0, 4, ' ', 0) + for k, v := range availableMappers { + fmt.Fprintf(w, "%s\t%s\n", k, skate.NameOf(v)) + } + w.Flush() + } +} diff --git a/skate/map.go b/skate/map.go new file mode 100644 index 0000000..ae8b59f --- /dev/null +++ b/skate/map.go @@ -0,0 +1,101 @@ +package skate + +import ( + "bytes" + "fmt" + "reflect" + "runtime" + "strings" + + json "github.com/segmentio/encoding/json" +) + +type Mapper func([]byte) ([]byte, error) + +// NameOf returns name of value, e.g. the name of a function. +func NameOf(f interface{}) string { + v := reflect.ValueOf(f) + if v.Kind() == reflect.Func { + if rf := runtime.FuncForPC(v.Pointer()); rf != nil { + return rf.Name() + } + } + return v.String() +} + +// Identity mapper. +func Identity(p []byte) ([]byte, error) { + return p, nil +} + +// CreateFixedFieldFunc creates an extractor function given a json path. +// Currently only top level key is supported. +func CreateFixedFieldFunc(path string) Mapper { + f := func(p []byte) ([]byte, error) { + var doc map[string]interface{} + if err := json.Unmarshal(p, &doc); err != nil { + return nil, err + } + v, ok := doc[path] + if !ok { + return nil, nil + } + switch t := v.(type) { + case string: + return []byte(fmt.Sprintf("%v\t%s", t, p)), nil + case int, int64, float32, float64: + return []byte(fmt.Sprintf("%v\t%s", t, p)), nil + default: + return nil, nil + } + } + return f +} + +func MapperTitle(p []byte) ([]byte, error) { + var doc struct { + Title string + } + if err := json.Unmarshal(p, &doc); err != nil { + return nil, err + } + title := wsReplacer.Replace(strings.TrimSpace(doc.Title)) + return bytes.Join([][]byte{[]byte(title), p}, []byte("\t")), nil +} + +func MapperTitleNormalized(p []byte) ([]byte, error) { + var doc struct { + Title string + } + if err := json.Unmarshal(p, &doc); err != nil { + return nil, err + } + title := wsReplacer.Replace(strings.TrimSpace(doc.Title)) + title = strings.ToLower(title) + title = repeatedWs.ReplaceAllString(title, " ") + title = nonWord.ReplaceAllString(title, "") + return bytes.Join([][]byte{[]byte(title), p}, []byte("\t")), nil +} + +func MapperTitleNysiis(p []byte) ([]byte, error) { + var doc struct { + Title string + } + if err := json.Unmarshal(p, &doc); err != nil { + return nil, err + } + title := wsReplacer.Replace(strings.TrimSpace(doc.Title)) + title = NYSIIS(title) + return bytes.Join([][]byte{[]byte(title), p}, []byte("\t")), nil +} + +func MapperTitleSandcrawler(p []byte) ([]byte, error) { + var doc struct { + Title string + } + if err := json.Unmarshal(p, &doc); err != nil { + return nil, err + } + title := sandcrawlerSlugify(wsReplacer.Replace(strings.TrimSpace(doc.Title))) + return bytes.Join([][]byte{[]byte(title), p}, []byte("\t")), nil +} |