diff options
author | Martin Czygan <martin.czygan@gmail.com> | 2021-04-08 18:44:47 +0200 |
---|---|---|
committer | Martin Czygan <martin.czygan@gmail.com> | 2021-04-19 20:29:17 +0200 |
commit | c8f2e93e1ea6542291cf977f0957ed7786f00766 (patch) | |
tree | 34808cfb7c50bac2a4f66dfd2a86b0f893604018 /skate/cmd/skate-cdx-lookup | |
parent | bf5ffca07b4dfbd9b1134e2a0223c0c7d27b49d9 (diff) | |
download | refcat-c8f2e93e1ea6542291cf977f0957ed7786f00766.tar.gz refcat-c8f2e93e1ea6542291cf977f0957ed7786f00766.zip |
cdx: stubby example
Diffstat (limited to 'skate/cmd/skate-cdx-lookup')
-rw-r--r-- | skate/cmd/skate-cdx-lookup/main.go | 106 |
1 files changed, 98 insertions, 8 deletions
diff --git a/skate/cmd/skate-cdx-lookup/main.go b/skate/cmd/skate-cdx-lookup/main.go index 742ca7d..2e43b8a 100644 --- a/skate/cmd/skate-cdx-lookup/main.go +++ b/skate/cmd/skate-cdx-lookup/main.go @@ -1,15 +1,16 @@ // skate-cdx-lookup is a lookup tool for small and large lists of URLs. We try -// to read from HDSFs in parallel and cache some mapping information locally +// to read from HDFS in parallel and cache some mapping information locally // for fast access. // // What we want: Lookup 10-100M URLs and report, whether we have it or not. // Also make this a bit more generic, so we can lookup all kinds of things in // the CDX index. // -// Alternatives: Spark, Sparkling, PIG, Hive, ... +// Alternatives: Spark, Sparkling, Pig, Hive, ... // // We take advantage of index files and sorted data. The complete dataset is -// 66TB, gzip compressed. +// 66TB, gzip compressed. We do not need compute to be distrubuted, as a single +// machine may be enough to process the data. // // An example line: // @@ -17,13 +18,20 @@ // // The index files are named part-a-00276-idx and are typically around 100M, not compressed. 900K lines, takes 1-2s to scan. // +// The idx files are probably concatenated gzips, otherwise we could not seek into them. package main import ( + "bufio" + "compress/gzip" "flag" "fmt" + "io" "log" "sort" + "strconv" + "strings" + "time" "github.com/colinmarc/hdfs" ) @@ -31,18 +39,19 @@ import ( var ( nameNode = flag.String("nn", "", "namenode, leave empty when env is set up") cdxDir = flag.String("C", "/user/wmdata2/cdx-all-index", "cdx dir") -) -func main() { - flag.Usage = func() { - fmt.Println(` + note = ` Make sure HADOOP env is set up. $ git clone https://git.archive.org/webgroup/hadoop-env.git $ source hadoop-env/prod/setup-env.sh $ echo $HADOOP_CONF_DIR # should not be empty +` +) -`) +func main() { + flag.Usage = func() { + fmt.Println(note) } flag.Parse() client, err := hdfs.New(*nameNode) @@ -63,4 +72,85 @@ $ echo $HADOOP_CONF_DIR # should not be empty } cdxTs := names[0] log.Printf("using %s", cdxTs) + // Example seek and read. + // /user/wmdata2/cdx-all-index/20210211202455/part-a-00271-idx, 845068 lines, uncompressed + // /user/wmdata2/cdx-all-index/20210211202455/part-a-00271.gz, maybe: concatenated gzip + f, err := client.Open("/user/wmdata2/cdx-all-index/20210211202455/part-a-00271-idx") + if err != nil { + log.Fatal(err) + } + defer f.Close() + var i int + br := bufio.NewReader(f) + for { + i++ + line, err := br.ReadString('\n') + if err == io.EOF { + break + } + if err != nil { + log.Fatal(err) + } + indexLine, err := parseIndexLine(line) + if err != nil { + log.Fatal(err) + } + if i%25000 == 0 { + log.Printf("%d cdx index lines read", i) + } + if i == 100000 { + started := time.Now() + // example extraction + g, err := client.Open("/user/wmdata2/cdx-all-index/20210211202455/part-a-00271.gz") + if err != nil { + log.Fatal(err) + } + defer g.Close() + _, err = g.Seek(indexLine.Offset, io.SeekStart) + if err != nil { + log.Fatal(err) + } + lr := io.LimitReader(g, indexLine.Length) + gzr, err := gzip.NewReader(lr) + if err != nil { + log.Fatal(err) + } + n, err := io.Copy(io.Discard, gzr) + if err != nil { + log.Fatal(err) + } + log.Printf("scanned %d bytes in %v (from slice 100000)", n, time.Since(started)) + } + } +} + +// IndexLine contains CDX index fields. +type IndexLine struct { + Surt string + Date string + Name string + Offset int64 + Length int64 +} + +func parseIndexLine(s string) (*IndexLine, error) { + parts := strings.Fields(strings.TrimSpace(s)) + if len(parts) != 5 { + return nil, fmt.Errorf("invalid line: %s", s) + } + offset, err := strconv.Atoi(parts[3]) + if err != nil { + return nil, fmt.Errorf("cannot parse offset: %v", offset) + } + length, err := strconv.Atoi(parts[4]) + if err != nil { + return nil, fmt.Errorf("cannot parse length: %v", offset) + } + return &IndexLine{ + Surt: parts[0], + Date: parts[1], + Name: parts[2], + Offset: int64(offset), + Length: int64(length), + }, nil } |