1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
#!/usr/bin/env python3
"""
This is a "one-time" tranform helper script for CDX backfill into sandcrawler
postgresql.
Most of this file was copied from '../python/common.py'.
"""
import json, os, sys, collections
import base64
import psycopg2
import psycopg2.extras
NORMAL_MIME = (
'application/pdf',
'application/postscript',
'text/html',
'text/xml',
)
def normalize_mime(raw):
raw = raw.lower()
for norm in NORMAL_MIME:
if raw.startswith(norm):
return norm
# Special cases
if raw.startswith('application/xml'):
return 'text/xml'
if raw.startswith('application/x-pdf'):
return 'application/pdf'
return None
def test_normalize_mime():
assert normalize_mime("asdf") is None
assert normalize_mime("application/pdf") == "application/pdf"
assert normalize_mime("application/pdf+journal") == "application/pdf"
assert normalize_mime("Application/PDF") == "application/pdf"
assert normalize_mime("application/p") is None
assert normalize_mime("application/xml+stuff") == "text/xml"
assert normalize_mime("application/x-pdf") == "application/pdf"
assert normalize_mime("application/x-html") is None
def b32_hex(s):
s = s.strip().split()[0].lower()
if s.startswith("sha1:"):
s = s[5:]
if len(s) != 32:
return s
return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
def parse_cdx_line(raw_cdx):
cdx = raw_cdx.split()
if len(cdx) < 11:
return None
surt = cdx[0]
dt = cdx[1]
url = cdx[2]
mime = normalize_mime(cdx[3])
http_status = cdx[4]
key = cdx[5]
c_size = cdx[8]
offset = cdx[9]
warc = cdx[10]
if not (key.isalnum() and c_size.isdigit() and offset.isdigit()
and http_status == "200" and len(key) == 32 and dt.isdigit()
and mime != None):
return None
if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc):
return None
# these are the new/specific bits
sha1 = b32_hex(key)
return dict(url=url, datetime=dt, sha1hex=sha1, cdx_sha1hex=None, mimetype=mime, warc_path=warc, warc_csize=int(c_size), warc_offset=int(offset))
def insert(cur, batch):
sql = """
INSERT INTO
cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
VALUES %s
ON CONFLICT ON CONSTRAINT cdx_pkey DO NOTHING
RETURNING 1;
"""
batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'],
d['warc_path'], d['warc_csize'], d['warc_offset'])
for d in batch]
res = psycopg2.extras.execute_values(cur, sql, batch) # fetch=True
#return len(res)
def stdin_to_pg():
# no host means it will use local domain socket by default
conn = psycopg2.connect(database="sandcrawler", user="postgres")
cur = conn.cursor()
counts = collections.Counter({'total': 0})
batch = []
for l in sys.stdin:
l = l.strip()
if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0:
print("Progress: {}...".format(counts))
counts['raw_lines'] += 1
if not l:
continue
info = parse_cdx_line(l)
if not info:
continue
batch.append(info)
counts['total'] += 1
if len(batch) >= 1000:
insert(cur, batch)
conn.commit()
#counts['inserted'] += i
#counts['existing'] += len(batch) - i
batch = []
counts['batches'] += 1
if batch:
insert(cur, batch)
#counts['inserted'] += i
#counts['existing'] += len(batch) - i
batch = []
conn.commit()
cur.close()
print("Done: {}".format(counts))
if __name__=='__main__':
stdin_to_pg()
|