// skate-map runs a given "map" function over input data. Here, we mostly want to
// extract a key from a json document. For simple cases, you can use `jq` and
// other tools. Some key derivations require a bit more, hence a dedicated program.
//
// An example with mostly unix tools. We want to extract the DOI from newline
// delimited JSON and sort by it; we also want to do this fast, hence parallel,
// LC_ALL, etc.
//
// $ zstdcat -T0 file.zst |                                          (1)
//     LC_ALL=C tr -d '\t' |                                         (2) *
//     parallel -j 16 --block 10M --pipe                             (3) *
//         "jq -rc 'select(.biblio.doi != null) |                    (4) *
//             [.biblio.doi, (.|tostring)] | @tsv'" |                (5) *
//     LC_ALL=C sed 's/\\\\/\\/g' |                                  (6) *
//     LC_ALL=C awk -F $'\t' -v OFS='\t' '$1=tolower($1)' |          (7) *
//     skate-to-doi -B -S -f 1 |                                     (8) *
//     LC_ALL=C sort -S 30% --parallel 6 -k1,1 |                     (9)
//     zstd -c -T0 > skate.out
//
// (1) zstd is fast! "~4x faster than zlib" (https://is.gd/HT1DUs)
// (2) we use tab as column separator and we want to clean this up before (could
// be skipped, if we limit number of splits)
// (3) we pass the data to jq, with a bit larger buffer (default is 1MB)
// (4) we want no "null" output
// (5) tostring prints the input as string, because we need to carry the document forward ...
// (6) ... but we'll need some cleanup, too
// (7) we normalize the DOI to lowercase
// (8) a custom filter to normalize a DOI in a specific column
// (9) sorting by DOI
//
// This is reasonably fast, but some cleanup is ugly. We also want more complex
// keys, e.g. more normalizations, etc; in short: we'd like to encapsulate (2)
// to (8) with `skate-map`.
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"runtime"
	"sort"
	"text/tabwriter"

	"git.archive.org/martin/cgraph/skate"
	"git.archive.org/martin/cgraph/skate/parallel"
)

// Command line flags. The values are only valid after flag.Parse has run.
var (
	mapperName  = flag.String("m", "", "mapper to run")
	numWorkers  = flag.Int("w", runtime.NumCPU(), "number of workers")
	batchSize   = flag.Int("b", 50000, "batch size")
	verbose     = flag.Bool("verbose", false, "show progress")
	keyPrefix   = flag.String("p", "", "a key prefix to use")
	extraValue  = flag.String("x", "", "extra value to pass to configurable mappers")
	bestEffort  = flag.Bool("B", false, "best effort")
	logFile     = flag.String("log", "", "log filename")
	skipOnEmpty = flag.Int("skip-on-empty", -1, "omit docs with empty value in given column (zero indexed)")

	// help is the usage banner printed before the list of available mappers.
	help = `skate-map available mappers

  $ skate-map -m ts < file.ndj > file.tsv
`
)

// main parses the command line flags, optionally redirects log output to the
// file given by -log, and then does one of two things:
//
//   - with -m NAME: looks up the named mapper, wraps it according to the
//     -skip-on-empty, -p and -B flags (in that order) and runs it over stdin,
//     writing to stdout through a parallel processor;
//   - without -m: prints the help banner followed by a table of the available
//     mapper keys and their function names.
//
// An unknown mapper name, a log file that cannot be opened, or a processing
// error terminates the program via log.Fatal.
func main() {
	flag.Parse()

	// The registry is built after flag.Parse, because the fixed mapper is
	// constructed from the parsed -x value.
	availableMappers := map[string]skate.Mapper{
		// Add new mapper functions here.
		"id": skate.Identity,
		"ff": skate.CreateFixedMapper(*extraValue),
		"ti": skate.MapperTitle,
		"tn": skate.MapperTitleNormalized,
		"ty": skate.MapperTitleNysiis,
		"ts": skate.MapperTitleSandcrawler,
	}

	if *logFile != "" {
		// O_WRONLY is required: without an explicit access mode the file is
		// opened read-only (O_RDONLY is zero) and every log write would fail.
		f, err := os.OpenFile(*logFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
		if err != nil {
			log.Fatal(err)
		}
		// Runs on a normal return from main only; log.Fatal exits the process
		// without running deferred calls.
		defer f.Close()
		log.SetOutput(f)
	}

	// No mapper requested: show the banner and the available mappers.
	if *mapperName == "" {
		fmt.Println(help)

		// Map iteration order is random, so sort the keys to keep the listing
		// stable between invocations.
		names := make([]string, 0, len(availableMappers))
		for name := range availableMappers {
			names = append(names, name)
		}
		sort.Strings(names)

		w := tabwriter.NewWriter(os.Stdout, 0, 0, 4, ' ', 0)
		for _, name := range names {
			fmt.Fprintf(w, "%s\t%s\n", name, skate.NameOf(availableMappers[name]))
		}
		if err := w.Flush(); err != nil {
			log.Fatal(err)
		}
		return
	}

	mapf, ok := availableMappers[*mapperName]
	if !ok {
		log.Fatalf("unknown mapper name: %v", *mapperName)
	}

	// Wrap the base mapper; the order of wrapping is kept as is, since later
	// wrappers enclose earlier ones.
	if *skipOnEmpty >= 0 {
		mapf = skate.WithSkipOnEmpty(mapf, *skipOnEmpty)
	}
	if *keyPrefix != "" {
		mapf = skate.WithPrefix(mapf, *keyPrefix)
	}
	if *bestEffort {
		mapf = skate.WithBestEffort(mapf)
	}

	pp := parallel.NewProcessor(os.Stdin, os.Stdout, mapf.AsTSV)
	pp.NumWorkers = *numWorkers
	pp.BatchSize = *batchSize
	pp.Verbose = *verbose
	if err := pp.Run(); err != nil {
		log.Fatal(err)
	}
}