// skate-map runs a given "map" function over input data. Here, we mostly want to
// extract a key from a json document. For simple cases, you can use `jq` and
// other tools. Some key derivations require a bit more, hence a dedicated program.
//
// An example with mostly unix tools. We want to extract (DOI, doc) tuples
// (sorted by DOI) from newline delimited JSON; we also want to do this fast,
// hence GNU parallel, LC_ALL, etc.
//
//     $ zstdcat -T0 file.zst |                                  (1)
//         LC_ALL=C tr -d '\t' |                                 (2) *
//         parallel -j 16 --block 10M --pipe                     (3) *
//             "jq -rc 'select(.biblio.doi != null) |            (4) *
//                 [.biblio.doi, (.|tostring)] | @tsv'" |        (5) *
//         LC_ALL=C sed 's/\\\\/\\/g' |                          (6) *
//         LC_ALL=C awk -F $'\t' -v OFS='\t' '$1=tolower($1)' |  (7) *
//         skate-to-doi -B -S -f 1 |                             (8) *
//         LC_ALL=C sort -S 30% --parallel 6 -k1,1 |             (9)
//         zstd -c -T0 > skate.out
//
// (1) zstd is fast! "~4x faster than zlib" (https://is.gd/HT1DUs)
// (2) we use tab as column separator and we want to clean this up before (could
// be skipped, if we limit number of splits)
// (3) we pass the data to jq, with a bit larger buffer for GNU parallel
// (default is 1MB, currently)
// (4) we want no "null" output
// (5) tostring prints the input as string, because we need to carry the document forward ...
// (6) ... but we'll need some cleanup, too
// (7) we normalize the DOI to lowercase
// (8) a custom filter to normalize a DOI in a specific column
// (9) sorting by DOI
//
// This is reasonably fast, but some data cleanup code is ugly. We also want
// more complex keys, e.g. more normalizations, etc; in short: we'd like to
// encapsulate (2) to (8) with `skate-map`.
package main import ( "flag" "fmt" "log" "os" "runtime" "text/tabwriter" "git.archive.org/martin/cgraph/skate" "git.archive.org/martin/cgraph/skate/parallel" ) var ( mapperName = flag.String("m", "", "mapper to run") numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers") batchSize = flag.Int("b", 50000, "batch size") verbose = flag.Bool("verbose", false, "show progress") keyPrefix = flag.String("p", "", "a key prefix to use") extraValue = flag.String("x", "", "extra value to pass to configurable mappers") bestEffort = flag.Bool("B", false, "best effort") logFile = flag.String("log", "", "log filename") skipOnEmpty = flag.Int("skip-on-empty", 0, "omit docs with empty value in given column (one indexed)") help = `skate-map available mappers $ skate-map -m ts < file.ndj > file.tsv ` ) func main() { flag.Parse() availableMappers := map[string]skate.Mapper{ // Add new mapper functions here. TODO: add more docs, and improve // composability, e.g. like middleware. Also improve naming. 
"id": skate.Identity, "ff": skate.CreateFixedMapper(*extraValue), "ti": skate.MapperTitle, "tn": skate.MapperTitleNormalized, "ty": skate.MapperTitleNysiis, "ts": skate.MapperTitleSandcrawler, "ur": skate.MapperURLFromRef, "ru": skate.MapperIdentURLFromRef, "cni": skate.MapperContainerName, "cns": skate.MapperContainerNameSandcrawler, "rcns": skate.MapperReleaseContainerName, "vcns": skate.MapperReleaseResolvedContainerName, "isbn": skate.MapperOpenLibraryReleaseNormalizedISBN, "cdxu": skate.MapperCdxSummary, } if *logFile != "" { f, err := os.OpenFile(*logFile, os.O_CREATE|os.O_APPEND, 0644) if err != nil { log.Fatal(err) } defer f.Close() log.SetOutput(f) } switch { case *mapperName != "": if mapper, ok := availableMappers[*mapperName]; !ok { log.Fatalf("unknown mapper name: %v", *mapperName) } else { if *skipOnEmpty > 0 { mapper = skate.WithSkipOnEmpty(mapper, *skipOnEmpty-1) } if *keyPrefix != "" { mapper = skate.WithPrefix(mapper, *keyPrefix) } if *bestEffort { mapper = skate.WithBestEffort(mapper) } pp := parallel.NewProcessor(os.Stdin, os.Stdout, mapper.AsTSV) pp.NumWorkers = *numWorkers pp.BatchSize = *batchSize pp.Verbose = *verbose if err := pp.Run(); err != nil { log.Fatal(err) } } default: fmt.Println(help) w := tabwriter.NewWriter(os.Stdout, 0, 0, 4, ' ', 0) defer w.Flush() for k, v := range availableMappers { fmt.Fprintf(w, "%s\t%s\n", k, skate.NameOf(v)) } } }