author    | Martin Czygan <martin.czygan@gmail.com> | 2021-03-21 01:17:38 +0100
committer | Martin Czygan <martin.czygan@gmail.com> | 2021-03-21 01:17:38 +0100
commit    | 09a7e8c9d013f13a1aa1ef4e9b7f397647b79967 (patch)
tree      | 122b474e27afbc66cba1182e983ef5c8555ed12f /skate/cmd/skate-cluster-stats
parent    | a7e0cf191ebf8fb499e0ab9a3b6cae45727f1286 (diff)
download  | refcat-09a7e8c9d013f13a1aa1ef4e9b7f397647b79967.tar.gz, refcat-09a7e8c9d013f13a1aa1ef4e9b7f397647b79967.zip
initial import of skate
Diffstat (limited to 'skate/cmd/skate-cluster-stats')
-rw-r--r-- | skate/cmd/skate-cluster-stats/main.go | 92
1 file changed, 92 insertions, 0 deletions
diff --git a/skate/cmd/skate-cluster-stats/main.go b/skate/cmd/skate-cluster-stats/main.go
new file mode 100644
index 0000000..3817b7c
--- /dev/null
+++ b/skate/cmd/skate-cluster-stats/main.go
@@ -0,0 +1,92 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"runtime"
+
+	jsoniter "github.com/json-iterator/go"
+	"git.archive.org/martin/cgraph/skate"
+	"git.archive.org/martin/cgraph/skate/parallel"
+)
+
+var (
+	numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
+	batchSize  = flag.Int("b", 100000, "batch size")
+	bestEffort = flag.Bool("B", false, "best effort, log errors")
+	// unmatched: clusters w/ refs only
+	// count: number of entities in cluster (by type)
+	// default: key and number of values
+	mode = flag.String("m", "", "what to extract (unmatched, count, ...)")
+
+	json         = jsoniter.ConfigCompatibleWithStandardLibrary
+	bytesNewline = []byte("\n")
+)
+
+type Func func([]byte) ([]byte, error)
+
+func main() {
+	flag.Parse()
+	var f Func
+	switch *mode {
+	case "unmatched":
+		f = func(p []byte) ([]byte, error) {
+			var cluster skate.ClusterResult
+			if err := json.Unmarshal(p, &cluster); err != nil {
+				if *bestEffort {
+					log.Printf("%v", err)
+					return nil, nil
+				}
+				log.Fatal(err)
+			}
+			var refs int
+			for _, v := range cluster.Values {
+				if v.Extra.Skate.Status == "ref" {
+					refs++
+				}
+			}
+			if refs == len(cluster.Values) {
+				return p, nil
+			}
+			return nil, nil
+		}
+	case "count":
+		f = func(p []byte) ([]byte, error) {
+			var cluster skate.ClusterResult
+			if err := json.Unmarshal(p, &cluster); err != nil {
+				if *bestEffort {
+					log.Printf("%v", err)
+					return nil, nil
+				}
+				log.Fatal(err)
+			}
+			var refs int
+			for _, v := range cluster.Values {
+				if v.Extra.Skate.Status == "ref" {
+					refs++
+				}
+			}
+			// total, refs, non-refs, key
+			s := fmt.Sprintf("%d\t%d\t%d\t%s\n",
+				len(cluster.Values), refs, len(cluster.Values)-refs, cluster.Key)
+			return []byte(s), nil
+		}
+	default:
+		f = func(p []byte) ([]byte, error) {
+			var cluster skate.ClusterResult
+			if err := json.Unmarshal(p, &cluster); err != nil {
+				return nil, err
+			}
+			s := fmt.Sprintf("%d\t%s\n", len(cluster.Values), cluster.Key)
+			return []byte(s), nil
+		}
+	}
+	pp := parallel.NewProcessor(os.Stdin, os.Stdout, f)
+	pp.NumWorkers = *numWorkers
+	pp.BatchSize = *batchSize
+	if err := pp.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
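
The diff does not include the definition of skate.ClusterResult, so here is a hypothetical, standard-library-only sketch of the record shape the tool appears to consume, inferred purely from the field accesses above (cluster.Key, cluster.Values, v.Extra.Skate.Status == "ref"). The struct layout, the JSON tags ("k", "v", "extra", "skate", "status"), and the sample non-ref status "other" are assumptions, not the real schema; the authoritative type lives in the skate package.

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// clusterResult is a guess at the fields skate.ClusterResult exposes,
// based only on the accesses in the diff above; JSON tags are assumptions.
type clusterResult struct {
	Key    string `json:"k"`
	Values []struct {
		Extra struct {
			Skate struct {
				Status string `json:"status"`
			} `json:"skate"`
		} `json:"extra"`
	} `json:"v"`
}

func main() {
	// A made-up cluster with one reference-only value and one other value.
	line := `{"k":"example","v":[{"extra":{"skate":{"status":"ref"}}},{"extra":{"skate":{"status":"other"}}}]}`
	var c clusterResult
	if err := json.Unmarshal([]byte(line), &c); err != nil {
		log.Fatal(err)
	}
	var refs int
	for _, v := range c.Values {
		if v.Extra.Skate.Status == "ref" {
			refs++
		}
	}
	// Mirrors the "count" mode output: total, refs, non-refs, key.
	fmt.Printf("%d\t%d\t%d\t%s\n", len(c.Values), refs, len(c.Values)-refs, c.Key)
}

The program itself reads newline-delimited cluster JSON on stdin and writes TSV to stdout, so an invocation would look roughly like skate-cluster-stats -m count -w 8 < clusters.ndjson > stats.tsv (the file names here are only illustrative).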