diff options
Diffstat (limited to 'postgresql')
| -rw-r--r-- | postgresql/backfill.md | 117 | ||||
| -rwxr-xr-x | postgresql/backfill_cdx.py | 131 | ||||
| -rwxr-xr-x | postgresql/backfill_file_meta.py | 55 | ||||
| -rwxr-xr-x | postgresql/backfill_grobid.py | 91 | ||||
| -rwxr-xr-x | postgresql/filter_transform_cdx.py | 88 | ||||
| -rwxr-xr-x | postgresql/petabox_transform.py | 24 | 
6 files changed, 506 insertions, 0 deletions
| diff --git a/postgresql/backfill.md b/postgresql/backfill.md new file mode 100644 index 0000000..f97c901 --- /dev/null +++ b/postgresql/backfill.md @@ -0,0 +1,117 @@ + +SQL Backfill Notes +----------------------- + +GROBID is going to be somewhat complex. + +TODO: +x CDX backfill script (CDX to postgresql direct, bulk inserts, python) +x `file_meta` bulk insert (TSV to postgresql direct, bulk upserts, python) +x GROBID insert (python, dump TSV to minio then postgresql) + +## `cdx` + +    #cat example.cdx | rg ' 200 ' | cut -d' ' -f2,3,4,6,9,10,11 +    #cat example.cdx | rg ' 200 ' | awk '{print $6 "\t" $3 "\t" $2 "\t" $4 "\t\t" $6 "\t" $11 "\t" $9 "\t" $10}' | b32_hex.py | awk '{print $2 "\t" $3 "\t" $4 "\t" $1 "\t" $6 "\t" $7 "\t" $8}' > cdx.example.tsv +    cat example.cdx | ./filter_transform_cdx.py > cdx.example.tsv + +    COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.example.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + +Big HDFS import: + +    # but actually didn't import those; don't want to need to re-import +    hdfs dfs -get journal_crawl_cdx/* + +    cat citeseerx_crawl_2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.citeseerx_crawl_2017.tsv +    cat gwb-pdf-20171227034923-surt-filter/* | rg ' 200 ' | ./filter_transform_cdx.py > gwb-pdf-20171227034923-surt-filter.tsv +    cat UNPAYWALL-PDF-CRAWL-2018-07.filtered.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.UNPAYWALL-PDF-CRAWL-2018-07.filtered.tsv +    cat MSAG-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.MSAG-PDF-CRAWL-2017.tsv + +    cat CORE-UPSTREAM-CRAWL-2018-11.sorted.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.CORE-UPSTREAM-CRAWL-2018-11.sorted.tsv +    cat DIRECT-OA-CRAWL-2019.pdfs.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.DIRECT-OA-CRAWL-2019.pdfs.tsv +    cat DOI-LANDING-CRAWL-2018-06.200_pdf.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.DOI-LANDING-CRAWL-2018-06.200_pdf.tsv +    cat OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.tsv +    cat SEMSCHOLAR-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.SEMSCHOLAR-PDF-CRAWL-2017.tsv +    cat TARGETED-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.TARGETED-PDF-CRAWL-2017.tsv +    cat UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.tsv + +TODO: nasty escaping? + +In psql: + +    COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.UNPAYWALL-PDF-CRAWL-2018-07.filtered.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # ERROR +    COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.MSAG-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # ERROR +    COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.citeseerx_crawl_2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # COPY 1653840 +    COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.CORE-UPSTREAM-CRAWL-2018-11.sorted.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # COPY 2827563 +    COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.DIRECT-OA-CRAWL-2019.pdfs.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # COPY 10651736 +    COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.DOI-LANDING-CRAWL-2018-06.200_pdf.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # COPY 768565 +    COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # COPY 5310017 +    COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.SEMSCHOLAR-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # COPY 2219839 +    COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.TARGETED-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # ERROR +    COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # ERROR + +NOTE: these largely didn't work; will need to write a batch importer. + +Batch import process: + +    cat UNPAYWALL-PDF-CRAWL-2018-07.filtered.cdx MSAG-PDF-CRAWL-2017.cdx TARGETED-PDF-CRAWL-2017.cdx UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.cdx | ./backfill_cdx.py + +## `fatcat_file` + +    zcat file_export.2019-07-07.json.gz | pv -l | jq -r 'select(.sha1 != null) | [.sha1, .ident, .release_ids[0]] | @tsv' | sort -S 8G | uniq -w 40 > /sandcrawler-db/backfill/fatcat_file.2019-07-07.tsv + +In psql: + +    COPY fatcat_file FROM '/sandcrawler-db/backfill/fatcat_file.2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # => COPY 24727350 + +## `file_meta` + +    zcat /fast/download/file_export.2019-07-07.json.gz | pv -l | jq -r 'select(.md5 != null) | [.sha1, .sha256, .md5, .size, .mimetype] | @tsv' | sort -S 8G | uniq -w 40 > /sandcrawler-db/backfill/file_meta.2019-07-07.tsv + +In psql: + +    COPY file_meta FROM '/sandcrawler-db/backfill/file_meta.2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # -> COPY 5860092 + +## `petabox` + +    zcat /fast/download/file_export.2019-07-07.json.gz | rg '//archive.org/' | pigz > /fast/download/file_export.2019-07-07.petabox.json.gz +    zcat /fast/download/file_export.2019-07-07.petabox.json.gz | ./petabox_transform.py | sort -u -S 8G | awk '{print $3 "\t" $1 "\t" $2}' | uniq -s40 | awk '{print $2 "\t" $3 "\t" $1}' > petabox.fatcat_2019-07-07.tsv + +In psql: + +    COPY petabox FROM '/sandcrawler-db/backfill/petabox.fatcat_2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); +    # -> COPY 2887834 + +## `grobid` + +Quick test: + +    zcat /bigger/unpaywall-transfer/2019-07-17-1741.30-dumpgrobidxml/part-00000.gz | cut -f2 | head | ./backfill_grobid.py + +Run big batch: + +    ls /bigger/unpaywall-transfer/2019-07-17-1741.30-dumpgrobidxml/part*gz |arallel --progress -j8 'zcat {} | cut -f2 | ./backfill_grobid.py' + +## rough table sizes + +                              table_name                          | table_size | indexes_size | total_size  +    --------------------------------------------------------------+------------+--------------+------------ +     "public"."cdx"                                               | 11 GB      | 8940 MB      | 20 GB +     "public"."shadow"                                            | 8303 MB    | 7205 MB      | 15 GB +     "public"."fatcat_file"                                       | 5206 MB    | 2094 MB      | 7300 MB +     "public"."file_meta"                                         | 814 MB     | 382 MB       | 1196 MB +     "public"."petabox"                                           | 403 MB     | 594 MB       | 997 MB +    [...] + diff --git a/postgresql/backfill_cdx.py b/postgresql/backfill_cdx.py new file mode 100755 index 0000000..1c452ca --- /dev/null +++ b/postgresql/backfill_cdx.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +""" +This is a "one-time" tranform helper script for CDX backfill into sandcrawler +postgresql. + +Most of this file was copied from '../python/common.py'. +""" + +import json, os, sys, collections +import base64 +import psycopg2 +import psycopg2.extras + +NORMAL_MIME = ( +    'application/pdf', +    'application/postscript', +    'text/html', +    'text/xml', +) + +def normalize_mime(raw): +    raw = raw.lower() +    for norm in NORMAL_MIME: +        if raw.startswith(norm): +            return norm + +    # Special cases +    if raw.startswith('application/xml'): +        return 'text/xml' +    if raw.startswith('application/x-pdf'): +        return 'application/pdf' +    return None + + +def test_normalize_mime(): +    assert normalize_mime("asdf") is None +    assert normalize_mime("application/pdf") == "application/pdf" +    assert normalize_mime("application/pdf+journal") == "application/pdf" +    assert normalize_mime("Application/PDF") == "application/pdf" +    assert normalize_mime("application/p") is None +    assert normalize_mime("application/xml+stuff") == "text/xml" +    assert normalize_mime("application/x-pdf") == "application/pdf" +    assert normalize_mime("application/x-html") is None + +def b32_hex(s): +    s = s.strip().split()[0].lower() +    if s.startswith("sha1:"): +        s = s[5:] +    if len(s) != 32: +        return s +    return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + + +def parse_cdx_line(raw_cdx): + +    cdx = raw_cdx.split() +    if len(cdx) < 11: +        return None + +    surt = cdx[0] +    dt = cdx[1] +    url = cdx[2] +    mime = normalize_mime(cdx[3]) +    http_status = cdx[4] +    key = cdx[5] +    c_size = cdx[8] +    offset = cdx[9] +    warc = cdx[10] + +    if not (key.isalnum() and c_size.isdigit() and offset.isdigit() +            and http_status == "200" and len(key) == 32 and dt.isdigit() +            and mime != None): +        return None + +    if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): +        return None + +    # these are the new/specific bits +    sha1 = b32_hex(key) +    return dict(url=url, datetime=dt, sha1hex=sha1, cdx_sha1hex=None, mimetype=mime, warc_path=warc, warc_csize=int(c_size), warc_offset=int(offset)) + +def insert(cur, batch): +    sql = """ +        INSERT INTO +        cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) +        VALUES %s +        ON CONFLICT ON CONSTRAINT cdx_pkey DO NOTHING +        RETURNING 1; +    """ +    batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], +              d['warc_path'], d['warc_csize'], d['warc_offset']) +             for d in batch] +    res = psycopg2.extras.execute_values(cur, sql, batch) # fetch=True +    #return len(res) + +def stdin_to_pg(): +    # no host means it will use local domain socket by default +    conn = psycopg2.connect(database="sandcrawler", user="postgres") +    cur = conn.cursor() +    counts = collections.Counter({'total': 0}) +    batch = [] +    for l in sys.stdin: +        l = l.strip() +        if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: +            print("Progress: {}...".format(counts)) +        counts['raw_lines'] += 1 +        if not l: +            continue +        info = parse_cdx_line(l) +        if not info: +            continue +        batch.append(info) +        counts['total'] += 1 +        if len(batch) >= 1000: +            insert(cur, batch) +            conn.commit() +            #counts['inserted'] += i +            #counts['existing'] += len(batch) - i +            batch = [] +            counts['batches'] += 1 +    if batch: +        insert(cur, batch) +        #counts['inserted'] += i +        #counts['existing'] += len(batch) - i +        batch = [] +    conn.commit() +    cur.close() +    print("Done: {}".format(counts)) + +if __name__=='__main__': +    stdin_to_pg() diff --git a/postgresql/backfill_file_meta.py b/postgresql/backfill_file_meta.py new file mode 100755 index 0000000..e3b40a0 --- /dev/null +++ b/postgresql/backfill_file_meta.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +""" +This is a "one-time" tranform helper script for file_meta backfill into +sandcrawler postgresql. + +Most of this file was copied from '../python/common.py'. +""" + +import json, os, sys, collections +import psycopg2 +import psycopg2.extras + + +def insert(cur, batch): +    sql = """ +        INSERT INTO +        file_meta +        VALUES %s +        ON CONFLICT DO NOTHING; +    """ +    res = psycopg2.extras.execute_values(cur, sql, batch) + +def stdin_to_pg(): +    # no host means it will use local domain socket by default +    conn = psycopg2.connect(database="sandcrawler", user="postgres") +    cur = conn.cursor() +    counts = collections.Counter({'total': 0}) +    batch = [] +    for l in sys.stdin: +        if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: +            print("Progress: {}...".format(counts)) +        counts['raw_lines'] += 1 +        if not l.strip(): +            continue +        info = l.split("\t") +        if not info: +            continue +        assert len(info) == 5 +        info[-1] = info[-1].strip() or None +        batch.append(info) +        counts['total'] += 1 +        if len(batch) >= 1000: +            insert(cur, batch) +            conn.commit() +            batch = [] +            counts['batches'] += 1 +    if batch: +        insert(cur, batch) +        batch = [] +    conn.commit() +    cur.close() +    print("Done: {}".format(counts)) + +if __name__=='__main__': +    stdin_to_pg() diff --git a/postgresql/backfill_grobid.py b/postgresql/backfill_grobid.py new file mode 100755 index 0000000..08fad7f --- /dev/null +++ b/postgresql/backfill_grobid.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +""" +This is a "one-time" tranform helper script for GROBID backfill into +sandcrawler minio and postgresql. +""" + +import json, os, sys, collections, io +import base64 +import requests +from minio import Minio +import psycopg2 +import psycopg2.extras + + +def b32_hex(s): +    s = s.strip().split()[0].lower() +    if s.startswith("sha1:"): +        s = s[5:] +    if len(s) != 32: +        return s +    return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + +def insert(cur, batch): +    sql = """ +        INSERT INTO +        grobid (sha1hex, grobid_version, status_code, status, fatcat_release, metadata) +        VALUES %s +        ON CONFLICT DO NOTHING; +    """ +    batch = [(d['sha1hex'], d['grobid_version'], d['status_code'], d['status'], d['fatcat_release'], d['metadata']) +             for d in batch] +    res = psycopg2.extras.execute_values(cur, sql, batch) + +def stdin_to_pg(): +    mc = Minio('localhost:9000', +        access_key=os.environ['MINIO_ACCESS_KEY'], +        secret_key=os.environ['MINIO_SECRET_KEY'], +        secure=False) +    # no host means it will use local domain socket by default +    conn = psycopg2.connect(database="sandcrawler", user="postgres") +    cur = conn.cursor() +    counts = collections.Counter({'total': 0}) +    batch = [] +    for l in sys.stdin: +        if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: +            print("Progress: {}...".format(counts)) +        counts['raw_lines'] += 1 +        l = l.strip() +        if not l: +            continue +        row = json.loads(l) +        if not row: +            continue +        sha1hex = b32_hex(row['pdf_hash']) +        grobid_xml = row['tei_xml'].encode('utf-8') +        grobid_xml_len = len(grobid_xml) +        grobid_xml = io.BytesIO(grobid_xml) + +        key = "{}/{}/{}.tei.xml".format( +            sha1hex[0:2], +            sha1hex[2:4], +            sha1hex) +        mc.put_object("grobid", key, grobid_xml, grobid_xml_len, +            content_type="application/tei+xml", +            metadata=None) +        counts['minio-success'] += 1 + +        info = dict( +            sha1hex=sha1hex, +            grobid_version=None, # TODO +            status_code=200, +            status=None, +            fatcat_release=None, +            metadata=None, +        ) +        batch.append(info) +        counts['total'] += 1 +        if len(batch) >= 1000: +            insert(cur, batch) +            conn.commit() +            batch = [] +            counts['batches'] += 1 +    if batch: +        insert(cur, batch) +        batch = [] +    conn.commit() +    cur.close() +    print("Done: {}".format(counts)) + +if __name__=='__main__': +    stdin_to_pg() diff --git a/postgresql/filter_transform_cdx.py b/postgresql/filter_transform_cdx.py new file mode 100755 index 0000000..3507dfc --- /dev/null +++ b/postgresql/filter_transform_cdx.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +""" +This is a "one-time" tranform helper script for CDX backfill into sandcrawler +postgresql. + +Most of this file was copied from '../python/common.py'. +""" + +import json, os, sys +import base64 + +NORMAL_MIME = ( +    'application/pdf', +    'application/postscript', +    'text/html', +    'text/xml', +) + +def normalize_mime(raw): +    raw = raw.lower() +    for norm in NORMAL_MIME: +        if raw.startswith(norm): +            return norm + +    # Special cases +    if raw.startswith('application/xml'): +        return 'text/xml' +    if raw.startswith('application/x-pdf'): +        return 'application/pdf' +    return None + + +def test_normalize_mime(): +    assert normalize_mime("asdf") is None +    assert normalize_mime("application/pdf") == "application/pdf" +    assert normalize_mime("application/pdf+journal") == "application/pdf" +    assert normalize_mime("Application/PDF") == "application/pdf" +    assert normalize_mime("application/p") is None +    assert normalize_mime("application/xml+stuff") == "text/xml" +    assert normalize_mime("application/x-pdf") == "application/pdf" +    assert normalize_mime("application/x-html") is None + +def b32_hex(s): +    s = s.strip().split()[0].lower() +    if s.startswith("sha1:"): +        s = s[5:] +    if len(s) != 32: +        return s +    return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + + +def parse_cdx_line(raw_cdx): + +    cdx = raw_cdx.split() +    if len(cdx) < 11: +        return None + +    surt = cdx[0] +    dt = cdx[1] +    url = cdx[2] +    mime = normalize_mime(cdx[3]) +    http_status = cdx[4] +    key = cdx[5] +    c_size = cdx[8] +    offset = cdx[9] +    warc = cdx[10] + +    if not (key.isalnum() and c_size.isdigit() and offset.isdigit() +            and http_status == "200" and len(key) == 32 and dt.isdigit() +            and mime != None): +        return None + +    if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): +        return None + +    # these are the new/specific bits +    sha1 = b32_hex(key) +    return dict(url=url, datetime=dt, sha1hex=sha1, cdx_sha1hex=None, mimetype=mime, warc_path=warc, warc_csize=int(c_size), warc_offset=int(offset)) + +for l in sys.stdin: +    l = l.strip() +    if not l: +        continue +    info = parse_cdx_line(l) +    if not info: +        continue +    print("\t".join([info['url'], info['datetime'], info['sha1hex'], info['mimetype'], info['warc_path'], str(info['warc_csize']), str(info['warc_offset'])])) + diff --git a/postgresql/petabox_transform.py b/postgresql/petabox_transform.py new file mode 100755 index 0000000..b638911 --- /dev/null +++ b/postgresql/petabox_transform.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 + +import json, sys, os + +for l in sys.stdin.readlines(): +    l = l.strip() +    if not l: +        continue +    r = json.loads(l) +    if not r['sha1']: +        continue +    sha1hex = r['sha1'] +    for url in r['urls']: +        u = url['url'] +        if not '//archive.org/' in u: +            continue +        u = u.split('/') +        if u[2] == 'web.archive.org': +            continue +        #print(u) +        assert u[2] == 'archive.org' and u[3] in ('download', 'serve') +        item = u[4] +        path = '/'.join(u[5:]) +        print("\t".join([item, path, sha1hex])) | 
