// skate-cluster takes the (tab) output of skate-map (plus sort) and generates
// a "cluster" document, grouping docs by key. Can do some pre-filtering (e.g.
// require refs and release docs in a single cluster).
//
// For example, this:
//
//	id123    somekey123    {"a":"b", ...}
//	id391    somekey123    {"x":"y", ...}
//
// would turn into (a single line containing all docs with the same key).
//
//	{"k": "somekey123", "v": [{"a":"b", ...},{"x":"y",...}]}
//
// A single line cluster is easier to parallelize (e.g. for verification, etc.).
//
// The input must already be sorted by key, since only adjacent lines sharing
// a key are grouped.
package main

import (
	"bufio"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
)

// Command line flags. Every flag needs a unique name: registering the same
// name twice makes the flag package panic during package initialization.
// The doc column keeps "-d", the field delimiter is set with "-F".
var (
	keyField       = flag.Int("k", 2, "which column contains the key (one based)")
	docField       = flag.Int("d", 3, "which column contains the doc (one based)")
	minClusterSize = flag.Int("min", 2, "minimum cluster size")
	maxClusterSize = flag.Int("max", 100000, "maximum cluster size")
	requireBoth    = flag.Bool("both", false, "require at least one ref and one non-ref item present in the cluster, implies -min 2")
	dropEmptyKeys  = flag.Bool("D", false, "drop empty keys")
	delimiter      = flag.String("F", "\t", "field delimiter")
)

// main parses the flags, clusters stdin to stdout and exits with a non-zero
// status on any read, write or input format error.
func main() {
	flag.Parse()
	bw := bufio.NewWriter(os.Stdout)
	if err := cluster(os.Stdin, bw); err != nil {
		log.Fatal(err)
	}
	// Flush explicitly (not deferred), so a failed write is reported.
	if err := bw.Flush(); err != nil {
		log.Fatal(err)
	}
}

// cluster reads delimiter separated lines from r, groups the docs of adjacent
// lines sharing the same key and writes one line per group to w via
// writeBatch. Column positions, delimiter and filters are taken from the
// command line flags. It returns the first read or write error, or an error
// describing a line with too few columns.
func cluster(r io.Reader, w io.Writer) error {
	var (
		br       = bufio.NewReader(r)
		keyIndex = *keyField - 1
		docIndex = *docField - 1
		prev     string   // key of the batch currently being collected
		batch    []string // docs collected for prev
	)
	if keyIndex < 0 || docIndex < 0 {
		return fmt.Errorf("column numbers are one based: -k %d, -d %d", *keyField, *docField)
	}
	for done := false; !done; {
		line, err := br.ReadString('\n')
		switch {
		case err == io.EOF:
			// ReadString returns the data read so far together with io.EOF,
			// so a final line without a trailing newline still needs to be
			// processed before we stop.
			done = true
			if line == "" {
				continue
			}
		case err != nil:
			return err
		}
		fields := strings.Split(line, *delimiter)
		if len(fields) <= keyIndex || len(fields) <= docIndex {
			return fmt.Errorf("line has only %d fields", len(fields))
		}
		key := strings.TrimSpace(fields[keyIndex])
		if *dropEmptyKeys && key == "" {
			continue
		}
		if key != prev {
			// The collected docs belong to the previous key, not to the key
			// of the line that just ended the batch.
			if err := writeBatch(w, prev, batch); err != nil {
				return err
			}
			// writeBatch does not retain the slice, so it can be reused.
			batch = batch[:0]
		}
		prev = key
		batch = append(batch, strings.TrimSpace(fields[docIndex]))
	}
	return writeBatch(w, prev, batch)
}

// containsBoth returns true, if we have a ref and a non-ref item in the batch.
func containsBoth(batch []string) bool {
	var numRef int
	for _, doc := range batch {
		// This is brittle (but faster). Most JSON should be in compact form,
		// and there the following chars are by convention added to distinguish
		// a release coming from a reference doc from other releases.
		if strings.Contains(doc, `"status":"ref"`) {
			numRef++
		}
	}
	return numRef > 0 && numRef < len(batch)
}

// jsonKey returns key as a JSON string literal, including the surrounding
// quotes. Keys without quotes, backslashes or control characters are passed
// through unchanged; everything else is escaped with encoding/json, so the
// emitted line remains valid JSON.
func jsonKey(key string) (string, error) {
	clean := true
	for i := 0; i < len(key); i++ {
		if c := key[i]; c < 0x20 || c == '"' || c == '\\' {
			clean = false
			break
		}
	}
	if clean {
		return `"` + key + `"`, nil
	}
	var sb strings.Builder
	enc := json.NewEncoder(&sb)
	// Keep "<", ">" and "&" as they are; the output is not embedded in HTML.
	enc.SetEscapeHTML(false)
	if err := enc.Encode(key); err != nil {
		return "", err
	}
	// Encode terminates the value with a newline.
	return strings.TrimSuffix(sb.String(), "\n"), nil
}

// writeBatch writes out a single line containing the key and the cluster
// values. Nothing is written (and nil is returned), if the batch is empty,
// its size lies outside of the -min and -max bounds, or -both is set and the
// batch does not contain both a ref and a non-ref item.
func writeBatch(w io.Writer, key string, batch []string) error {
	if len(batch) == 0 {
		return nil
	}
	if len(batch) < *minClusterSize || len(batch) > *maxClusterSize {
		return nil
	}
	if *requireBoth && !containsBoth(batch) {
		return nil
	}
	k, err := jsonKey(key)
	if err != nil {
		return err
	}
	// This is brittle (and fast): the docs are not parsed, they are assumed
	// to be valid JSON objects and are joined as is.
	_, err = fmt.Fprintf(w, "{\"k\": %s, \"v\": [%s]}\n", k, strings.Join(batch, ","))
	return err
}