aboutsummaryrefslogtreecommitdiffstats
path: root/sql/backfill/backfill_cdx.py
blob: f92950263ae122e6f9b3618c6dd208142801e0c8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python3
"""
This is a "one-time" tranform helper script for CDX backfill into sandcrawler
postgresql.

Most of this file was copied from '../python/common.py'.
"""

import json, os, sys, collections
import base64
import psycopg2
import psycopg2.extras

NORMAL_MIME = (
    'application/pdf',
    'application/postscript',
    'text/html',
    'text/xml',
)

def normalize_mime(raw):
    raw = raw.lower()
    for norm in NORMAL_MIME:
        if raw.startswith(norm):
            return norm

    # Special cases
    if raw.startswith('application/xml'):
        return 'text/xml'
    if raw.startswith('application/x-pdf'):
        return 'application/pdf'
    return None


def test_normalize_mime():
    assert normalize_mime("asdf") is None
    assert normalize_mime("application/pdf") == "application/pdf"
    assert normalize_mime("application/pdf+journal") == "application/pdf"
    assert normalize_mime("Application/PDF") == "application/pdf"
    assert normalize_mime("application/p") is None
    assert normalize_mime("application/xml+stuff") == "text/xml"
    assert normalize_mime("application/x-pdf") == "application/pdf"
    assert normalize_mime("application/x-html") is None

def b32_hex(s):
    s = s.strip().split()[0].lower()
    if s.startswith("sha1:"):
        s = s[5:]
    if len(s) != 32:
        return s
    return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')


def parse_cdx_line(raw_cdx):

    cdx = raw_cdx.split()
    if len(cdx) < 11:
        return None

    surt = cdx[0]
    dt = cdx[1]
    url = cdx[2]
    mime = normalize_mime(cdx[3])
    http_status = cdx[4]
    key = cdx[5]
    c_size = cdx[8]
    offset = cdx[9]
    warc = cdx[10]

    if not (key.isalnum() and c_size.isdigit() and offset.isdigit()
            and http_status == "200" and len(key) == 32 and dt.isdigit()
            and mime != None):
        return None

    if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc):
        return None

    # these are the new/specific bits
    sha1 = b32_hex(key)
    return dict(url=url, datetime=dt, sha1hex=sha1, cdx_sha1hex=None, mimetype=mime, warc_path=warc, warc_csize=int(c_size), warc_offset=int(offset))

def insert(cur, batch):
    sql = """
        INSERT INTO
        cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
        VALUES %s
        ON CONFLICT ON CONSTRAINT cdx_pkey DO NOTHING
        RETURNING 1;
    """
    batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'],
              d['warc_path'], d['warc_csize'], d['warc_offset'])
             for d in batch]
    res = psycopg2.extras.execute_values(cur, sql, batch) # fetch=True
    #return len(res)

def stdin_to_pg():
    # no host means it will use local domain socket by default
    conn = psycopg2.connect(database="sandcrawler", user="postgres")
    cur = conn.cursor()
    counts = collections.Counter({'total': 0})
    batch = []
    for l in sys.stdin:
        l = l.strip()
        if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0:
            print("Progress: {}...".format(counts))
        counts['raw_lines'] += 1
        if not l:
            continue
        info = parse_cdx_line(l)
        if not info:
            continue
        # XXX: filter to, eg, PDF or octet/stream (derp)
        batch.append(info)
        counts['total'] += 1
        if len(batch) >= 1000:
            insert(cur, batch)
            conn.commit()
            #counts['inserted'] += i
            #counts['existing'] += len(batch) - i
            batch = []
            counts['batches'] += 1
    if batch:
        insert(cur, batch)
        #counts['inserted'] += i
        #counts['existing'] += len(batch) - i
        batch = []
    conn.commit()
    cur.close()
    print("Done: {}".format(counts))

if __name__=='__main__':
    stdin_to_pg()