diff options
| author | Bryan Newbold <bnewbold@archive.org> | 2019-08-09 16:57:52 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@archive.org> | 2019-08-09 16:57:52 -0700 | 
| commit | db711e1ac9e4b8ba8c9d5229bf310e89ffc27a47 (patch) | |
| tree | d977dfee48cfd947aec3c424733aebe74646d25e /postgrest/backfill/backfill_cdx.py | |
| parent | 9e4657d49dd91f1249042865505d1a9ea8ad2ea6 (diff) | |
| download | sandcrawler-db711e1ac9e4b8ba8c9d5229bf310e89ffc27a47.tar.gz sandcrawler-db711e1ac9e4b8ba8c9d5229bf310e89ffc27a47.zip | |
move postgres/rest directory
Diffstat (limited to 'postgrest/backfill/backfill_cdx.py')
| -rwxr-xr-x | postgrest/backfill/backfill_cdx.py | 131 | 
1 files changed, 131 insertions, 0 deletions
| diff --git a/postgrest/backfill/backfill_cdx.py b/postgrest/backfill/backfill_cdx.py new file mode 100755 index 0000000..1c452ca --- /dev/null +++ b/postgrest/backfill/backfill_cdx.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +""" +This is a "one-time" tranform helper script for CDX backfill into sandcrawler +postgresql. + +Most of this file was copied from '../python/common.py'. +""" + +import json, os, sys, collections +import base64 +import psycopg2 +import psycopg2.extras + +NORMAL_MIME = ( +    'application/pdf', +    'application/postscript', +    'text/html', +    'text/xml', +) + +def normalize_mime(raw): +    raw = raw.lower() +    for norm in NORMAL_MIME: +        if raw.startswith(norm): +            return norm + +    # Special cases +    if raw.startswith('application/xml'): +        return 'text/xml' +    if raw.startswith('application/x-pdf'): +        return 'application/pdf' +    return None + + +def test_normalize_mime(): +    assert normalize_mime("asdf") is None +    assert normalize_mime("application/pdf") == "application/pdf" +    assert normalize_mime("application/pdf+journal") == "application/pdf" +    assert normalize_mime("Application/PDF") == "application/pdf" +    assert normalize_mime("application/p") is None +    assert normalize_mime("application/xml+stuff") == "text/xml" +    assert normalize_mime("application/x-pdf") == "application/pdf" +    assert normalize_mime("application/x-html") is None + +def b32_hex(s): +    s = s.strip().split()[0].lower() +    if s.startswith("sha1:"): +        s = s[5:] +    if len(s) != 32: +        return s +    return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + + +def parse_cdx_line(raw_cdx): + +    cdx = raw_cdx.split() +    if len(cdx) < 11: +        return None + +    surt = cdx[0] +    dt = cdx[1] +    url = cdx[2] +    mime = normalize_mime(cdx[3]) +    http_status = cdx[4] +    key = cdx[5] +    c_size = cdx[8] +    offset = cdx[9] +    warc = cdx[10] + +    if not (key.isalnum() and c_size.isdigit() and offset.isdigit() +            and http_status == "200" and len(key) == 32 and dt.isdigit() +            and mime != None): +        return None + +    if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): +        return None + +    # these are the new/specific bits +    sha1 = b32_hex(key) +    return dict(url=url, datetime=dt, sha1hex=sha1, cdx_sha1hex=None, mimetype=mime, warc_path=warc, warc_csize=int(c_size), warc_offset=int(offset)) + +def insert(cur, batch): +    sql = """ +        INSERT INTO +        cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) +        VALUES %s +        ON CONFLICT ON CONSTRAINT cdx_pkey DO NOTHING +        RETURNING 1; +    """ +    batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], +              d['warc_path'], d['warc_csize'], d['warc_offset']) +             for d in batch] +    res = psycopg2.extras.execute_values(cur, sql, batch) # fetch=True +    #return len(res) + +def stdin_to_pg(): +    # no host means it will use local domain socket by default +    conn = psycopg2.connect(database="sandcrawler", user="postgres") +    cur = conn.cursor() +    counts = collections.Counter({'total': 0}) +    batch = [] +    for l in sys.stdin: +        l = l.strip() +        if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: +            print("Progress: {}...".format(counts)) +        counts['raw_lines'] += 1 +        if not l: +            continue +        info = parse_cdx_line(l) +        if not info: +            continue +        batch.append(info) +        counts['total'] += 1 +        if len(batch) >= 1000: +            insert(cur, batch) +            conn.commit() +            #counts['inserted'] += i +            #counts['existing'] += len(batch) - i +            batch = [] +            counts['batches'] += 1 +    if batch: +        insert(cur, batch) +        #counts['inserted'] += i +        #counts['existing'] += len(batch) - i +        batch = [] +    conn.commit() +    cur.close() +    print("Done: {}".format(counts)) + +if __name__=='__main__': +    stdin_to_pg() | 
