-rwxr-xr-x  python/persist_tool.py        | 22
-rw-r--r--  python/sandcrawler/minio.py   | 55
-rw-r--r--  python/sandcrawler/persist.py | 38
3 files changed, 91 insertions, 24 deletions
diff --git a/python/persist_tool.py b/python/persist_tool.py
index d65fa53..309601b 100755
--- a/python/persist_tool.py
+++ b/python/persist_tool.py
@@ -1,12 +1,13 @@
 #!/usr/bin/env python3
 """
-Commands for backfilling content from bulk files into postgresql and minio.
+Commands for backfilling content from bulk files into postgresql and s3 (minio).
 
 Normally this is done by workers (in sandcrawler_worker.py) consuming from
 Kafka feeds, but sometimes we have bulk processing output we want to backfill.
 """
 
+import os
 import sys
 import argparse
 import datetime
@@ -36,6 +37,11 @@ def run_cdx(args):
 def run_grobid(args):
     worker = PersistGrobidWorker(
         db_url=args.db_url,
+        s3_url=args.s3_url,
+        s3_bucket=args.s3_bucket,
+        s3_access_key=args.s3_access_key,
+        s3_secret_key=args.s3_secret_key,
+        s3_only=args.s3_only,
     )
     pusher = JsonLinePusher(
         worker,
@@ -61,6 +67,18 @@ def main():
     parser.add_argument('--db-url',
         help="postgresql database connection string",
         default="postgres:///sandcrawler")
+    parser.add_argument('--s3-url',
+        help="S3 (minio) backend URL",
+        default="localhost:9000")
+    parser.add_argument('--s3-access-key',
+        help="S3 (minio) credential",
+        default=os.environ.get('MINIO_ACCESS_KEY'))
+    parser.add_argument('--s3-secret-key',
+        help="S3 (minio) credential",
+        default=os.environ.get('MINIO_SECRET_KEY'))
+    parser.add_argument('--s3-bucket',
+        help="S3 (minio) bucket to persist into",
+        default="sandcrawler-dev")
     subparsers = parser.add_subparsers()
 
     sub_cdx = subparsers.add_parser('cdx',
@@ -74,7 +92,7 @@
         help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)")
 
     sub_grobid = subparsers.add_parser('grobid',
-        help="backfill a grobid JSON ('pg') dump into postgresql and minio")
+        help="backfill a grobid JSON ('pg') dump into postgresql and s3 (minio)")
     sub_grobid.set_defaults(func=run_grobid)
     sub_grobid.add_argument('json_file',
         help="grobid file to import from (or '-' for stdin)",
diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py
index e6ebe41..39903e0 100644
--- a/python/sandcrawler/minio.py
+++ b/python/sandcrawler/minio.py
@@ -1,11 +1,13 @@
+import io
 import os
+
 import minio
 
 
 class SandcrawlerMinioClient(object):
 
-    def __init__(self, host, access_key, secret_key, default_bucket=None):
+    def __init__(self, host_url, access_key, secret_key, default_bucket=None):
         """
         host is minio connection string (host:port)
         access and secret key are as expected
@@ -18,14 +20,30 @@ class SandcrawlerMinioClient(object):
             secret_key=os.environ['MINIO_SECRET_KEY'],
         """
         self.mc = minio.Minio(
-            host,
+            host_url,
             access_key=access_key,
             secret_key=secret_key,
             secure=False,
         )
         self.default_bucket = default_bucket
 
-    def upload_blob(self, folder, blob, sha1hex=None, extension="", prefix="", bucket=None):
+    def _blob_path(self, folder, sha1hex, extension, prefix):
+        if not extension:
+            extension = ""
+        if not prefix:
+            prefix = ""
+        assert len(sha1hex) == 40
+        obj_path = "{}{}/{}/{}/{}{}".format(
+            prefix,
+            folder,
+            sha1hex[0:2],
+            sha1hex[2:4],
+            sha1hex,
+            extension,
+        )
+        return obj_path
+
+    def put_blob(self, folder, blob, sha1hex=None, extension="", prefix="", bucket=None):
         """
         blob should be bytes
         sha1hex is assumed to be sha1 of the blob itself; if not supplied it will be calculated
@@ -40,20 +58,31 @@ class SandcrawlerMinioClient(object):
             h = hashlib.sha1()
             h.update(blob)
             sha1hex = h.hexdigest()
-        obj_path = "{}{}/{}/{}/{}{}".format(
-            prefix,
-            folder,
-            sha1hex[0:2],
-            sha1hex[2:4],
-            sha1hex,
-            extension,
-        )
+        obj_path = self._blob_path(folder, sha1hex, extension, prefix)
         if not bucket:
             bucket = self.default_bucket
+        assert bucket
         self.mc.put_object(
-            self.default_bucket,
+            bucket,
             obj_path,
-            blob,
+            io.BytesIO(blob),
             len(blob),
         )
         return (bucket, obj_path)
+
+    def get_blob(self, folder, sha1hex, extension="", prefix="", bucket=None):
+        """
+        sha1hex is sha1 of the blob itself
+
+        Fetched blob from the given bucket/folder, using the sandcrawler SHA1 path convention
+        """
+        obj_path = self._blob_path(folder, sha1hex, extension, prefix)
+        if not bucket:
+            bucket = self.default_bucket
+        assert bucket
+        blob = self.mc.get_object(
+            bucket,
+            obj_path,
+        )
+        # TODO: optionally verify SHA-1?
+        return blob
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 86a1c22..b017a82 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -19,6 +19,8 @@ grobid
 - file_meta SQL push batch (on conflict update)
 """
 
+import os
+
 from sandcrawler.workers import SandcrawlerWorker
 from sandcrawler.db import SandcrawlerPostgresClient
 from sandcrawler.minio import SandcrawlerMinioClient
@@ -187,6 +189,13 @@
         self.db = SandcrawlerPostgresClient(db_url)
         self.cur = self.db.conn.cursor()
         self.grobid = GrobidClient()
+        self.s3 = SandcrawlerMinioClient(
+            host_url=kwargs.get('s3_url', 'localhost:9000'),
+            access_key=kwargs['s3_access_key'],
+            secret_key=kwargs['s3_secret_key'],
+            default_bucket=kwargs['s3_bucket'],
+        )
+        self.s3_only = kwargs.get('s3_only', False)
 
     def process(self, record):
         """
@@ -200,6 +209,7 @@
         # enhance with teixml2json metadata, if available
         for r in batch:
             if r['status_code'] != 200 or not r.get('tei_xml'):
+                self.counts['s3-skip'] += 1
                 continue
             metadata = self.grobid.metadata(r)
             if not metadata:
@@ -212,15 +222,25 @@
             r['updated'] = metadata['grobid_timestamp']
             r['metadata'] = metadata
 
-        grobid_batch = [r['grobid'] for r in batch if r.get('grobid')]
-        resp = self.db.insert_grobid(self.cur, batch, on_conflict="update")
-        self.counts['insert-grobid'] += resp
-
-        file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
-        resp = self.db.insert_file_meta(self.cur, file_meta_batch)
-        self.counts['insert-file-meta'] += resp
-
-        # TODO: minio
+            assert len(r['key']) == 40
+            resp = self.s3.put_blob(
+                folder="grobid",
+                blob=r['tei_xml'],
+                sha1hex=r['key'],
+                extension=".tei.xml",
+            )
+            self.counts['s3-put'] += 1
+
+        if not self.s3_only:
+            grobid_batch = [r['grobid'] for r in batch if r.get('grobid')]
+            resp = self.db.insert_grobid(self.cur, batch, on_conflict="update")
+            self.counts['insert-grobid'] += resp[0]
+            self.counts['update-grobid'] += resp[1]
+
+            file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+            resp = self.db.insert_file_meta(self.cur, file_meta_batch)
+            self.counts['insert-file-meta'] += resp[0]
+            self.counts['update-file-meta'] += resp[1]
 
         self.db.commit()
         return []
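For reference, a minimal sketch of the new put_blob()/get_blob() round trip against a local minio instance. The endpoint, credentials, bucket, and blob contents below are illustrative placeholders, and the idea that the returned get_object response can be read back with .read() follows minio-py's usual response object, not anything stated in this diff.

import os
import hashlib

from sandcrawler.minio import SandcrawlerMinioClient

# Placeholder endpoint/credentials/bucket; matches the persist_tool.py defaults above.
mc = SandcrawlerMinioClient(
    host_url="localhost:9000",
    access_key=os.environ.get('MINIO_ACCESS_KEY'),
    secret_key=os.environ.get('MINIO_SECRET_KEY'),
    default_bucket="sandcrawler-dev",
)

blob = b"<TEI>...</TEI>"
sha1hex = hashlib.sha1(blob).hexdigest()

# Stored under the SHA-1 path convention: grobid/<aa>/<bb>/<sha1>.tei.xml
bucket, obj_path = mc.put_blob(
    folder="grobid",
    blob=blob,
    sha1hex=sha1hex,
    extension=".tei.xml",
)

# get_blob() returns the raw minio get_object response; reading it yields the bytes back.
resp = mc.get_blob(folder="grobid", sha1hex=sha1hex, extension=".tei.xml")
assert resp.read() == blob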
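And a sketch of constructing the worker directly, mirroring the wiring run_grobid() in persist_tool.py now does; all connection values are placeholders taken from the argparse defaults above.

import os

from sandcrawler.persist import PersistGrobidWorker

# Placeholder values; mirrors run_grobid() in persist_tool.py.
worker = PersistGrobidWorker(
    db_url="postgres:///sandcrawler",
    s3_url="localhost:9000",
    s3_bucket="sandcrawler-dev",
    s3_access_key=os.environ.get('MINIO_ACCESS_KEY'),
    s3_secret_key=os.environ.get('MINIO_SECRET_KEY'),
    # With s3_only=True, push_batch() uploads TEI-XML to S3 but skips the
    # postgresql grobid/file_meta inserts.
    s3_only=False,
)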