aboutsummaryrefslogtreecommitdiffstats
path: root/skate/cmd/skate-cluster-stats
diff options
context:
space:
mode:
Diffstat (limited to 'skate/cmd/skate-cluster-stats')
-rw-r--r--skate/cmd/skate-cluster-stats/main.go92
1 files changed, 92 insertions, 0 deletions
diff --git a/skate/cmd/skate-cluster-stats/main.go b/skate/cmd/skate-cluster-stats/main.go
new file mode 100644
index 0000000..3817b7c
--- /dev/null
+++ b/skate/cmd/skate-cluster-stats/main.go
@@ -0,0 +1,92 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "runtime"
+
+ jsoniter "github.com/json-iterator/go"
+ "git.archive.org/martin/cgraph/skate"
+ "git.archive.org/martin/cgraph/skate/parallel"
+)
+
+var (
+ numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
+ batchSize = flag.Int("b", 100000, "batch size")
+ bestEffort = flag.Bool("B", false, "best effort, log errors")
+ // unmatched: clusters w/ refs only
+ // count: number of entities in cluster (by type)
+ // default: key and number of values
+ mode = flag.String("m", "", "what to extract (unmatched, count, ...)")
+
+ json = jsoniter.ConfigCompatibleWithStandardLibrary
+ bytesNewline = []byte("\n")
+)
+
+type Func func([]byte) ([]byte, error)
+
+func main() {
+ flag.Parse()
+ var f Func
+ switch *mode {
+ case "unmatched":
+ f = func(p []byte) ([]byte, error) {
+ var cluster skate.ClusterResult
+ if err := json.Unmarshal(p, &cluster); err != nil {
+ if *bestEffort {
+ log.Printf("%v", err)
+ return nil, nil
+ }
+ log.Fatal(err)
+ }
+ var refs int
+ for _, v := range cluster.Values {
+ if v.Extra.Skate.Status == "ref" {
+ refs++
+ }
+ }
+ if refs == len(cluster.Values) {
+ return p, nil
+ }
+ return nil, nil
+ }
+ case "count":
+ f = func(p []byte) ([]byte, error) {
+ var cluster skate.ClusterResult
+ if err := json.Unmarshal(p, &cluster); err != nil {
+ if *bestEffort {
+ log.Printf("%v", err)
+ return nil, nil
+ }
+ log.Fatal(err)
+ }
+ var refs int
+ for _, v := range cluster.Values {
+ if v.Extra.Skate.Status == "ref" {
+ refs++
+ }
+ }
+ // total, refs, non-refs, key
+ s := fmt.Sprintf("%d\t%d\t%d\t%s\n",
+ len(cluster.Values), refs, len(cluster.Values)-refs, cluster.Key)
+ return []byte(s), nil
+ }
+ default:
+ f = func(p []byte) ([]byte, error) {
+ var cluster skate.ClusterResult
+ if err := json.Unmarshal(p, &cluster); err != nil {
+ return nil, err
+ }
+ s := fmt.Sprintf("%d\t%s\n", len(cluster.Values), cluster.Key)
+ return []byte(s), nil
+ }
+ }
+ pp := parallel.NewProcessor(os.Stdin, os.Stdout, f)
+ pp.NumWorkers = *numWorkers
+ pp.BatchSize = *batchSize
+ if err := pp.Run(); err != nil {
+ log.Fatal(err)
+ }
+}