diff options
Diffstat (limited to 'skate')
-rw-r--r-- | skate/cmd/skate-map/main.go | 40 | ||||
-rw-r--r-- | skate/map.go | 25 |
2 files changed, 55 insertions, 10 deletions
diff --git a/skate/cmd/skate-map/main.go b/skate/cmd/skate-map/main.go
index 2517878..67fc62b 100644
--- a/skate/cmd/skate-map/main.go
+++ b/skate/cmd/skate-map/main.go
@@ -2,6 +2,10 @@
 // extract a key from a json document. For simple cases, you can use `jq` and
 // other tools. Some key derivations require a bit more.
 //
+// This tool helps us to find similar things in billions of items by mapping
+// docs to keys. All docs that share a key are considered match candidates and can be
+// post-processed, e.g. to verify matches or to generate output schemas.
+//
 // An example with mostly unix tools. We want to extract the DOI and sort by
 // it; we also want to do this fast, hence parallel, LC_ALL, etc.
 //
@@ -29,7 +33,6 @@
 //
 // This is reasonably fast, but some cleanup is ugly. We also want more complex
 // keys, e.g. more normalizations, etc. We'd like to encapsulate (2) to (8).
-
 package main
 
 import (
@@ -45,12 +48,15 @@ import (
 )
 
 var (
-	mapperName = flag.String("m", "", "mapper to run")
-	numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
-	batchSize  = flag.Int("b", 50000, "batch size")
-	verbose    = flag.Bool("verbose", false, "show progress")
-	keyPrefix  = flag.String("p", "", "a key prefix to use")
-	extraValue = flag.String("x", "", "extra value to pass to configurable mappers")
+	mapperName  = flag.String("m", "", "mapper to run")
+	numWorkers  = flag.Int("w", runtime.NumCPU(), "number of workers")
+	batchSize   = flag.Int("b", 50000, "batch size")
+	verbose     = flag.Bool("verbose", false, "show progress")
+	keyPrefix   = flag.String("p", "", "a key prefix to use")
+	extraValue  = flag.String("x", "", "extra value to pass to configurable mappers")
+	bestEffort  = flag.Bool("B", false, "best effort")
+	logFile     = flag.String("log", "", "log filename")
+	skipOnEmpty = flag.Int("skip-on-empty", -1, "omit docs with empty value in given field, zero indexed")
 )
 
 func main() {
@@ -67,15 +73,29 @@ func main() {
 		"ty": skate.MapperTitleNysiis,
 		"ts": skate.MapperTitleSandcrawler,
 	}
+	if *logFile != "" {
+		f, err := os.OpenFile(*logFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
+		if err != nil {
+			log.Fatal(err)
+		}
+		defer f.Close()
+		log.SetOutput(f)
+	}
 	switch {
 	case *mapperName != "":
-		if f, ok := availableMappers[*mapperName]; !ok {
+		if mapf, ok := availableMappers[*mapperName]; !ok {
 			log.Fatalf("unknown mapper name: %v", *mapperName)
 		} else {
+			if *skipOnEmpty >= 0 {
+				mapf = skate.WithSkipOnEmpty(mapf, *skipOnEmpty)
+			}
 			if *keyPrefix != "" {
-				f = skate.WithPrefix(f, *keyPrefix)
+				mapf = skate.WithPrefix(mapf, *keyPrefix)
+			}
+			if *bestEffort {
+				mapf = skate.WithBestEffort(mapf)
 			}
-			pp := parallel.NewProcessor(os.Stdin, os.Stdout, f.AsTSV)
+			pp := parallel.NewProcessor(os.Stdin, os.Stdout, mapf.AsTSV)
 			pp.NumWorkers = *numWorkers
 			pp.BatchSize = *batchSize
 			pp.Verbose = *verbose
diff --git a/skate/map.go b/skate/map.go
index 90d8c05..d6e37be 100644
--- a/skate/map.go
+++ b/skate/map.go
@@ -78,6 +78,31 @@ func WithPrefix(f Mapper, prefix string) Mapper {
 	}
 }
 
+// WithBestEffort wraps f so that an error from f is dropped and (nil, nil) is returned instead.
+func WithBestEffort(f Mapper) Mapper {
+	return func(p []byte) ([][]byte, error) {
+		if fields, err := f(p); err != nil {
+			return nil, nil
+		} else {
+			return fields, err
+		}
+	}
+}
+
+// WithSkipOnEmpty wraps f and returns (nil, nil) if the field at index exists and is empty; a negative index never skips.
+func WithSkipOnEmpty(f Mapper, index int) Mapper {
+	return func(p []byte) ([][]byte, error) {
+		fields, err := f(p)
+		if err != nil {
+			return nil, err
+		}
+		if index >= 0 && index < len(fields) && len(fields[index]) == 0 {
+			return nil, nil
+		}
+		return fields, err
+	}
+}
+
 // NameOf returns name of value, e.g. the name of a function.
 func NameOf(f interface{}) string {
 	v := reflect.ValueOf(f)