aboutsummaryrefslogtreecommitdiffstats
path: root/skate
diff options
context:
space:
mode:
Diffstat (limited to 'skate')
-rw-r--r-- skate/cmd/skate-map/main.go 40
-rw-r--r-- skate/map.go 25
2 files changed, 55 insertions, 10 deletions
diff --git a/skate/cmd/skate-map/main.go b/skate/cmd/skate-map/main.go
index 2517878..67fc62b 100644
--- a/skate/cmd/skate-map/main.go
+++ b/skate/cmd/skate-map/main.go
@@ -2,6 +2,10 @@
// extract a key from a json document. For simple cases, you can use `jq` and
// other tools. Some key derivations require a bit more.
//
+// This tool helps us to find similar things in billions of items by mapping
+// docs to keys. All docs that share a key are considered match candidates and can be
+// post-processed, e.g. to verify matches or to generate output schemas.
+//
// An example with mostly unix tools. We want to extract the DOI and sort by
// it; we also want to do this fast, hence parallel, LC_ALL, etc.
//
@@ -29,7 +33,6 @@
//
// This is reasonably fast, but some cleanup is ugly. We also want more complex
// keys, e.g. more normalizations, etc. We'd like to encapsulate (2) to (8).
-
package main
import (
@@ -45,12 +48,15 @@ import (
)
var (
- mapperName = flag.String("m", "", "mapper to run")
- numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
- batchSize = flag.Int("b", 50000, "batch size")
- verbose = flag.Bool("verbose", false, "show progress")
- keyPrefix = flag.String("p", "", "a key prefix to use")
- extraValue = flag.String("x", "", "extra value to pass to configurable mappers")
+ mapperName = flag.String("m", "", "mapper to run")
+ numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
+ batchSize = flag.Int("b", 50000, "batch size")
+ verbose = flag.Bool("verbose", false, "show progress")
+ keyPrefix = flag.String("p", "", "a key prefix to use")
+ extraValue = flag.String("x", "", "extra value to pass to configurable mappers")
+ bestEffort = flag.Bool("B", false, "best effort") // when set, the selected mapper is wrapped with skate.WithBestEffort
+ logFile = flag.String("log", "", "log filename") // when non-empty, log output is redirected to this file
+ skipOnEmpty = flag.Int("skip-on-empty", -1, "omit docs with empty value in given field, zero indexed") // when >= 0, the mapper is wrapped with skate.WithSkipOnEmpty
)
func main() {
@@ -67,15 +73,29 @@ func main() {
"ty": skate.MapperTitleNysiis,
"ts": skate.MapperTitleSandcrawler,
}
+	if *logFile != "" {
+		f, err := os.OpenFile(*logFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) // O_WRONLY is required: without an access mode the file opens read-only and log writes fail
+		if err != nil {
+			log.Fatal(err)
+		}
+		defer f.Close()
+		log.SetOutput(f)
+	}
switch {
case *mapperName != "":
- if f, ok := availableMappers[*mapperName]; !ok {
+ if mapf, ok := availableMappers[*mapperName]; !ok {
log.Fatalf("unknown mapper name: %v", *mapperName)
} else {
+ if *skipOnEmpty >= 0 {
+ mapf = skate.WithSkipOnEmpty(mapf, *skipOnEmpty)
+ }
if *keyPrefix != "" {
- f = skate.WithPrefix(f, *keyPrefix)
+ mapf = skate.WithPrefix(mapf, *keyPrefix)
+ }
+ if *bestEffort {
+ mapf = skate.WithBestEffort(mapf)
}
- pp := parallel.NewProcessor(os.Stdin, os.Stdout, f.AsTSV)
+ pp := parallel.NewProcessor(os.Stdin, os.Stdout, mapf.AsTSV)
pp.NumWorkers = *numWorkers
pp.BatchSize = *batchSize
pp.Verbose = *verbose
diff --git a/skate/map.go b/skate/map.go
index 90d8c05..d6e37be 100644
--- a/skate/map.go
+++ b/skate/map.go
@@ -78,6 +78,31 @@ func WithPrefix(f Mapper, prefix string) Mapper {
}
}
+// WithBestEffort wraps f so that an error from f is swallowed and reported as (nil, nil).
+func WithBestEffort(f Mapper) Mapper {
+	return func(p []byte) ([][]byte, error) {
+		result, err := f(p)
+		if err != nil {
+			return nil, nil
+		}
+		return result, nil
+	}
+}
+
+// WithSkipOnEmpty drops results whose field at index is empty; a negative or out-of-range index skips nothing.
+func WithSkipOnEmpty(f Mapper, index int) Mapper {
+	return func(p []byte) ([][]byte, error) {
+		fields, err := f(p)
+		if err != nil {
+			return nil, err
+		}
+		if index >= 0 && index < len(fields) && len(fields[index]) == 0 { // lower bound guard: fields[-1] would panic
+			return nil, nil
+		}
+		return fields, nil
+	}
+}
+
// NameOf returns name of value, e.g. the name of a function.
func NameOf(f interface{}) string {
v := reflect.ValueOf(f)