aboutsummaryrefslogtreecommitdiffstats
path: root/hbase_cdx_backfill/cdx_fulltext_to_hbase.py
blob: 757794a320c74d2a8ef5bf8e6552cbb9de27fbf1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3
"""
Streaming Hadoop script to import CDX metadata into the HBase fulltext table,
primarily for URL-agnostic crawl de-duplication. Takes only "fulltext" file
formats.

Requires:
- happybase

TODO:
- argparse
- refactor into an object
- tests in separate file
- nose tests
- sentry integration for error reporting
"""

import sys
import json
import happybase

NORMAL_MIME = (
    'application/pdf',
    'application/postscript',
    'text/html',
    'text/xml',
    #'application/xml',
)

def normalize_mime(raw):
    raw = raw.lower()
    for norm in NORMAL_MIME:
        if raw.startswith(norm):
            return norm

    # Special cases 
    if raw.startswith('application/xml'):
        return 'text/xml'
    if raw.startswith('application/x-pdf'):
        return 'application/pdf'
    return None

def test_normalize_mime():
    assert normalize_mime("asdf") == None
    assert normalize_mime("application/pdf") == "application/pdf"
    assert normalize_mime("application/pdf+journal") == "application/pdf"
    assert normalize_mime("Application/PDF") == "application/pdf"
    assert normalize_mime("application/p") == None
    assert normalize_mime("application/xml+stuff") == "text/xml"

def transform_line(raw_cdx):

    cdx = raw_cdx.split()
    if len(cdx) < 11:
        return None

    surt = cdx[0]
    dt = cdx[1]
    url = cdx[2]
    mime = normalize_mime(cdx[3])
    http_status = cdx[4]
    if http_status != "200":
        return None
    key = cdx[5]
    c_size = cdx[8]
    offset = cdx[9]
    warc = cdx[10]
    info = dict(surt=surt, dt=dt, url=url, c_size=c_size, offset=offset,
        warc=warc)
    return {'key': key, 'file:mime': mime, 'file:cdx': info}

def test_transform_line():

    raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
    correct = {
        'key': "WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G",
        'file:mime': "application/pdf",
        'file:cdx': {
            'surt': "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf",
            'url': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf",
            'dt': "20170828233154",
            'warc': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
            'offset': "931661233",
            'c_size': "210251",
        }
    }

    assert transform_line(raw) == correct
    assert transform_line(raw + "\n") == correct
    assert transform_line(raw + " extra_field") == correct


def run(in_lines, out_lines, status_lines, table, mime_filter=None):

    if mime_filter is None:
        mime_filter = ['application/pdf']
    count_skip = count_invalid = count_fail = count_success = 0

    for raw_cdx in in_lines.readlines():
        if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
                raw_cdx.startswith('#')):
            # Skip line
            count_skip += 1
            continue

        info = transform_line(raw_cdx)
        if info is None:
            count_invalid += 1
            continue
        if info['file:mime'] not in mime_filter:
            count_skip += 1
            continue

        key = info.pop('key')
        info['file:cdx'] = json.dumps(info['file:cdx'], sort_keys=True,
            indent=None)
        try:
            table.put(key, info)
            count_success += 1
        except:
            status_lines.write("ERROR\n") # TODO:
            count_fail += 1

    status_lines.write('\n')
    status_lines.write('skip\t{}\n'.format(count_skip))
    status_lines.write('invalid\t{}\n'.format(count_invalid))
    status_lines.write('fail\t{}\n'.format(count_fail))
    status_lines.write('success\t{}\n'.format(count_success))


def test_run():
    
    import io 
    import happybase_mock

    out = io.StringIO()
    status = io.StringIO()
    raw = io.StringIO("""
com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
""")

    conn = happybase_mock.Connection()
    conn.create_table('wbgrp-journal-extract-test', {'file': {}, 'grobid0': {}})

    table = conn.table('wbgrp-journal-extract-test')
    run(raw, out, status, table)

    print(status.getvalue())

    assert table.row(b'1') == {}
    # HTTP 301
    assert table.row(b'3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {}
    # valid
    assert table.row(b'MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') != {}
    # text/plain
    assert table.row(b'6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD') == {}

    row = table.row(b'MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J')
    assert row[b'file:mime'] == b"application/pdf"
    json.loads(row[b'file:cdx'].decode('utf-8'))

if __name__=="__main__":
    hb = happybase.Connection(host='')
    with hb.connection() as conn:
        table = conn.table('wbgrp-journal-extract-0-qa')
        run(sys.stdin, sys.stdout, sys.stderr, table)