path: root/python/persist_tool.py
#!/usr/bin/env python3

"""
Commands for backfilling content from bulk files into postgresql and s3 (minio).

Normally this is done by workers (in sandcrawler_worker.py) consuming from
Kafka feeds, but sometimes we have bulk processing output we want to backfill.
"""

import os
import sys
import argparse
import datetime
import raven

from sandcrawler import *
from sandcrawler.persist import *


def run_cdx(args):
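    """
    Backfill rows from a CDX file into the postgresql cdx table.

    Only HTTP 200 responses are imported; by default only application/pdf
    mimetypes are kept, unless --no-mimetype-filter is passed.
    """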
    worker = PersistCdxWorker(
        db_url=args.db_url,
    )
    filter_mimetypes = ['application/pdf']
    if args.no_mimetype_filter:
        filter_mimetypes = None
    pusher = CdxLinePusher(
        worker,
        args.cdx_file,
        filter_http_statuses=[200],
        filter_mimetypes=filter_mimetypes,
        # TODO: allow_octet_stream option
        batch_size=200,
    )
    pusher.run()

def run_grobid(args):
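    """
    Backfill GROBID results from a JSON-lines dump into postgresql and s3
    (minio). With --s3-only, only the s3 writes are performed.
    """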
    worker = PersistGrobidWorker(
        db_url=args.db_url,
        s3_url=args.s3_url,
        s3_bucket=args.s3_bucket,
        s3_access_key=args.s3_access_key,
        s3_secret_key=args.s3_secret_key,
        s3_only=args.s3_only,
    )
    pusher = JsonLinePusher(
        worker,
        args.json_file,
        batch_size=50,
    )
    pusher.run()

def run_ingest_file_result(args):
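    """
    Backfill rows from an ingest_file_result JSON dump into postgresql.
    """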
    worker = PersistIngestFileResultWorker(
        db_url=args.db_url,
    )
    pusher = JsonLinePusher(
        worker,
        args.json_file,
        batch_size=200,
    )
    pusher.run()

def main():
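    """
    Parse command-line arguments and dispatch to the selected subcommand.
    """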
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--db-url',
        help="postgresql database connection string",
        default="postgres:///sandcrawler")
    parser.add_argument('--s3-url',
        help="S3 (minio) backend URL",
        default="localhost:9000")
    parser.add_argument('--s3-access-key',
        help="S3 (minio) credential",
        default=os.environ.get('MINIO_ACCESS_KEY'))
    parser.add_argument('--s3-secret-key',
        help="S3 (minio) credential",
        default=os.environ.get('MINIO_SECRET_KEY'))
    parser.add_argument('--s3-bucket',
        help="S3 (minio) bucket to persist into",
        default="sandcrawler-dev")
    subparsers = parser.add_subparsers()

    sub_cdx = subparsers.add_parser('cdx',
        help="backfill a CDX file into postgresql cdx table")
    sub_cdx.set_defaults(func=run_cdx)
    sub_cdx.add_argument('cdx_file',
        help="CDX file to import from (or '-' for stdin)",
        type=argparse.FileType('r'))
    sub_cdx.add_argument('--no-mimetype-filter',
        action='store_true',
        help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)")

    sub_grobid = subparsers.add_parser('grobid',
        help="backfill a grobid JSON ('pg') dump into postgresql and s3 (minio)")
    sub_grobid.set_defaults(func=run_grobid)
    sub_grobid.add_argument('--s3-only',
        action='store_true',
        help="only write to s3 (minio), skipping the postgresql database")
    sub_grobid.add_argument('json_file',
        help="grobid file to import from (or '-' for stdin)",
        type=argparse.FileType('r'))

    sub_ingest_file_result = subparsers.add_parser('ingest-file-result',
        help="backfill a ingest_file_result JSON dump into postgresql")
    sub_ingest_file_result.set_defaults(func=run_ingest_file_result)
    sub_ingest_file_result.add_argument('json_file',
        help="ingest_file_result file to import from (or '-' for stdin)",
        type=argparse.FileType('r'))

    args = parser.parse_args()
    if not args.__dict__.get("func"):
        print("Tell me what to do!", file=sys.stderr)
        sys.exit(-1)

    args.func(args)

if __name__ == '__main__':
    main()