aboutsummaryrefslogtreecommitdiffstats
path: root/skate/cmd/skate-cdx-lookup/main.go
blob: 9822c90976bce4b65ae6b564dea433668c75a8a2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// skate-cdx-lookup is a lookup tool for small and large lists of URLs. We try
// to read from HDFS in parallel and cache some mapping information locally for
// fast access.
//
// What we want: Lookup 10-100M URLs quickly and report, whether the URL is in
// GWB or not.  Also make this a bit more generic, so we can lookup other
// things in the CDX index.
//
// As of 04/2021 the CDX is split into 300 files, each around 230G, for a total
// of 70T (compressed, maybe 350T plain). Each file comes with a 90M index
// containing about 1M lines (with surt, offset, ...).
//
// Test run and tiny design:
//
// * [ ] accept sorted input only
// * [ ] get first URL, find the corresponding index file
//
// Raw index; only HTTP 200, or redirect; include everything; random URL from a
// source; popular URL; hundreds of captures; filter the dump! SURT; huge
// efficiency; PIG;
// https://git.archive.org/webgroup/sandcrawler/-/tree/master/pig
//
// Alternatives: Spark, Sparkling, Pig, Hive, Java MR, ...
//
// We take advantage of index files and sorted data. The complete dataset is
// 66TB, gzip compressed. We do not need compute to be distrubuted, as a single
// machine may be enough to process the data.
//
// An example line:
//
// org,rdnn,software,gps)/he.jpg 20050412144213 http://www.gps.software.rdnn.org:80/He.JPG image/jpeg 200 VSJNO26E43GP7OYL6BIRE4IXSIOMHZA5 - - 3943 43865977 ED_crawl28.20050412080854-c/ED_crawl28.20050412144103.arc.gz
//
// The index files are named part-a-00276-idx and are typically around 100M, not compressed. 900K lines, takes 1-2s to scan.
//
// The idx files are probably concatenated gzips, otherwise we could not seek into them.
package main

import (
	"bufio"
	"compress/gzip"
	"flag"
	"fmt"
	"io"
	"log"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/colinmarc/hdfs"
)

var (
	nameNode = flag.String("nn", "", "namenode, leave empty when env is set up")
	cdxDir   = flag.String("C", "/user/wmdata2/cdx-all-index", "cdx dir")

	note = `
Make sure HADOOP env is set up.

$ git clone https://git.archive.org/webgroup/hadoop-env.git
$ source hadoop-env/prod/setup-env.sh
$ echo $HADOOP_CONF_DIR # should not be empty
`
)

func main() {
	flag.Usage = func() {
		fmt.Println(note)
	}
	flag.Parse()
	client, err := hdfs.New(*nameNode)
	if err != nil {
		log.Fatal(err)
	}
	fis, err := client.ReadDir(*cdxDir)
	if err != nil {
		log.Fatal(err)
	}
	var names []string
	for _, fi := range fis {
		names = append(names, fi.Name())
	}
	sort.Strings(names)
	if len(names) == 0 {
		log.Fatalf("missing files: %s", *cdxDir)
	}
	cdxTs := names[0]
	log.Printf("using %s", cdxTs)
	// Example seek and read.
	// /user/wmdata2/cdx-all-index/20210211202455/part-a-00271-idx, 845068 lines, uncompressed
	// /user/wmdata2/cdx-all-index/20210211202455/part-a-00271.gz, maybe: concatenated gzip
	f, err := client.Open("/user/wmdata2/cdx-all-index/20210211202455/part-a-00271-idx")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	var i int
	br := bufio.NewReader(f)
	for {
		i++
		line, err := br.ReadString('\n')
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		indexLine, err := parseIndexLine(line)
		if err != nil {
			log.Fatal(err)
		}
		if i%25000 == 0 {
			log.Printf("%d cdx index lines read", i)
		}
		if i == 100000 {
			started := time.Now()
			// example extraction
			g, err := client.Open("/user/wmdata2/cdx-all-index/20210211202455/part-a-00271.gz")
			if err != nil {
				log.Fatal(err)
			}
			defer g.Close()
			_, err = g.Seek(indexLine.Offset, io.SeekStart)
			if err != nil {
				log.Fatal(err)
			}
			lr := io.LimitReader(g, indexLine.Length)
			gzr, err := gzip.NewReader(lr)
			if err != nil {
				log.Fatal(err)
			}
			n, err := io.Copy(io.Discard, gzr)
			if err != nil {
				log.Fatal(err)
			}
			log.Printf("scanned %d bytes in %v (from slice 100000)", n, time.Since(started))
		}
	}
}

// IndexLine contains CDX index fields.
type IndexLine struct {
	Surt   string
	Date   string
	Name   string
	Offset int64
	Length int64
}

func parseIndexLine(s string) (*IndexLine, error) {
	parts := strings.Fields(strings.TrimSpace(s))
	if len(parts) != 5 {
		return nil, fmt.Errorf("invalid line: %s", s)
	}
	offset, err := strconv.Atoi(parts[3])
	if err != nil {
		return nil, fmt.Errorf("cannot parse offset: %v", offset)
	}
	length, err := strconv.Atoi(parts[4])
	if err != nil {
		return nil, fmt.Errorf("cannot parse length: %v", offset)
	}
	return &IndexLine{
		Surt:   parts[0],
		Date:   parts[1],
		Name:   parts[2],
		Offset: int64(offset),
		Length: int64(length),
	}, nil
}