aboutsummaryrefslogtreecommitdiffstats
path: root/sql/backfill/backfill_grobid.py
blob: 08fad7f71ba973d5a764b647af17fbe34a6df024 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/usr/bin/env python3
"""
This is a "one-time" transform helper script for GROBID backfill into
sandcrawler minio and postgresql.
"""

import json, os, sys, collections, io
import base64
import requests
from minio import Minio
import psycopg2
import psycopg2.extras


def b32_hex(s):
    """
    Normalize a SHA-1 digest string to lowercase hex.

    Only the first whitespace-separated token of `s` is used. It is
    lowercased and a leading "sha1:" prefix is dropped. If what remains is
    exactly 32 characters long, it is decoded as base32 and re-encoded as
    lowercase hex; a token of any other length is returned as-is (already
    stripped and lowercased).
    """
    token = s.strip().split()[0].lower()
    prefix = "sha1:"
    if token.startswith(prefix):
        token = token[len(prefix):]
    if len(token) == 32:
        # base32 decoding expects the upper-case alphabet
        raw = base64.b32decode(token.upper())
        return base64.b16encode(raw).decode('utf-8').lower()
    return token

def insert(cur, batch):
    """
    Insert a batch of row dicts into the `grobid` table.

    `cur` is a database cursor and `batch` is a list of dicts, each of which
    must have the keys named in `columns` below (a missing key raises
    KeyError). Rows that conflict with existing ones are skipped by the
    ON CONFLICT DO NOTHING clause. Nothing is committed here.
    """
    statement = """
        INSERT INTO
        grobid (sha1hex, grobid_version, status_code, status, fatcat_release, metadata)
        VALUES %s
        ON CONFLICT DO NOTHING;
    """
    # Same order as the column list in the INSERT statement above
    columns = (
        'sha1hex',
        'grobid_version',
        'status_code',
        'status',
        'fatcat_release',
        'metadata',
    )
    rows = [tuple(record[name] for name in columns) for record in batch]
    psycopg2.extras.execute_values(cur, statement, rows)

def stdin_to_pg():
    """
    Backfill GROBID TEI-XML from JSON lines on stdin into minio and postgres.

    Every non-blank stdin line is parsed as JSON; blank lines and empty JSON
    values are skipped. Each remaining row must have a 'pdf_hash' key
    (normalized with b32_hex()) and a 'tei_xml' key. The XML is uploaded to
    the "grobid" bucket of the minio instance at localhost:9000 under the key
    "<sha1hex[0:2]>/<sha1hex[2:4]>/<sha1hex>.tei.xml", and a row for the file
    is queued for the `grobid` table. Queued rows are inserted and committed
    in batches of 1000, with a final insert/commit for any remainder.

    Progress counters are printed roughly every 10000 input lines and once
    more when finished.

    Minio credentials are read from the MINIO_ACCESS_KEY and MINIO_SECRET_KEY
    environment variables (KeyError if either is unset). Any error while
    processing a line propagates to the caller; the cursor and connection are
    closed either way, and rows from the unfinished batch are not committed.
    """
    batch_size = 1000
    mc = Minio('localhost:9000',
        access_key=os.environ['MINIO_ACCESS_KEY'],
        secret_key=os.environ['MINIO_SECRET_KEY'],
        secure=False)
    # no host means it will use local domain socket by default
    conn = psycopg2.connect(database="sandcrawler", user="postgres")
    cur = conn.cursor()
    counts = collections.Counter({'total': 0})
    batch = []
    try:
        for line in sys.stdin:
            # Counter returns 0 for missing keys, so this is safe on the
            # first iteration
            if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0:
                print("Progress: {}...".format(counts))
            counts['raw_lines'] += 1
            line = line.strip()
            if not line:
                continue
            row = json.loads(line)
            if not row:
                continue
            sha1hex = b32_hex(row['pdf_hash'])
            grobid_xml = row['tei_xml'].encode('utf-8')
            # put_object() is given the byte length alongside the stream
            grobid_xml_len = len(grobid_xml)
            grobid_xml = io.BytesIO(grobid_xml)

            key = "{}/{}/{}.tei.xml".format(
                sha1hex[0:2],
                sha1hex[2:4],
                sha1hex)
            mc.put_object("grobid", key, grobid_xml, grobid_xml_len,
                content_type="application/tei+xml",
                metadata=None)
            counts['minio-success'] += 1

            info = dict(
                sha1hex=sha1hex,
                grobid_version=None, # TODO
                status_code=200,
                status=None,
                fatcat_release=None,
                metadata=None,
            )
            batch.append(info)
            counts['total'] += 1
            if len(batch) >= batch_size:
                insert(cur, batch)
                conn.commit()
                batch = []
                counts['batches'] += 1
        # flush whatever is left over from the last partial batch
        if batch:
            insert(cur, batch)
            batch = []
        conn.commit()
    finally:
        # Release database resources even when a line fails mid-run
        cur.close()
        conn.close()
    print("Done: {}".format(counts))

# Script entry point: run the backfill only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    stdin_to_pg()