diff options
Diffstat (limited to 'skate')
-rw-r--r-- | skate/Makefile | 2 | ||||
-rw-r--r-- | skate/cmd/skate-reduce/main.go | 144 | ||||
-rw-r--r-- | skate/cmd/skate-verify/main.go | 158 | ||||
-rw-r--r-- | skate/xio/util.go | 14 | ||||
-rw-r--r-- | skate/zippy.go | 28 |
5 files changed, 173 insertions, 173 deletions
diff --git a/skate/Makefile b/skate/Makefile index f098ecd..97b192d 100644 --- a/skate/Makefile +++ b/skate/Makefile @@ -1,5 +1,5 @@ SHELL := /bin/bash -TARGETS := skate-conv skate-derive-key skate-cluster skate-verify skate-to-doi skate-bref-id skate-from-unstructured skate-wikipedia-doi skate-dot skate-map +TARGETS := skate-conv skate-cluster skate-to-doi skate-bref-id skate-from-unstructured skate-wikipedia-doi skate-dot skate-map skate-reduce PKGNAME := skate .PHONY: test diff --git a/skate/cmd/skate-reduce/main.go b/skate/cmd/skate-reduce/main.go new file mode 100644 index 0000000..df72ef4 --- /dev/null +++ b/skate/cmd/skate-reduce/main.go @@ -0,0 +1,144 @@ +// skate-reduce takes prepared inputs (e.g. from skate-map or skate-cluster) +// and applies various verification and conversion functions. The output will +// often be the biblioref schema. +// +// Support various modes. +// +// * exact: takes (key, doc) TSV files (one for releases, one for refs) and +// will emit biblioref docs relating one element from releases with all +// elements from ref; this is for "doi", "pmid" and other id matches, where no +// further checks are necessary. The match reason, e.g. "doi" needs to be +// supplied. +// +// $ skate-reduce -m exact -r doi -F a.tsv -L b.tsv +// +// * verify: takes (key, doc) TSV files (one for release, one for refs), runs +// verification within a group and will emit biblioref. +// +// $ skate-reduce -m verify -F a.tsv -L b.tsv +// +// * ref: takes a single file with clusters containing releases and refs and +// will emit verification results. +// +// $ skate-reduce -m ref < a.ndj +// +// * bref: same as ref, but generate a biblioref file as output +// +// $ skate-reduce -m bref < a.ndj +// +// * wiki: zippy mode for releases and wikipedia inputs. +// +// $ skate-reduce -m wiki -L a.ndj -W b.ndj +// +package main + +import ( + "bufio" + "flag" + "log" + "os" + "runtime" + "runtime/pprof" + + "git.archive.org/martin/cgraph/skate" + "git.archive.org/martin/cgraph/skate/parallel" + "git.archive.org/martin/cgraph/skate/xio" +) + +var ( + numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers") + batchSize = flag.Int("b", 10000, "batch size") + // Each mode may work on one or two files, and may need extra args. + mode = flag.String("m", "ref", "mode, e.g. exact, verify, ref, bref, wiki") + + cpuProfile = flag.String("cpuprofile", "", "write cpu profile to file") + memProfile = flag.String("memprofile", "", "write heap profile to file (go tool pprof -png --alloc_objects program mem.pprof > mem.png)") + + // Possible inputs, we could switch to a subcommand cli parser. + refs = flag.String("F", "", "path to refs input") + releases = flag.String("L", "", "path to release input") + wiki = flag.String("W", "", "path to wiki input") + + // Extra args. + reason = flag.String("r", "", "reason for match: doi, pmid, pmcid, arxiv, unknown") + reasonMap = map[string]skate.MatchResult{ + "doi": skate.MatchResult{Status: skate.StatusExact, Reason: skate.ReasonDOI}, + "pmid": skate.MatchResult{Status: skate.StatusExact, Reason: skate.ReasonPMID}, + "pmcid": skate.MatchResult{Status: skate.StatusExact, Reason: skate.ReasonPMCID}, + "arxiv": skate.MatchResult{Status: skate.StatusExact, Reason: skate.ReasonArxiv}, + "unknown": skate.MatchResult{Status: skate.StatusUnknown, Reason: skate.ReasonUnknown}, + } +) + +func main() { + flag.Parse() + if *cpuProfile != "" { + file, err := os.Create(*cpuProfile) + if err != nil { + log.Fatal(err) + } + pprof.StartCPUProfile(file) + defer pprof.StopCPUProfile() + } + + bw := bufio.NewWriter(os.Stdout) + defer bw.Flush() + + switch *mode { + case "exact": + l, f, err := xio.OpenTwo(*releases, *refs) + if err != nil { + log.Fatal(err) + } + r, ok := reasonMap[*reason] + if !ok { + log.Fatalf("unknown reason: %v", *reason) + } + if err := skate.ZippyExact(l, f, r, bw); err != nil { + log.Fatal(err) + } + case "verify": + l, f, err := xio.OpenTwo(*releases, *refs) + if err != nil { + log.Fatal(err) + } + if err := skate.ZippyVerifyRefs(l, f, bw); err != nil { + log.Fatal(err) + } + case "ref": + pp := parallel.NewProcessor(os.Stdin, os.Stdout, skate.RefClusterVerify) + pp.NumWorkers = *numWorkers + pp.BatchSize = *batchSize + if err := pp.Run(); err != nil { + log.Fatal(err) + } + case "bref": + pp := parallel.NewProcessor(os.Stdin, os.Stdout, skate.RefClusterToBiblioRef) + pp.NumWorkers = *numWorkers + pp.BatchSize = *batchSize + if err := pp.Run(); err != nil { + log.Fatal(err) + } + case "wiki": + l, w, err := xio.OpenTwo(*releases, *wiki) + if err != nil { + log.Fatal(err) + } + if err := skate.ZippyExactWiki(l, w, reasonMap["doi"], bw); err != nil { + log.Fatal(err) + } + default: + log.Fatalf("invalid mode") + } + if *memProfile != "" { + f, err := os.Create(*memProfile) + if err != nil { + log.Fatal("could not create memory profile: ", err) + } + defer f.Close() + runtime.GC() + if err := pprof.WriteHeapProfile(f); err != nil { + log.Fatal(err) + } + } +} diff --git a/skate/cmd/skate-verify/main.go b/skate/cmd/skate-verify/main.go deleted file mode 100644 index 1288404..0000000 --- a/skate/cmd/skate-verify/main.go +++ /dev/null @@ -1,158 +0,0 @@ -// Generate pairs and run verification on larger number of records. Mimick -// fuzzycat.verify, but make it faster (e.g. fuzzycat took about 50h for the -// complete set). -// -// Currently: about 2h for 40M clusters (in "ref" mode). -// -// XXX: Cleanup inconsistent "modes". -package main - -import ( - "bufio" - "flag" - "io" - "log" - "os" - "runtime" - "runtime/pprof" - "strings" - - "git.archive.org/martin/cgraph/skate" - "git.archive.org/martin/cgraph/skate/parallel" -) - -var ( - numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers") - batchSize = flag.Int("b", 10000, "batch size") - mode = flag.String("m", "ref", "mode: exact, ref, bref, zip, bzip, wiki") - exactReason = flag.String("r", "", "doi, pmid, pmcid, arxiv") - provenance = flag.String("p", "join", "provenance info") - wikiFile = flag.String("W", "", "wiki citation file") - releasesFile = flag.String("R", "", "releases, tsv, sorted by key (zip mode only)") - refsFile = flag.String("F", "", "refs, tsv, sorted by key (zip mode only)") - cpuProfile = flag.String("cpuprofile", "", "write cpu profile to file") - memProfile = flag.String("memprofile", "", "write heap profile to file (go tool pprof -png --alloc_objects program mem.pprof > mem.png)") - - // XXX: This should be cleanup up soon. - matchResults = map[string]skate.MatchResult{ - "doi": skate.MatchResult{Status: skate.StatusExact, Reason: skate.ReasonDOI}, - "pmid": skate.MatchResult{Status: skate.StatusExact, Reason: skate.ReasonPMID}, - "pmcid": skate.MatchResult{Status: skate.StatusExact, Reason: skate.ReasonPMCID}, - "arxiv": skate.MatchResult{Status: skate.StatusExact, Reason: skate.ReasonArxiv}, - "unknown": skate.MatchResult{Status: skate.StatusUnknown, Reason: skate.ReasonUnknown}, - } -) - -func main() { - flag.Parse() - if *cpuProfile != "" { - file, err := os.Create(*cpuProfile) - if err != nil { - log.Fatal(err) - } - pprof.StartCPUProfile(file) - defer pprof.StopCPUProfile() - } - var ( - f, g io.ReadCloser - err error - bw = bufio.NewWriter(os.Stdout) - ) - defer bw.Flush() - switch *mode { - case "exact": - // Fixed zip mode for DOI. - if *refsFile == "" || *releasesFile == "" { - log.Fatal("mode requires -R and -F to be set") - } - if *exactReason == "" { - var keys []string - for k := range matchResults { - keys = append(keys, k) - } - log.Fatalf("need a reason for the record, one of: %s", strings.Join(keys, ", ")) - } - if f, g, err = readersFromFilenames(*releasesFile, *refsFile); err != nil { - log.Fatal(err) - } - defer f.Close() - defer g.Close() - mr, ok := matchResults[*exactReason] - if !ok { - mr = matchResults["unknown"] - } - if err := skate.ZippyFixed(f, g, mr, *provenance, bw); err != nil { - log.Fatal(err) - } - case "zip": - // Take two "sorted key files" (one refs, one releases) and run - // verification across groups, generate biblioref file. - if *refsFile == "" || *releasesFile == "" { - log.Fatal("zip mode requires -F and -R to be set") - } - if f, g, err = readersFromFilenames(*releasesFile, *refsFile); err != nil { - log.Fatal(err) - } - defer f.Close() - defer g.Close() - if err := skate.ZippyVerifyRefs(f, g, bw); err != nil { - log.Fatal(err) - } - case "ref": - // https://git.io/JtACz - pp := parallel.NewProcessor(os.Stdin, os.Stdout, skate.RefClusterVerify) - pp.NumWorkers = *numWorkers - pp.BatchSize = *batchSize - if err := pp.Run(); err != nil { - log.Fatal(err) - } - case "bref": - // generate biblioref - pp := parallel.NewProcessor(os.Stdin, os.Stdout, skate.RefClusterToBiblioRef) - pp.NumWorkers = *numWorkers - pp.BatchSize = *batchSize - if err := pp.Run(); err != nil { - log.Fatal(err) - } - case "wiki": - // Fixed zip mode for DOI from wikipedia. - if *wikiFile == "" || *releasesFile == "" { - log.Fatal("mode requires -W and -F to be set") - } - if f, g, err = readersFromFilenames(*releasesFile, *wikiFile); err != nil { - log.Fatal(err) - } - defer f.Close() - defer g.Close() - mr := skate.MatchResult{Status: skate.StatusExact, Reason: skate.ReasonDOI} - if err = skate.ZippyFixedWiki(f, g, mr, "wiki", bw); err != nil { - log.Fatal(err) - } - default: - log.Fatal("not implemented, only: exact, zip, ref, bref, wiki") - } - if *memProfile != "" { - f, err := os.Create(*memProfile) - if err != nil { - log.Fatal("could not create memory profile: ", err) - } - defer f.Close() - runtime.GC() - if err := pprof.WriteHeapProfile(f); err != nil { - log.Fatal(err) - } - } -} - -// readersFromFilenames lets the called check for a single error only. -func readersFromFilenames(f0, f1 string) (io.ReadCloser, io.ReadCloser, error) { - f, err := os.Open(f0) - if err != nil { - return nil, nil, err - } - g, err := os.Open(f1) - if err != nil { - return nil, nil, err - } - return f, g, nil -} diff --git a/skate/xio/util.go b/skate/xio/util.go new file mode 100644 index 0000000..554317b --- /dev/null +++ b/skate/xio/util.go @@ -0,0 +1,14 @@ +package xio + +import "os" + +// OpenTwo opens two files, and the caller needs to check for a single error only. +func OpenTwo(f1, f2 string) (g1 *os.File, g2 *os.File, err error) { + if g1, err = os.Open(f1); err != nil { + return nil, nil, err + } + if g2, err = os.Open(f2); err != nil { + return nil, nil, err + } + return g1, g2, nil +} diff --git a/skate/zippy.go b/skate/zippy.go index 6b1c373..c8d2aee 100644 --- a/skate/zippy.go +++ b/skate/zippy.go @@ -11,12 +11,12 @@ import ( // This file contains the two-stream (zippy) matchers. -// ZippyFixed takes a release and refs reader (tsv, with ident, key, doc) +// ZippyExact takes a release and refs reader (tsv, with ident, key, doc) // and assigns a fixed match result. -func ZippyFixed(releases, refs io.Reader, mr MatchResult, provenance string, w io.Writer) error { +func ZippyExact(releases, refs io.Reader, matchResult MatchResult, w io.Writer) error { var ( enc = json.NewEncoder(w) - keyer = makeKeyFunc("\t", 2) + keyer = makeKeyFunc("\t", 1) grouper = func(g *zipkey.Group) error { var ( target *Release @@ -26,11 +26,11 @@ func ZippyFixed(releases, refs io.Reader, mr MatchResult, provenance string, w i if len(g.G0) == 0 || len(g.G1) == 0 { return nil } - if target, err = stringToRelease(cut(g.G0[0], "\t", 3)); err != nil { + if target, err = stringToRelease(cut(g.G0[0], "\t", 2)); err != nil { return err } for _, line := range g.G1 { - if ref, err = stringToRef(cut(line, "\t", 3)); err != nil { + if ref, err = stringToRef(cut(line, "\t", 2)); err != nil { return err } var bref BiblioRef @@ -42,9 +42,9 @@ func ZippyFixed(releases, refs io.Reader, mr MatchResult, provenance string, w i bref.RefKey = ref.Key bref.TargetReleaseIdent = target.Ident bref.TargetWorkIdent = target.WorkID - bref.MatchProvenance = provenance - bref.MatchStatus = mr.Status.Short() - bref.MatchReason = mr.Reason.Short() + bref.MatchProvenance = ref.RefSource + bref.MatchStatus = matchResult.Status.Short() + bref.MatchReason = matchResult.Reason.Short() if err := enc.Encode(bref); err != nil { return err } @@ -56,12 +56,12 @@ func ZippyFixed(releases, refs io.Reader, mr MatchResult, provenance string, w i return zipper.Run() } -// ZippyFixedWiki takes a release and wiki reader (tsv, with ident, key, doc) +// ZippyExactWiki takes a release and wiki reader (tsv, with key, doc) // and assigns a fixed match result. -func ZippyFixedWiki(releases, wiki io.Reader, mr MatchResult, provenance string, w io.Writer) error { +func ZippyExactWiki(releases, wiki io.Reader, mr MatchResult, w io.Writer) error { var ( enc = json.NewEncoder(w) - keyer = makeKeyFunc("\t", 2) + keyer = makeKeyFunc("\t", 1) grouper = func(g *zipkey.Group) error { var ( target *Release @@ -71,11 +71,11 @@ func ZippyFixedWiki(releases, wiki io.Reader, mr MatchResult, provenance string, if len(g.G0) == 0 || len(g.G1) == 0 { return nil } - if target, err = stringToRelease(cut(g.G0[0], "\t", 3)); err != nil { + if target, err = stringToRelease(cut(g.G0[0], "\t", 2)); err != nil { return err } for _, line := range g.G1 { - if wiki, err = stringToWiki(cut(line, "\t", 3)); err != nil { + if wiki, err = stringToWiki(cut(line, "\t", 2)); err != nil { return err } var bref BiblioRef @@ -83,7 +83,7 @@ func ZippyFixedWiki(releases, wiki io.Reader, mr MatchResult, provenance string, bref.SourceWikipediaArticle = wiki.PageTitle bref.TargetReleaseIdent = target.Ident bref.TargetWorkIdent = target.WorkID - bref.MatchProvenance = provenance + bref.MatchProvenance = "wikipedia" bref.MatchStatus = mr.Status.Short() bref.MatchReason = mr.Reason.Short() if err := enc.Encode(bref); err != nil { |