aboutsummaryrefslogtreecommitdiffstats
path: root/skate/cmd
diff options
context:
space:
mode:
authorMartin Czygan <martin.czygan@gmail.com>2021-04-08 18:44:47 +0200
committerMartin Czygan <martin.czygan@gmail.com>2021-04-19 20:29:17 +0200
commitc8f2e93e1ea6542291cf977f0957ed7786f00766 (patch)
tree34808cfb7c50bac2a4f66dfd2a86b0f893604018 /skate/cmd
parentbf5ffca07b4dfbd9b1134e2a0223c0c7d27b49d9 (diff)
downloadrefcat-c8f2e93e1ea6542291cf977f0957ed7786f00766.tar.gz
refcat-c8f2e93e1ea6542291cf977f0957ed7786f00766.zip
cdx: stubby example
Diffstat (limited to 'skate/cmd')
-rw-r--r--skate/cmd/skate-cdx-lookup/main.go106
1 files changed, 98 insertions, 8 deletions
diff --git a/skate/cmd/skate-cdx-lookup/main.go b/skate/cmd/skate-cdx-lookup/main.go
index 742ca7d..2e43b8a 100644
--- a/skate/cmd/skate-cdx-lookup/main.go
+++ b/skate/cmd/skate-cdx-lookup/main.go
@@ -1,15 +1,16 @@
// skate-cdx-lookup is a lookup tool for small and large lists of URLs. We try
-// to read from HDSFs in parallel and cache some mapping information locally
+// to read from HDFS in parallel and cache some mapping information locally
// for fast access.
//
// What we want: Lookup 10-100M URLs and report, whether we have it or not.
// Also make this a bit more generic, so we can lookup all kinds of things in
// the CDX index.
//
-// Alternatives: Spark, Sparkling, PIG, Hive, ...
+// Alternatives: Spark, Sparkling, Pig, Hive, ...
//
// We take advantage of index files and sorted data. The complete dataset is
-// 66TB, gzip compressed.
+// 66TB, gzip compressed. We do not need compute to be distrubuted, as a single
+// machine may be enough to process the data.
//
// An example line:
//
@@ -17,13 +18,20 @@
//
// The index files are named part-a-00276-idx and are typically around 100M, not compressed. 900K lines, takes 1-2s to scan.
//
+// The idx files are probably concatenated gzips, otherwise we could not seek into them.
package main
import (
+ "bufio"
+ "compress/gzip"
"flag"
"fmt"
+ "io"
"log"
"sort"
+ "strconv"
+ "strings"
+ "time"
"github.com/colinmarc/hdfs"
)
@@ -31,18 +39,19 @@ import (
var (
nameNode = flag.String("nn", "", "namenode, leave empty when env is set up")
cdxDir = flag.String("C", "/user/wmdata2/cdx-all-index", "cdx dir")
-)
-func main() {
- flag.Usage = func() {
- fmt.Println(`
+ note = `
Make sure HADOOP env is set up.
$ git clone https://git.archive.org/webgroup/hadoop-env.git
$ source hadoop-env/prod/setup-env.sh
$ echo $HADOOP_CONF_DIR # should not be empty
+`
+)
-`)
+func main() {
+ flag.Usage = func() {
+ fmt.Println(note)
}
flag.Parse()
client, err := hdfs.New(*nameNode)
@@ -63,4 +72,85 @@ $ echo $HADOOP_CONF_DIR # should not be empty
}
cdxTs := names[0]
log.Printf("using %s", cdxTs)
+ // Example seek and read.
+ // /user/wmdata2/cdx-all-index/20210211202455/part-a-00271-idx, 845068 lines, uncompressed
+ // /user/wmdata2/cdx-all-index/20210211202455/part-a-00271.gz, maybe: concatenated gzip
+ f, err := client.Open("/user/wmdata2/cdx-all-index/20210211202455/part-a-00271-idx")
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer f.Close()
+ var i int
+ br := bufio.NewReader(f)
+ for {
+ i++
+ line, err := br.ReadString('\n')
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ log.Fatal(err)
+ }
+ indexLine, err := parseIndexLine(line)
+ if err != nil {
+ log.Fatal(err)
+ }
+ if i%25000 == 0 {
+ log.Printf("%d cdx index lines read", i)
+ }
+ if i == 100000 {
+ started := time.Now()
+ // example extraction
+ g, err := client.Open("/user/wmdata2/cdx-all-index/20210211202455/part-a-00271.gz")
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer g.Close()
+ _, err = g.Seek(indexLine.Offset, io.SeekStart)
+ if err != nil {
+ log.Fatal(err)
+ }
+ lr := io.LimitReader(g, indexLine.Length)
+ gzr, err := gzip.NewReader(lr)
+ if err != nil {
+ log.Fatal(err)
+ }
+ n, err := io.Copy(io.Discard, gzr)
+ if err != nil {
+ log.Fatal(err)
+ }
+ log.Printf("scanned %d bytes in %v (from slice 100000)", n, time.Since(started))
+ }
+ }
+}
+
+// IndexLine contains CDX index fields.
+type IndexLine struct {
+ Surt string
+ Date string
+ Name string
+ Offset int64
+ Length int64
+}
+
+func parseIndexLine(s string) (*IndexLine, error) {
+ parts := strings.Fields(strings.TrimSpace(s))
+ if len(parts) != 5 {
+ return nil, fmt.Errorf("invalid line: %s", s)
+ }
+ offset, err := strconv.Atoi(parts[3])
+ if err != nil {
+ return nil, fmt.Errorf("cannot parse offset: %v", offset)
+ }
+ length, err := strconv.Atoi(parts[4])
+ if err != nil {
+ return nil, fmt.Errorf("cannot parse length: %v", offset)
+ }
+ return &IndexLine{
+ Surt: parts[0],
+ Date: parts[1],
+ Name: parts[2],
+ Offset: int64(offset),
+ Length: int64(length),
+ }, nil
}