1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
import base64
import magic
import hashlib
import datetime
def gen_file_metadata(blob):
"""
Takes a file blob (bytestream) and returns hashes and other metadata.
Returns a dict: size_bytes, md5hex, sha1hex, sha256hex, mimetype
"""
assert blob
mimetype = magic.Magic(mime=True).from_buffer(blob)
hashes = [
hashlib.sha1(),
hashlib.sha256(),
hashlib.md5(),
]
for h in hashes:
h.update(blob)
return dict(
size_bytes=len(blob),
sha1hex=hashes[0].hexdigest(),
sha256hex=hashes[1].hexdigest(),
md5hex=hashes[2].hexdigest(),
mimetype=mimetype,
)
def b32_hex(s):
"""
Converts a base32-encoded SHA-1 checksum into hex-encoded
base32 checksums are used by, eg, heritrix and in wayback CDX files
"""
s = s.strip().split()[0].lower()
if s.startswith("sha1:"):
s = s[5:]
if len(s) != 32:
if len(s) == 40:
return s
raise ValueError("not a base-32 encoded SHA-1 hash: {}".format(s))
return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
NORMAL_MIME = (
'application/pdf',
'application/postscript',
'text/html',
'text/xml',
)
def normalize_mime(raw):
raw = raw.lower()
for norm in NORMAL_MIME:
if raw.startswith(norm):
return norm
# Special cases
if raw.startswith('application/xml'):
return 'text/xml'
if raw.startswith('application/x-pdf'):
return 'application/pdf'
return None
def test_normalize_mime():
assert normalize_mime("asdf") is None
assert normalize_mime("application/pdf") == "application/pdf"
assert normalize_mime("application/pdf+journal") == "application/pdf"
assert normalize_mime("Application/PDF") == "application/pdf"
assert normalize_mime("application/p") is None
assert normalize_mime("application/xml+stuff") == "text/xml"
assert normalize_mime("application/x-pdf") == "application/pdf"
assert normalize_mime("application/x-html") is None
def parse_cdx_line(raw_cdx, normalize=True):
cdx = raw_cdx.split()
if len(cdx) < 11:
return None
surt = cdx[0]
dt = cdx[1]
url = cdx[2]
mime = normalize_mime(cdx[3])
if normalize:
mime = normalize_mime(mime)
http_status = cdx[4]
sha1b32 = cdx[5]
c_size = cdx[8]
offset = cdx[9]
warc = cdx[10]
if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit()
and len(sha1b32) == 32 and dt.isdigit()):
return None
if '-' in (surt, dt, url, http_status, sha1b32, c_size, offset, warc):
return None
if mime is None or mime == '-':
mime = "application/octet-stream"
sha1hex = b32_hex(sha1b32)
http_status = int(http_status)
c_size = int(c_size)
offset = int(offset)
return dict(
surt=surt,
url=url,
datetime=dt,
mimetype=mime,
http_status=http_status,
sha1b32=sha1b32,
sha1hex=sha1hex,
c_size=c_size,
offset=offset,
warc=warc,
)
def parse_cdx_datetime(dt_str):
try:
return datetime.strptime(dt_str, "%Y%m%d%H%M%S")
except Exception:
return None
|