diff options
Diffstat (limited to 'skate/cmd/skate-map/main.go')
-rw-r--r-- | skate/cmd/skate-map/main.go | 65 |
1 files changed, 42 insertions, 23 deletions
diff --git a/skate/cmd/skate-map/main.go b/skate/cmd/skate-map/main.go index ee02875..227acf2 100644 --- a/skate/cmd/skate-map/main.go +++ b/skate/cmd/skate-map/main.go @@ -1,9 +1,10 @@ -// skate-map runs a given map function over input data. We mostly want to +// skate-map runs a given "map" function over input data. Here, we mostly want to // extract a key from a json document. For simple cases, you can use `jq` and -// other tools. Some key derivations require a bit more. +// other tools. Some key derivations require a bit more, hence a dedicated program. // -// An example with mostly unix tools. We want to extract the DOI and sort by -// it; we also want to do this fast, hence parallel, LC_ALL, etc. +// An example with mostly unix tools. We want to extract the DOI from newline +// delimited JSON and sort by it; we also want to do this fast, hence parallel, +// LC_ALL, etc. // // $ zstdcat -T0 file.zst | (1) // LC_ALL=C tr -d '\t' | (2) * @@ -21,15 +22,15 @@ // be skipped, if we limit number of splits) // (3) we pass the data to jq, with a bit larger buffer (default is 1MB) // (4) we want no "null" output -// (5) tostring prints input as string, because we need to carry the document forward -// (6) but we need some cleanup, too +// (5) tostring prints the input as string, because we need to carry the document forward ... +// (6) ... but we'll need some cleanup, too // (7) we normalize the DOI to lowercase // (8) a custom filter to normalize a DOI in a specific column // (9) sorting by DOI // // This is reasonably fast, but some cleanup is ugly. We also want more complex -// keys, e.g. more normalizations, etc. We'd like to encapsulate (2) to (8). - +// keys, e.g. more normalizations, etc; in short: we'd like to encapsulate (2) +// to (8) with `skate-map`. package main import ( @@ -45,21 +46,26 @@ import ( ) var ( - mapperName = flag.String("m", "", "mapper to run") - numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers") - batchSize = flag.Int("b", 50000, "batch size") - verbose = flag.Bool("verbose", false, "show progress") - keyPrefix = flag.String("p", "", "a key prefix to use") - extraValue = flag.String("x", "", "extra value to pass to configurable mappers") + mapperName = flag.String("m", "", "mapper to run") + numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers") + batchSize = flag.Int("b", 50000, "batch size") + verbose = flag.Bool("verbose", false, "show progress") + keyPrefix = flag.String("p", "", "a key prefix to use") + extraValue = flag.String("x", "", "extra value to pass to configurable mappers") + bestEffort = flag.Bool("B", false, "best effort") + logFile = flag.String("log", "", "log filename") + skipOnEmpty = flag.Int("skip-on-empty", -1, "omit docs with empty value in given column (zero indexed)") + + help = `skate-map available mappers + + $ skate-map -m ts < file.ndj > file.tsv + ` ) func main() { flag.Parse() - // TODO - // [ ] add prefixes and a way to derive multiple keys in one go - // [ ] how to store multiple keys, sorted? - // [ ] maybe wrap jq and parallel for arbitrary nested keys availableMappers := map[string]skate.Mapper{ + // Add new mapper functions here. "id": skate.Identity, "ff": skate.CreateFixedMapper(*extraValue), "ti": skate.MapperTitle, @@ -67,15 +73,29 @@ func main() { "ty": skate.MapperTitleNysiis, "ts": skate.MapperTitleSandcrawler, } + if *logFile != "" { + f, err := os.OpenFile(*logFile, os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + log.Fatal(err) + } + defer f.Close() + log.SetOutput(f) + } switch { case *mapperName != "": - if f, ok := availableMappers[*mapperName]; !ok { + if mapf, ok := availableMappers[*mapperName]; !ok { log.Fatalf("unknown mapper name: %v", *mapperName) } else { + if *skipOnEmpty >= 0 { + mapf = skate.WithSkipOnEmpty(mapf, *skipOnEmpty) + } if *keyPrefix != "" { - f = skate.WithPrefix(f, *keyPrefix) + mapf = skate.WithPrefix(mapf, *keyPrefix) + } + if *bestEffort { + mapf = skate.WithBestEffort(mapf) } - pp := parallel.NewProcessor(os.Stdin, os.Stdout, f.AsTSV) + pp := parallel.NewProcessor(os.Stdin, os.Stdout, mapf.AsTSV) pp.NumWorkers = *numWorkers pp.BatchSize = *batchSize pp.Verbose = *verbose @@ -84,8 +104,7 @@ func main() { } } default: - fmt.Println("skate-map available mappers") - fmt.Println() + fmt.Println(help) w := tabwriter.NewWriter(os.Stdout, 0, 0, 4, ' ', 0) for k, v := range availableMappers { fmt.Fprintf(w, "%s\t%s\n", k, skate.NameOf(v)) |