Diffstat (limited to 'skate/cmd')
-rw-r--r-- | skate/cmd/skate-biblioref/main.go         | 139
-rw-r--r-- | skate/cmd/skate-bref-id/main.go           |  43
-rw-r--r-- | skate/cmd/skate-cluster-stats/main.go     |  92
-rw-r--r-- | skate/cmd/skate-cluster/main.go           | 107
-rw-r--r-- | skate/cmd/skate-derive-key/main.go        |  92
-rw-r--r-- | skate/cmd/skate-from-unstructured/main.go |  88
-rw-r--r-- | skate/cmd/skate-ref-to-release/main.go    |  81
-rw-r--r-- | skate/cmd/skate-to-doi/main.go            |  58
-rw-r--r-- | skate/cmd/skate-verify/main.go            | 140
9 files changed, 840 insertions, 0 deletions
diff --git a/skate/cmd/skate-biblioref/main.go b/skate/cmd/skate-biblioref/main.go
new file mode 100644
index 0000000..85b2a46
--- /dev/null
+++ b/skate/cmd/skate-biblioref/main.go
@@ -0,0 +1,139 @@
+// Experimental: Turn the minimal cluster result (key, target, source) into an
+// indexable biblio ref (10eb30251f89806cb7a0f147f427c5ea7e5f9941).
+//
+// For the moment, multiple input styles are supported transparently.
+//
+// "id style"
+//
+// ---- id, title, ... --- ---- target -------------- ---- source --------------
+//
+// 10.1001/2012.jama.11164 zhscs2mjlvcdte2i3j44ibgzae icg7bkoeqvfqnc5t5ot4evto6a
+// 10.1001/2012.jama.11164 zhscs2mjlvcdte2i3j44ibgzae ichuaiowbvbx5ajae5ing27lka
+// 10.1001/2012.jama.11164 zhscs2mjlvcdte2i3j44ibgzae io6b76ow6ngxnilc24qsf5kw6i
+//
+// "verify style"
+//
+// ---- target ------------------------------------------ ---- source ------------------------------------------ -- match ---- ---- match reason ------------------
+//
+// https://fatcat.wiki/release/a6xucdggk5h7bcmbxidvqt7hxe https://fatcat.wiki/release/amnpvj5ma5dxlc2a3x2bm2zbnq Status.STRONG Reason.SLUG_TITLE_AUTHOR_MATCH
+// https://fatcat.wiki/release/vyppsuuh2bhapdwcqzln5momta https://fatcat.wiki/release/6gd53yl5yzakrlr72xeojamchi Status.DIFFERENT Reason.CONTRIB_INTERSECTION_EMPTY
+// https://fatcat.wiki/release/hazousx6wna5bn5e27s5mrljzq https://fatcat.wiki/release/iajt2xam5nbc3ichkxxuhqaqw4 Status.DIFFERENT Reason.YEAR
+//
+// Input might change, so we keep this short.
+package main
+
+import (
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"os"
+	"runtime"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/dgraph-io/ristretto"
+	jsoniter "github.com/json-iterator/go"
+	"git.archive.org/martin/cgraph/skate"
+	"git.archive.org/martin/cgraph/skate/parallel"
+	"github.com/sethgrid/pester"
+)
+
+var (
+	numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
+	batchSize  = flag.Int("b", 100000, "batch size")
+	extended   = flag.Bool("E", false, "fetch data from fatcat API")
+
+	json         = jsoniter.ConfigCompatibleWithStandardLibrary
+	bytesNewline = []byte("\n")
+	cache        *ristretto.Cache
+	err          error
+)
+
+func main() {
+	flag.Parse()
+	cache, err = ristretto.NewCache(&ristretto.Config{
+		NumCounters: 1e7,     // number of keys to track frequency of (10M).
+		MaxCost:     1 << 30, // maximum cost of cache (1GB).
+		BufferItems: 64,      // number of keys per Get buffer.
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	pp := parallel.NewProcessor(os.Stdin, os.Stdout, func(p []byte) ([]byte, error) {
+		var (
+			fields = strings.Fields(string(p))
+			target, source, matchStatus, matchReason, matchProvenance string
+		)
+		switch len(fields) {
+		case 3:
+			// Some join output.
+			source = fields[2]
+			target = fields[1]
+			matchProvenance = "join"
+		case 4:
+			source = strings.ReplaceAll(fields[1], "https://fatcat.wiki/release/", "")
+			target = strings.ReplaceAll(fields[0], "https://fatcat.wiki/release/", "")
+			matchProvenance = "fuzzycat/ebee2de"
+			matchStatus = strings.ReplaceAll(fields[2], "Status.", "")
+			matchReason = strings.ReplaceAll(fields[3], "Reason.", "")
+		}
+		if source == target {
+			return nil, nil
+		}
+		br := skate.BiblioRef{
+			UpdateTs:           time.Now().Unix(),
+			SourceReleaseIdent: source,
+			TargetReleaseIdent: target,
+			MatchStatus:        matchStatus,
+			MatchReason:        matchReason,
+			MatchProvenance:    matchProvenance,
+		}
+		if *extended {
+			var release skate.Release
+			if err := FetchRelease(source, &release); err != nil {
+				log.Fatal(err)
+			}
+			br.SourceReleaseStage = release.ReleaseStage
+			br.SourceWorkIdent = release.WorkID
+			br.SourceYear = strconv.Itoa(release.ReleaseYear())
+			if err := FetchRelease(target, &release); err != nil {
+				log.Fatal(err)
+			}
+			br.TargetWorkIdent = release.WorkID
+		}
+		b, err := json.Marshal(br)
+		b = append(b, bytesNewline...)
+		return b, err
+	})
+	pp.NumWorkers = *numWorkers
+	pp.BatchSize = *batchSize
+	if err := pp.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func FetchRelease(ident string, release *skate.Release) error {
+	v, found := cache.Get(ident)
+	if !found {
+		link := fmt.Sprintf("https://api.fatcat.wiki/v0/release/%s", ident)
+		resp, err := pester.Get(link)
+		if err != nil {
+			return err
+		}
+		defer resp.Body.Close()
+		b, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		cache.Set(ident, string(b), 1)
+		return json.Unmarshal(b, release)
+	} else {
+		s, ok := v.(string)
+		if !ok {
+			return fmt.Errorf("invalid cache value")
+		}
+		return json.Unmarshal([]byte(s), release)
+	}
+}
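Usage sketch (file name and worker count illustrative): three-column join output gets provenance "join", four-column fuzzycat verify output gets "fuzzycat/ebee2de"; with -E, source and target are additionally resolved via the fatcat API, responses cached in-process via ristretto:

    $ zstdcat verified.tsv.zst | skate-biblioref -E -w 16 > biblioref.jsonl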
diff --git a/skate/cmd/skate-bref-id/main.go b/skate/cmd/skate-bref-id/main.go
new file mode 100644
index 0000000..ca3d7c4
--- /dev/null
+++ b/skate/cmd/skate-bref-id/main.go
@@ -0,0 +1,43 @@
+// skate-bref-id is a temporary helper to generate an id for a bref doc.
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"runtime"
+	"time"
+
+	jsoniter "github.com/json-iterator/go"
+	"git.archive.org/martin/cgraph/skate"
+	"git.archive.org/martin/cgraph/skate/parallel"
+)
+
+var (
+	numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
+	batchSize  = flag.Int("b", 100000, "batch size")
+	json       = jsoniter.ConfigCompatibleWithStandardLibrary
+
+	newlineB = []byte("\n")
+)
+
+func main() {
+	flag.Parse()
+	pp := parallel.NewProcessor(os.Stdin, os.Stdout, func(p []byte) ([]byte, error) {
+		var bref skate.BiblioRef
+		if err := json.Unmarshal(p, &bref); err != nil {
+			return nil, err
+		}
+		bref.Key = fmt.Sprintf("%s_%d", bref.SourceReleaseIdent, bref.RefIndex)
+		bref.UpdateTs = time.Now().Unix()
+		b, err := json.Marshal(bref)
+		b = append(b, newlineB...)
+		return b, err
+	})
+	pp.NumWorkers = *numWorkers
+	pp.BatchSize = *batchSize
+	if err := pp.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
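The generated key concatenates the source release ident and the reference index; e.g. (values illustrative) the reference with index 7 of release 4lzgf5wzljcptlebhyobccj7ru would get the key 4lzgf5wzljcptlebhyobccj7ru_7:

    $ skate-bref-id < biblioref.jsonl > biblioref-keyed.jsonl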
diff --git a/skate/cmd/skate-cluster-stats/main.go b/skate/cmd/skate-cluster-stats/main.go
new file mode 100644
index 0000000..3817b7c
--- /dev/null
+++ b/skate/cmd/skate-cluster-stats/main.go
@@ -0,0 +1,92 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"runtime"
+
+	jsoniter "github.com/json-iterator/go"
+	"git.archive.org/martin/cgraph/skate"
+	"git.archive.org/martin/cgraph/skate/parallel"
+)
+
+var (
+	numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
+	batchSize  = flag.Int("b", 100000, "batch size")
+	bestEffort = flag.Bool("B", false, "best effort, log errors")
+	// unmatched: clusters w/ refs only
+	// count: number of entities in cluster (by type)
+	// default: key and number of values
+	mode = flag.String("m", "", "what to extract (unmatched, count, ...)")
+
+	json         = jsoniter.ConfigCompatibleWithStandardLibrary
+	bytesNewline = []byte("\n")
+)
+
+type Func func([]byte) ([]byte, error)
+
+func main() {
+	flag.Parse()
+	var f Func
+	switch *mode {
+	case "unmatched":
+		f = func(p []byte) ([]byte, error) {
+			var cluster skate.ClusterResult
+			if err := json.Unmarshal(p, &cluster); err != nil {
+				if *bestEffort {
+					log.Printf("%v", err)
+					return nil, nil
+				}
+				log.Fatal(err)
+			}
+			var refs int
+			for _, v := range cluster.Values {
+				if v.Extra.Skate.Status == "ref" {
+					refs++
+				}
+			}
+			if refs == len(cluster.Values) {
+				return p, nil
+			}
+			return nil, nil
+		}
+	case "count":
+		f = func(p []byte) ([]byte, error) {
+			var cluster skate.ClusterResult
+			if err := json.Unmarshal(p, &cluster); err != nil {
+				if *bestEffort {
+					log.Printf("%v", err)
+					return nil, nil
+				}
+				log.Fatal(err)
+			}
+			var refs int
+			for _, v := range cluster.Values {
+				if v.Extra.Skate.Status == "ref" {
+					refs++
+				}
+			}
+			// total, refs, non-refs, key
+			s := fmt.Sprintf("%d\t%d\t%d\t%s\n",
+				len(cluster.Values), refs, len(cluster.Values)-refs, cluster.Key)
+			return []byte(s), nil
+		}
+	default:
+		f = func(p []byte) ([]byte, error) {
+			var cluster skate.ClusterResult
+			if err := json.Unmarshal(p, &cluster); err != nil {
+				return nil, err
+			}
+			s := fmt.Sprintf("%d\t%s\n", len(cluster.Values), cluster.Key)
+			return []byte(s), nil
+		}
+	}
+	pp := parallel.NewProcessor(os.Stdin, os.Stdout, f)
+	pp.NumWorkers = *numWorkers
+	pp.BatchSize = *batchSize
+	if err := pp.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
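In count mode, each output line is total, refs, non-refs, key (values here are made up):

    12	11	1	2568diamagneticsusceptibilityofh8n2o10sr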
diff --git a/skate/cmd/skate-cluster/main.go b/skate/cmd/skate-cluster/main.go
new file mode 100644
index 0000000..1c8dfda
--- /dev/null
+++ b/skate/cmd/skate-cluster/main.go
@@ -0,0 +1,107 @@
+// skate-cluster takes the output of skate-sorted-keys and generates a
+// "cluster" document, grouping docs by key. Can do some pre-filtering.
+//
+// For example, this:
+//
+//     id123    somekey123    {"a":"b", ...}
+//     id391    somekey123    {"x":"y", ...}
+//
+// would turn into a single line containing all docs with the same key:
+//
+//     {"k": "somekey123", "v": [{"a":"b", ...},{"x":"y",...}]}
+//
+// A single-line cluster is easier to parallelize (e.g. for verification, etc.).
+package main
+
+import (
+	"bufio"
+	"flag"
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"strings"
+)
+
+var (
+	keyField       = flag.Int("k", 2, "which column contains the key (one based)")
+	docField       = flag.Int("d", 3, "which column contains the doc")
+	minClusterSize = flag.Int("min", 2, "minimum cluster size")
+	maxClusterSize = flag.Int("max", 100000, "maximum cluster size")
+	requireBoth    = flag.Bool("both", false,
+		"require at least one ref and one non-ref item present in the cluster, implies -min 2")
+	dropEmptyKeys = flag.Bool("D", false, "drop empty keys")
+)
+
+func main() {
+	flag.Parse()
+	var (
+		br             = bufio.NewReader(os.Stdin)
+		bw             = bufio.NewWriter(os.Stdout)
+		prev, key, doc string
+		batch, fields  []string
+		keyIndex       = *keyField - 1
+		docIndex       = *docField - 1
+	)
+	defer bw.Flush()
+	for {
+		line, err := br.ReadString('\n')
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			log.Fatal(err)
+		}
+		fields = strings.Split(line, "\t")
+		if len(fields) <= keyIndex || len(fields) <= docIndex {
+			log.Fatalf("line has only %d fields", len(fields))
+		}
+		key = strings.TrimSpace(fields[keyIndex])
+		if *dropEmptyKeys && len(key) == 0 {
+			continue
+		}
+		doc = strings.TrimSpace(fields[docIndex])
+		if prev != key {
+			if err := writeBatch(bw, prev, batch); err != nil {
+				log.Fatal(err)
+			}
+			batch = nil
+		}
+		prev = key
+		batch = append(batch, doc)
+	}
+	if len(batch) > 0 {
+		if err := writeBatch(bw, prev, batch); err != nil {
+			log.Fatal(err)
+		}
+	}
+}
+
+// containsBoth returns true if we have both a ref and a non-ref item in the batch.
+func containsBoth(batch []string) bool {
+	var isRef int
+	for _, doc := range batch {
+		// This is brittle: it assumes compact JSON, where this exact substring
+		// is by convention added to releases that have been converted from
+		// reference docs, distinguishing them from other releases.
+		if strings.Contains(doc, `"status":"ref"`) {
+			isRef++
+		}
+	}
+	return isRef > 0 && isRef < len(batch)
+}
+
+// writeBatch writes out a single line containing the key and the cluster values.
+func writeBatch(w io.Writer, key string, batch []string) (err error) {
+	if len(batch) < *minClusterSize || len(batch) > *maxClusterSize {
+		return nil
+	}
+	if *requireBoth && !containsBoth(batch) {
+		return nil
+	}
+	// This is brittle: since all items in a batch are valid JSON objects, the
+	// output will be valid JSON as well, as long as the key contains no
+	// characters that need escaping, e.g. a quote.
+	_, err = fmt.Fprintf(w, "{\"k\": \"%s\", \"v\": [%s]}\n", key, strings.Join(batch, ","))
+	return
+}
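As the comments note, writeBatch trusts keys to contain nothing that needs JSON escaping. A minimal escape-safe sketch (hypothetical helper, not part of this commit; assumes encoding/json is imported and elides the size and ref filters):

    // writeBatchSafe mirrors writeBatch, but JSON-encodes the key, so a quote
    // or backslash in a key cannot produce invalid output.
    func writeBatchSafe(w io.Writer, key string, batch []string) error {
        k, err := json.Marshal(key)
        if err != nil {
            return err
        }
        _, err = fmt.Fprintf(w, "{\"k\": %s, \"v\": [%s]}\n", k, strings.Join(batch, ","))
        return err
    }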
diff --git a/skate/cmd/skate-derive-key/main.go b/skate/cmd/skate-derive-key/main.go
new file mode 100644
index 0000000..2375a73
--- /dev/null
+++ b/skate/cmd/skate-derive-key/main.go
@@ -0,0 +1,92 @@
+// skate-derive-key derives a key from release entity JSON documents.
+//
+// $ skate-derive-key < release_entities.jsonlines > docs.tsv
+//
+// The result is a three-column TSV (ident, key, doc), to be LC_ALL=C sorted by key.
+//
+// ---- ident --------------- ---- key ------------------------------ ---- doc ----------
+//
+// 4lzgf5wzljcptlebhyobccj7ru 2568diamagneticsusceptibilityofh8n2o10sr {"abstracts":[],...
+//
+// After this step, a fast "itertools.groupby" or "skate-cluster" on the key can yield clusters.
+//
+// Notes
+//
+// Using https://github.com/DataDog/zstd#stream-api, 3700 docs/s for key
+// extraction only; using zstd -T0, we get 21K docs/s; overall about 13K
+// docs/s, or about 32h for 1.5B records.
+//
+// Default sort(1) buffer is 1K, but we will need gigabytes, e.g. -S35% of 48GB.
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"runtime"
+	"strings"
+	"time"
+
+	"git.archive.org/martin/cgraph/skate"
+	"git.archive.org/martin/cgraph/skate/parallel"
+)
+
+var (
+	keyFuncName   = flag.String("f", "tsand", "key function name, one of: title, tnorm, tnysi, tsand")
+	numWorkers    = flag.Int("w", runtime.NumCPU(), "number of workers")
+	batchSize     = flag.Int("b", 50000, "batch size")
+	verbose       = flag.Bool("verbose", false, "show progress")
+	bestEffort    = flag.Bool("B", false, "best effort")
+	logFile       = flag.String("log", "", "log filename")
+	skipEmptyKeys = flag.Bool("skip-empty-keys", false, "omit docs without keys")
+
+	wsReplacer = strings.NewReplacer("\t", "", "\n", "")
+	keyOpts    = map[string]skate.IdentifierKeyFunc{
+		"title": skate.KeyTitle,
+		"tnorm": skate.KeyTitleNormalized,
+		"tnysi": skate.KeyTitleNysiis,
+		"tsand": skate.KeyTitleSandcrawler,
+	}
+	keyFunc skate.IdentifierKeyFunc
+	ok      bool
+)
+
+func main() {
+	flag.Parse()
+	if keyFunc, ok = keyOpts[*keyFuncName]; !ok {
+		log.Fatal("invalid key func")
+	}
+	if *logFile != "" {
+		f, err := os.OpenFile(*logFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
+		if err != nil {
+			log.Fatal(err)
+		}
+		defer f.Close()
+		log.SetOutput(f)
+	}
+	started := time.Now()
+	pp := parallel.NewProcessor(os.Stdin, os.Stdout, func(p []byte) ([]byte, error) {
+		ident, key, err := keyFunc(p)
+		if err != nil {
+			if *bestEffort {
+				log.Printf("keyFunc failed with %v: %v", err, string(p))
+				return nil, nil
+			}
+			return nil, err
+		}
+		ident, key = strings.TrimSpace(ident), strings.TrimSpace(key)
+		if *skipEmptyKeys && key == "" {
+			return nil, nil
+		}
+		v := fmt.Sprintf("%s\t%s\t%s\n", ident, key, wsReplacer.Replace(string(p)))
+		return []byte(v), nil
+	})
+	pp.NumWorkers = *numWorkers
+	pp.BatchSize = *batchSize
+	pp.Verbose = *verbose
+	if err := pp.Run(); err != nil {
+		log.Fatal(err)
+	}
+	log.Printf("took: %s", time.Since(started))
+}
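A full derive-and-sort invocation might look like this (file names illustrative; sort flags follow the notes above, key is column 2):

    $ zstdcat release_entities.json.zst | skate-derive-key -f tsand -skip-empty-keys | LC_ALL=C sort -t $'\t' -k2,2 -S35% > sorted.tsv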
diff --git a/skate/cmd/skate-from-unstructured/main.go b/skate/cmd/skate-from-unstructured/main.go
new file mode 100644
index 0000000..bfd3f32
--- /dev/null
+++ b/skate/cmd/skate-from-unstructured/main.go
@@ -0,0 +1,88 @@
+// skate-from-unstructured tries to parse various pieces of information from
+// an unstructured string.
+package main
+
+import (
+	"flag"
+	"log"
+	"os"
+	"regexp"
+	"runtime"
+	"strings"
+
+	jsoniter "github.com/json-iterator/go"
+	"git.archive.org/martin/cgraph/skate"
+	"git.archive.org/martin/cgraph/skate/parallel"
+)
+
+var (
+	numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
+	batchSize  = flag.Int("b", 100000, "batch size")
+	json       = jsoniter.ConfigCompatibleWithStandardLibrary
+	bytesNewline = []byte("\n")
+
+	PatDOI         = regexp.MustCompile(`10[.][0-9]{1,8}/[^ ]*[\w]`)
+	PatDOINoHyphen = regexp.MustCompile(`10[.][0-9]{1,8}/[^ -]*[\w]`)
+	PatArxivPDF    = regexp.MustCompile(`http://arxiv.org/pdf/([0-9]{4,4}[.][0-9]{1,8})(v[0-9]{1,2})?([.]pdf)?`)
+	PatArxivAbs    = regexp.MustCompile(`http://arxiv.org/abs/([0-9]{4,4}[.][0-9]{1,8})(v[0-9]{1,2})?([.]pdf)?`)
+)
+
+func main() {
+	flag.Parse()
+	pp := parallel.NewProcessor(os.Stdin, os.Stdout, func(p []byte) ([]byte, error) {
+		var ref skate.Ref
+		if err := json.Unmarshal(p, &ref); err != nil {
+			return nil, err
+		}
+		if err := parseUnstructured(&ref); err != nil {
+			return nil, err
+		}
+		b, err := json.Marshal(ref)
+		if err != nil {
+			return nil, err
+		}
+		b = append(b, bytesNewline...)
+		return b, nil
+	})
+	pp.NumWorkers = *numWorkers
+	pp.BatchSize = *batchSize
+	if err := pp.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+// parseUnstructured augments the ref in place with missing DOI, arxiv id and so on.
+func parseUnstructured(ref *skate.Ref) error {
+	uns := ref.Biblio.Unstructured
+	var (
+		v  string
+		vs []string
+	)
+	// Handle things like: 10.1111/j.1550-7408.1968.tb02138.x-BIB5|cit5,
+	// 10.1111/j.1558-5646.1997.tb02431.x-BIB0008|evo02431-cit-0008, ...
+	if strings.Contains(strings.ToLower(ref.Key), "-bib") && ref.Biblio.DOI == "" {
+		parts := strings.Split(strings.ToLower(ref.Key), "-bib")
+		ref.Biblio.DOI = parts[0]
+	}
+	// DOI
+	v = PatDOI.FindString(uns)
+	if v != "" && ref.Biblio.DOI == "" {
+		ref.Biblio.DOI = v
+	}
+	// DOI in Key
+	v = PatDOINoHyphen.FindString(ref.Key)
+	if v != "" && ref.Biblio.DOI == "" {
+		ref.Biblio.DOI = v
+	}
+	// Arxiv
+	vs = PatArxivPDF.FindStringSubmatch(uns)
+	if len(vs) != 0 && ref.Biblio.ArxivId == "" {
+		ref.Biblio.ArxivId = vs[1]
+	} else {
+		vs = PatArxivAbs.FindStringSubmatch(uns)
+		if len(vs) != 0 && ref.Biblio.ArxivId == "" {
+			ref.Biblio.ArxivId = vs[1]
+		}
+	}
+	return nil
+}
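A quick, self-contained check of the arxiv extraction (input string illustrative; the pattern mirrors PatArxivAbs above):

    package main

    import (
        "fmt"
        "regexp"
    )

    var patArxivAbs = regexp.MustCompile(`http://arxiv.org/abs/([0-9]{4,4}[.][0-9]{1,8})(v[0-9]{1,2})?([.]pdf)?`)

    func main() {
        vs := patArxivAbs.FindStringSubmatch("cf. http://arxiv.org/abs/1805.04770v2")
        fmt.Println(vs[1]) // 1805.04770
    }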
diff --git a/skate/cmd/skate-ref-to-release/main.go b/skate/cmd/skate-ref-to-release/main.go
new file mode 100644
index 0000000..0eec40b
--- /dev/null
+++ b/skate/cmd/skate-ref-to-release/main.go
@@ -0,0 +1,81 @@
+// skate-ref-to-release converts a "ref" document to a "release" document.
+//
+package main
+
+import (
+	"flag"
+	"log"
+	"os"
+	"runtime"
+	"strings"
+
+	"git.archive.org/martin/cgraph/skate"
+	"git.archive.org/martin/cgraph/skate/parallel"
+
+	jsoniter "github.com/json-iterator/go"
+)
+
+var (
+	numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers")
+	batchSize  = flag.Int("b", 100000, "batch size")
+	fromFormat = flag.String("f", "ref", "import data shape")
+
+	json         = jsoniter.ConfigCompatibleWithStandardLibrary
+	bytesNewline = []byte("\n")
+)
+
+func refToRelease(p []byte) ([]byte, error) {
+	var ref skate.Ref
+	if err := json.Unmarshal(p, &ref); err != nil {
+		return nil, err
+	}
+	release, err := skate.RefToRelease(&ref)
+	if err != nil {
+		return nil, err
+	}
+	release.Extra.Skate.Status = "ref" // means: converted from ref
+	release.Extra.Skate.Ref.Index = ref.Index
+	release.Extra.Skate.Ref.Key = ref.Key
+	b, err := json.Marshal(release)
+	b = append(b, bytesNewline...)
+	return b, err
+}
+
+func rgSitemapToRelease(p []byte) ([]byte, error) {
+	var (
+		s       skate.Sitemap
+		release skate.Release
+	)
+	if err := json.Unmarshal(p, &s); err != nil {
+		return nil, err
+	}
+	release.Title = s.Title
+	if len(s.URL) > 41 {
+		// XXX: A pseudo ident, maybe irritating.
+		release.Ident = strings.Split(s.URL[41:], "_")[0]
+	}
+	release.Extra.Skate.Status = "rg"
+	release.Extra.Skate.ResearchGate.URL = s.URL
+	b, err := json.Marshal(release)
+	b = append(b, bytesNewline...)
+	return b, err
+}
+
+func main() {
+	flag.Parse()
+	var f func([]byte) ([]byte, error)
+	switch *fromFormat {
+	case "ref":
+		f = refToRelease
+	case "rg":
+		f = rgSitemapToRelease
+	default:
+		log.Fatalf("unsupported input shape: %s", *fromFormat)
+	}
+	pp := parallel.NewProcessor(os.Stdin, os.Stdout, f)
+	pp.NumWorkers = *numWorkers
+	pp.BatchSize = *batchSize
+	if err := pp.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
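The pseudo ident in rgSitemapToRelease slices the URL after position 41 and cuts at the first underscore, which assumes ResearchGate publication URLs of the usual form; e.g. (URL illustrative) https://www.researchgate.net/publication/340684094_Some_Title would yield the ident 340684094.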
diff --git a/skate/cmd/skate-to-doi/main.go b/skate/cmd/skate-to-doi/main.go
new file mode 100644
index 0000000..377383f
--- /dev/null
+++ b/skate/cmd/skate-to-doi/main.go
@@ -0,0 +1,58 @@
+// Filter to parse out a correct-looking DOI from a field.
+//
+// $ echo "1,xxx 10.123/12312 xxx,3" | skate-to-doi -d , -f 2
+// 1,10.123/12312,3
+//
+// We can use this to sanitize DOI-like fields in the reference dataset.
+
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"regexp"
+	"runtime"
+	"strings"
+
+	"git.archive.org/martin/cgraph/skate/parallel"
+)
+
+var (
+	numWorkers     = flag.Int("w", runtime.NumCPU(), "number of workers")
+	batchSize      = flag.Int("b", 100000, "batch size")
+	delimiter      = flag.String("d", "\t", "delimiter")
+	index          = flag.Int("f", 1, "field to clean up (i.e. to reduce to a DOI), 1-indexed")
+	bestEffort     = flag.Bool("B", false, "only log errors, but do not stop")
+	skipNonMatches = flag.Bool("S", false, "do not emit a line for non-matches")
+
+	PatDOI = regexp.MustCompile(`10[.][0-9]{1,8}/[^ ]*[\w]`)
+)
+
+func main() {
+	flag.Parse()
+	pp := parallel.NewProcessor(os.Stdin, os.Stdout, func(p []byte) ([]byte, error) {
+		parts := strings.Split(strings.TrimRight(string(p), "\n"), *delimiter)
+		if len(parts) < *index {
+			msg := fmt.Sprintf("line has too few fields (%d): %s", len(parts), string(p))
+			if *bestEffort {
+				log.Println("warn: " + msg)
+				return nil, nil
+			} else {
+				return nil, fmt.Errorf("%s", msg)
+			}
+		}
+		result := PatDOI.FindString(parts[*index-1])
+		if len(result) == 0 && *skipNonMatches {
+			return nil, nil
+		}
+		parts[*index-1] = result
+		return []byte(strings.Join(parts, *delimiter) + "\n"), nil
+	})
+	pp.NumWorkers = *numWorkers
+	pp.BatchSize = *batchSize
+	if err := pp.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
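Another illustrative run, with the default tab delimiter; trailing punctuation is not part of a match, since the pattern must end in a word character:

    $ printf "a\tsee 10.1000/xyz123. for details\tc\n" | skate-to-doi -f 2
    a	10.1000/xyz123	c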
diff --git a/skate/cmd/skate-verify/main.go b/skate/cmd/skate-verify/main.go
new file mode 100644
index 0000000..e6fc417
--- /dev/null
+++ b/skate/cmd/skate-verify/main.go
@@ -0,0 +1,140 @@
+// Generate pairs and run verification on a larger number of records. Mimics
+// fuzzycat.verify, but faster (fuzzycat took about 50h for the complete set).
+//
+// Currently: about 2h for 40M clusters (in "ref" mode).
+//
+// XXX: Clean up the inconsistent "modes".
+package main
+
+import (
+	"bufio"
+	"flag"
+	"log"
+	"os"
+	"runtime"
+	"runtime/pprof"
+	"strings"
+
+	jsoniter "github.com/json-iterator/go"
+	"git.archive.org/martin/cgraph/skate"
+	"git.archive.org/martin/cgraph/skate/parallel"
+)
+
+var (
+	numWorkers   = flag.Int("w", runtime.NumCPU(), "number of workers")
+	batchSize    = flag.Int("b", 10000, "batch size")
+	mode         = flag.String("m", "ref", "mode: exact, ref, bref, zip")
+	exactReason  = flag.String("r", "", "doi, pmid, pmcid, arxiv")
+	provenance   = flag.String("p", "join", "provenance info")
+	releasesFile = flag.String("R", "", "releases, tsv, sorted by key (zip mode only)")
+	refsFile     = flag.String("F", "", "refs, tsv, sorted by key (zip mode only)")
+	cpuProfile   = flag.String("cpuprofile", "", "write cpu profile to file")
+	memProfile   = flag.String("memprofile", "", "write heap profile to file (go tool pprof -png --alloc_objects program mem.pprof > mem.png)")
+
+	json = jsoniter.ConfigCompatibleWithStandardLibrary
+
+	// XXX: This should be cleaned up soon.
+	matchResults = map[string]skate.MatchResult{
+		"doi":     {skate.StatusExact, skate.ReasonDOI},
+		"pmid":    {skate.StatusExact, skate.ReasonPMID},
+		"pmcid":   {skate.StatusExact, skate.ReasonPMCID},
+		"arxiv":   {skate.StatusExact, skate.ReasonArxiv},
+		"unknown": {skate.StatusUnknown, skate.ReasonUnknown},
+	}
+)
+
+func main() {
+	flag.Parse()
+	if *cpuProfile != "" {
+		file, err := os.Create(*cpuProfile)
+		if err != nil {
+			log.Fatal(err)
+		}
+		pprof.StartCPUProfile(file)
+		defer pprof.StopCPUProfile()
+	}
+	switch *mode {
+	case "exact":
+		// Zip mode with a fixed match reason (doi, pmid, pmcid, arxiv).
+		if *refsFile == "" || *releasesFile == "" {
+			log.Fatal("exact mode requires -R and -F to be set")
+		}
+		if *exactReason == "" {
+			var keys []string
+			for k := range matchResults {
+				keys = append(keys, k)
+			}
+			log.Fatalf("need a reason for the record, one of: %s", strings.Join(keys, ", "))
+		}
+		f, err := os.Open(*releasesFile)
+		if err != nil {
+			log.Fatal(err)
+		}
+		defer f.Close()
+		g, err := os.Open(*refsFile)
+		if err != nil {
+			log.Fatal(err)
+		}
+		defer g.Close()
+		bw := bufio.NewWriter(os.Stdout)
+		defer bw.Flush()
+		mr, ok := matchResults[*exactReason]
+		if !ok {
+			mr = matchResults["unknown"]
+		}
+		if err := skate.ZipUnverified(f, g, mr, *provenance, bw); err != nil {
+			log.Fatal(err)
+		}
+	case "zip":
+		// Take two "sorted key files" (one refs, one releases) and run
+		// verification across groups, generating a biblioref file.
+		if *refsFile == "" || *releasesFile == "" {
+			log.Fatal("zip mode requires -R and -F to be set")
+		}
+		f, err := os.Open(*releasesFile)
+		if err != nil {
+			log.Fatal(err)
+		}
+		defer f.Close()
+		g, err := os.Open(*refsFile)
+		if err != nil {
+			log.Fatal(err)
+		}
+		defer g.Close()
+		bw := bufio.NewWriter(os.Stdout)
+		defer bw.Flush()
+		if err := skate.ZipVerifyRefs(f, g, bw); err != nil {
+			log.Fatal(err)
+		}
+	case "ref":
+		// https://git.io/JtACz
+		pp := parallel.NewProcessor(os.Stdin, os.Stdout, skate.RefCluster)
+		pp.NumWorkers = *numWorkers
+		pp.BatchSize = *batchSize
+		if err := pp.Run(); err != nil {
+			log.Fatal(err)
+		}
+	case "bref":
+		// Generate the biblioref.
+		pp := parallel.NewProcessor(os.Stdin, os.Stdout, skate.RefClusterToBiblioRef)
+		pp.NumWorkers = *numWorkers
+		pp.BatchSize = *batchSize
+		if err := pp.Run(); err != nil {
+			log.Fatal(err)
+		}
+	default:
+		log.Fatal("not implemented, only: exact, zip, ref, bref")
+	}
+	if *memProfile != "" {
+		f, err := os.Create(*memProfile)
+		if err != nil {
+			log.Fatal("could not create memory profile: ", err)
+		}
+		defer f.Close()
+		runtime.GC()
+		if err := pprof.WriteHeapProfile(f); err != nil {
+			log.Fatal(err)
+		}
+	}
+}
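Typical invocations (file names illustrative): exact mode zips two sorted files and emits unverified matches with a fixed reason, while zip mode runs the verifier over matching groups:

    $ skate-verify -m exact -r doi -R releases.tsv -F refs.tsv > exact.jsonl
    $ skate-verify -m zip -R releases.tsv -F refs.tsv > verified.jsonl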