aboutsummaryrefslogtreecommitdiffstats
path: root/python/persist_tool.py
blob: d65fa535ead0a9acfa5f4b9994fb888d72359431 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3

"""
Commands for backfilling content from bulk files into postgresql and minio.

Normally this is done by workers (in sandcrawler_worker.py) consuming from
Kafka feeds, but sometimes we have bulk processing output we want to backfill.
"""

import sys
import argparse
import datetime
import raven

from sandcrawler import *
from sandcrawler.persist import *


def run_cdx(args):
    worker = PersistCdxWorker(
        db_url=args.db_url,
    )
    filter_mimetypes = ['application/pdf']
    if args.no_mimetype_filter:
        filter_mimetypes = None
    pusher = CdxLinePusher(
        worker,
        args.cdx_file,
        filter_http_statuses=[200],
        filter_mimetypes=filter_mimetypes,
        #allow_octet_stream
        batch_size=200,
    )
    pusher.run()

def run_grobid(args):
    worker = PersistGrobidWorker(
        db_url=args.db_url,
    )
    pusher = JsonLinePusher(
        worker,
        args.json_file,
        batch_size=50,
    )
    pusher.run()

def run_ingest_file_result(args):
    worker = PersistIngestFileResultWorker(
        db_url=args.db_url,
    )
    pusher = JsonLinePusher(
        worker,
        args.json_file,
        batch_size=200,
    )
    pusher.run()

def main():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--db-url',
        help="postgresql database connection string",
        default="postgres:///sandcrawler")
    subparsers = parser.add_subparsers()

    sub_cdx = subparsers.add_parser('cdx',
        help="backfill a CDX file into postgresql cdx table")
    sub_cdx.set_defaults(func=run_cdx)
    sub_cdx.add_argument('cdx_file',
        help="CDX file to import from (or '-' for stdin)",
        type=argparse.FileType('r'))
    sub_cdx.add_argument('--no-mimetype-filter',
        action='store_true',
        help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)")

    sub_grobid = subparsers.add_parser('grobid',
        help="backfill a grobid JSON ('pg') dump into postgresql and minio")
    sub_grobid.set_defaults(func=run_grobid)
    sub_grobid.add_argument('json_file',
        help="grobid file to import from (or '-' for stdin)",
        type=argparse.FileType('r'))

    sub_ingest_file_result = subparsers.add_parser('ingest-file-result',
        help="backfill a ingest_file_result JSON dump into postgresql")
    sub_ingest_file_result.set_defaults(func=run_ingest_file_result)
    sub_ingest_file_result.add_argument('json_file',
        help="ingest_file_result file to import from (or '-' for stdin)",
        type=argparse.FileType('r'))

    args = parser.parse_args()
    if not args.__dict__.get("func"):
        print("Tell me what to do!", file=sys.stderr)
        sys.exit(-1)

    args.func(args)

if __name__ == '__main__':
    main()