// skate-map runs a given map function over input data. We mostly want to
// extract a key from a JSON document. For simple cases, you can use `jq` and
// other tools. Some key derivations require a bit more.
//
// This tool helps us find similar things in billions of items by mapping
// docs to keys. All docs that share a key are considered match candidates and
// can be post-processed, e.g. to verify matches or to generate output schemas.
//
// An example with mostly unix tools. We want to extract the DOI and sort by
// it; we also want to do this fast, hence parallel, LC_ALL, etc.
//
//	$ zstdcat -T0 file.zst |                                    (1)
//	    LC_ALL=C tr -d '\t' |                                    (2) *
//	    parallel -j 16 --block 10M --pipe                        (3) *
//	        "jq -rc 'select(.biblio.doi != null) |               (4) *
//	        [.biblio.doi, (.|tostring)] | @tsv'" |               (5) *
//	    LC_ALL=C sed 's/\\\\/\\/g' |                             (6) *
//	    LC_ALL=C awk -F $'\t' -v OFS='\t' '$1=tolower($1)' |     (7) *
//	    skate-to-doi -B -S -f 1 |                                (8) *
//	    LC_ALL=C sort -S 30% --parallel 6 -k1,1 |                (9)
//	    zstd -c -T0 > skate.out
//
// (1) zstd is fast! "~4x faster than zlib" (https://is.gd/HT1DUs)
// (2) we use tab as the column separator, so we want to remove tabs from the
//     input first (could be skipped if we limit the number of splits)
// (3) we pass the data to jq via GNU parallel, with a larger block size
//     (default is 1MB)
// (4) we want no "null" output
// (5) tostring prints the input as a string, because we need to carry the
//     document forward ...
// (6) ... but we'll need some cleanup, too
// (7) we normalize the DOI to lowercase
// (8) a custom filter to normalize a DOI in a specific column
// (9) sorting by DOI
//
// This is reasonably fast, but some of the cleanup is ugly. We also want more
// complex keys, e.g. more normalizations. We'd like to encapsulate steps (2)
// to (8), marked with * above.
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"runtime"
	"sort"
	"text/tabwriter"

	"git.archive.org/martin/cgraph/skate"
	"git.archive.org/martin/cgraph/skate/parallel"
)

var (
	mapperName  = flag.String("m", "", "mapper to run")
	numWorkers  = flag.Int("w", runtime.NumCPU(), "number of workers")
	batchSize   = flag.Int("b", 50000, "batch size")
	verbose     = flag.Bool("verbose", false, "show progress")
	keyPrefix   = flag.String("p", "", "a key prefix to use")
	extraValue  = flag.String("x", "", "extra value to pass to configurable mappers")
	bestEffort  = flag.Bool("B", false, "best effort")
	logFile     = flag.String("log", "", "log filename")
	skipOnEmpty = flag.Int("skip-on-empty", -1, "omit docs with empty value in given field, zero indexed")
)

// main parses the command line flags and optionally redirects the standard
// logger to the file given with -log (opened for appending, created if
// missing). If no mapper is selected with -m, it prints a table of the
// available mapper names and exits. Otherwise it wraps the selected mapper
// according to -skip-on-empty, -p and -B and runs it through a
// parallel.Processor that reads from stdin and writes to stdout. Any error,
// including an unknown mapper name, is fatal.
func main() {
	flag.Parse()
	// TODO
	// [ ] add prefixes and a way to derive multiple keys in one go
	// [ ] how to store multiple keys, sorted?
	// [ ] maybe wrap jq and parallel for arbitrary nested keys
	availableMappers := map[string]skate.Mapper{
		"id": skate.Identity,
		"ff": skate.CreateFixedMapper(*extraValue),
		"ti": skate.MapperTitle,
		"tn": skate.MapperTitleNormalized,
		"ty": skate.MapperTitleNysiis,
		"ts": skate.MapperTitleSandcrawler,
	}
	if *logFile != "" {
		// An access mode is required: with only O_CREATE|O_APPEND the file is
		// opened read-only (O_RDONLY is 0), every log write fails, and since
		// the log package ignores write errors nothing is ever logged.
		f, err := os.OpenFile(*logFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
		if err != nil {
			log.Fatal(err)
		}
		// log.Fatal calls os.Exit, which skips this deferred Close; the OS
		// releases the descriptor on exit in that case.
		defer f.Close()
		log.SetOutput(f)
	}
	if *mapperName == "" {
		fmt.Println("skate-map available mappers")
		fmt.Println()
		// Map iteration order is randomized; sort names for stable output.
		names := make([]string, 0, len(availableMappers))
		for name := range availableMappers {
			names = append(names, name)
		}
		sort.Strings(names)
		w := tabwriter.NewWriter(os.Stdout, 0, 0, 4, ' ', 0)
		for _, name := range names {
			fmt.Fprintf(w, "%s\t%s\n", name, skate.NameOf(availableMappers[name]))
		}
		if err := w.Flush(); err != nil {
			log.Fatal(err)
		}
		return
	}
	mapf, ok := availableMappers[*mapperName]
	if !ok {
		log.Fatalf("unknown mapper name: %v", *mapperName)
	}
	// Wrappers are applied in this order: skip-on-empty first, then the key
	// prefix, and best effort last (so it wraps the other two).
	if *skipOnEmpty >= 0 {
		mapf = skate.WithSkipOnEmpty(mapf, *skipOnEmpty)
	}
	if *keyPrefix != "" {
		mapf = skate.WithPrefix(mapf, *keyPrefix)
	}
	if *bestEffort {
		mapf = skate.WithBestEffort(mapf)
	}
	pp := parallel.NewProcessor(os.Stdin, os.Stdout, mapf.AsTSV)
	pp.NumWorkers = *numWorkers
	pp.BatchSize = *batchSize
	pp.Verbose = *verbose
	if err := pp.Run(); err != nil {
		log.Fatal(err)
	}
}