aboutsummaryrefslogtreecommitdiffstats
path: root/skate/cmd/skate-map/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'skate/cmd/skate-map/main.go')
-rw-r--r--skate/cmd/skate-map/main.go65
1 files changed, 42 insertions, 23 deletions
diff --git a/skate/cmd/skate-map/main.go b/skate/cmd/skate-map/main.go
index ee02875..227acf2 100644
--- a/skate/cmd/skate-map/main.go
+++ b/skate/cmd/skate-map/main.go
@@ -1,9 +1,10 @@
-// skate-map runs a given map function over input data. We mostly want to
+// skate-map runs a given "map" function over input data. Here, we mostly want to
// extract a key from a json document. For simple cases, you can use `jq` and
-// other tools. Some key derivations require a bit more.
+// other tools. Some key derivations require a bit more, hence a dedicated program.
//
-// An example with mostly unix tools. We want to extract the DOI and sort by
-// it; we also want to do this fast, hence parallel, LC_ALL, etc.
+// An example with mostly unix tools. We want to extract the DOI from newline
+// delimited JSON and sort by it; we also want to do this fast, hence parallel,
+// LC_ALL, etc.
//
// $ zstdcat -T0 file.zst | (1)
// LC_ALL=C tr -d '\t' | (2) *
@@ -21,15 +22,15 @@
// be skipped, if we limit number of splits)
// (3) we pass the data to jq, with a bit larger buffer (default is 1MB)
// (4) we want no "null" output
-// (5) tostring prints input as string, because we need to carry the document forward
-// (6) but we need some cleanup, too
+// (5) tostring prints the input as string, because we need to carry the document forward ...
+// (6) ... but we'll need some cleanup, too
// (7) we normalize the DOI to lowercase
// (8) a custom filter to normalize a DOI in a specific column
// (9) sorting by DOI
//
// This is reasonably fast, but some cleanup is ugly. We also want more complex
-// keys, e.g. more normalizations, etc. We'd like to encapsulate (2) to (8).
-
+// keys, e.g. more normalizations, etc; in short: we'd like to encapsulate (2)
+// to (8) with `skate-map`.
package main
import (
@@ -45,21 +46,26 @@ import (
)
var (
- mapperName = flag.String("m", "", "mapper to run")
- numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
- batchSize = flag.Int("b", 50000, "batch size")
- verbose = flag.Bool("verbose", false, "show progress")
- keyPrefix = flag.String("p", "", "a key prefix to use")
- extraValue = flag.String("x", "", "extra value to pass to configurable mappers")
+ mapperName = flag.String("m", "", "mapper to run")
+ numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
+ batchSize = flag.Int("b", 50000, "batch size")
+ verbose = flag.Bool("verbose", false, "show progress")
+ keyPrefix = flag.String("p", "", "a key prefix to use")
+ extraValue = flag.String("x", "", "extra value to pass to configurable mappers")
+ bestEffort = flag.Bool("B", false, "best effort")
+ logFile = flag.String("log", "", "log filename")
+ skipOnEmpty = flag.Int("skip-on-empty", -1, "omit docs with empty value in given column (zero indexed)")
+
+ help = `skate-map available mappers
+
+ $ skate-map -m ts < file.ndj > file.tsv
+ `
)
func main() {
flag.Parse()
- // TODO
- // [ ] add prefixes and a way to derive multiple keys in one go
- // [ ] how to store multiple keys, sorted?
- // [ ] maybe wrap jq and parallel for arbitrary nested keys
availableMappers := map[string]skate.Mapper{
+ // Add new mapper functions here.
"id": skate.Identity,
"ff": skate.CreateFixedMapper(*extraValue),
"ti": skate.MapperTitle,
@@ -67,15 +73,29 @@ func main() {
"ty": skate.MapperTitleNysiis,
"ts": skate.MapperTitleSandcrawler,
}
+ if *logFile != "" {
+ f, err := os.OpenFile(*logFile, os.O_CREATE|os.O_APPEND, 0644)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer f.Close()
+ log.SetOutput(f)
+ }
switch {
case *mapperName != "":
- if f, ok := availableMappers[*mapperName]; !ok {
+ if mapf, ok := availableMappers[*mapperName]; !ok {
log.Fatalf("unknown mapper name: %v", *mapperName)
} else {
+ if *skipOnEmpty >= 0 {
+ mapf = skate.WithSkipOnEmpty(mapf, *skipOnEmpty)
+ }
if *keyPrefix != "" {
- f = skate.WithPrefix(f, *keyPrefix)
+ mapf = skate.WithPrefix(mapf, *keyPrefix)
+ }
+ if *bestEffort {
+ mapf = skate.WithBestEffort(mapf)
}
- pp := parallel.NewProcessor(os.Stdin, os.Stdout, f.AsTSV)
+ pp := parallel.NewProcessor(os.Stdin, os.Stdout, mapf.AsTSV)
pp.NumWorkers = *numWorkers
pp.BatchSize = *batchSize
pp.Verbose = *verbose
@@ -84,8 +104,7 @@ func main() {
}
}
default:
- fmt.Println("skate-map available mappers")
- fmt.Println()
+ fmt.Println(help)
w := tabwriter.NewWriter(os.Stdout, 0, 0, 4, ' ', 0)
for k, v := range availableMappers {
fmt.Fprintf(w, "%s\t%s\n", k, skate.NameOf(v))