aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xpython/persist_tool.py22
-rw-r--r--python/sandcrawler/minio.py55
-rw-r--r--python/sandcrawler/persist.py38
3 files changed, 91 insertions, 24 deletions
diff --git a/python/persist_tool.py b/python/persist_tool.py
index d65fa53..309601b 100755
--- a/python/persist_tool.py
+++ b/python/persist_tool.py
@@ -1,12 +1,13 @@
#!/usr/bin/env python3
"""
-Commands for backfilling content from bulk files into postgresql and minio.
+Commands for backfilling content from bulk files into postgresql and s3 (minio).
Normally this is done by workers (in sandcrawler_worker.py) consuming from
Kafka feeds, but sometimes we have bulk processing output we want to backfill.
"""
+import os
import sys
import argparse
import datetime
@@ -36,6 +37,11 @@ def run_cdx(args):
def run_grobid(args):
worker = PersistGrobidWorker(
db_url=args.db_url,
+ s3_url=args.s3_url,
+ s3_bucket=args.s3_bucket,
+ s3_access_key=args.s3_access_key,
+ s3_secret_key=args.s3_secret_key,
+ s3_only=args.s3_only,
)
pusher = JsonLinePusher(
worker,
@@ -61,6 +67,18 @@ def main():
parser.add_argument('--db-url',
help="postgresql database connection string",
default="postgres:///sandcrawler")
+ parser.add_argument('--s3-url',
+ help="S3 (minio) backend URL",
+ default="localhost:9000")
+ parser.add_argument('--s3-access-key',
+ help="S3 (minio) credential",
+ default=os.environ.get('MINIO_ACCESS_KEY'))
+ parser.add_argument('--s3-secret-key',
+ help="S3 (minio) credential",
+ default=os.environ.get('MINIO_SECRET_KEY'))
+ parser.add_argument('--s3-bucket',
+ help="S3 (minio) bucket to persist into",
+ default="sandcrawler-dev")
subparsers = parser.add_subparsers()
sub_cdx = subparsers.add_parser('cdx',
@@ -74,7 +92,7 @@ def main():
help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)")
sub_grobid = subparsers.add_parser('grobid',
- help="backfill a grobid JSON ('pg') dump into postgresql and minio")
+ help="backfill a grobid JSON ('pg') dump into postgresql and s3 (minio)")
sub_grobid.set_defaults(func=run_grobid)
sub_grobid.add_argument('json_file',
help="grobid file to import from (or '-' for stdin)",
diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py
index e6ebe41..39903e0 100644
--- a/python/sandcrawler/minio.py
+++ b/python/sandcrawler/minio.py
@@ -1,11 +1,13 @@
+import io
import os
+
import minio
class SandcrawlerMinioClient(object):
- def __init__(self, host, access_key, secret_key, default_bucket=None):
+ def __init__(self, host_url, access_key, secret_key, default_bucket=None):
"""
host is minio connection string (host:port)
access and secret key are as expected
@@ -18,14 +20,30 @@ class SandcrawlerMinioClient(object):
secret_key=os.environ['MINIO_SECRET_KEY'],
"""
self.mc = minio.Minio(
- host,
+ host_url,
access_key=access_key,
secret_key=secret_key,
secure=False,
)
self.default_bucket = default_bucket
- def upload_blob(self, folder, blob, sha1hex=None, extension="", prefix="", bucket=None):
+ def _blob_path(self, folder, sha1hex, extension, prefix):
+ if not extension:
+ extension = ""
+ if not prefix:
+ prefix = ""
+ assert len(sha1hex) == 40
+ obj_path = "{}{}/{}/{}/{}{}".format(
+ prefix,
+ folder,
+ sha1hex[0:2],
+ sha1hex[2:4],
+ sha1hex,
+ extension,
+ )
+ return obj_path
+
+ def put_blob(self, folder, blob, sha1hex=None, extension="", prefix="", bucket=None):
"""
blob should be bytes
sha1hex is assumed to be sha1 of the blob itself; if not supplied it will be calculated
@@ -40,20 +58,31 @@ class SandcrawlerMinioClient(object):
h = hashlib.sha1()
h.update(blob)
sha1hex = h.hexdigest()
- obj_path = "{}{}/{}/{}/{}{}".format(
- prefix,
- folder,
- sha1hex[0:2],
- sha1hex[2:4],
- sha1hex,
- extension,
- )
+ obj_path = self._blob_path(folder, sha1hex, extension, prefix)
if not bucket:
bucket = self.default_bucket
+ assert bucket
self.mc.put_object(
- self.default_bucket,
+ bucket,
obj_path,
- blob,
+ io.BytesIO(blob),
len(blob),
)
return (bucket, obj_path)
+
+ def get_blob(self, folder, sha1hex, extension="", prefix="", bucket=None):
+ """
+ sha1hex is sha1 of the blob itself
+
+ Fetched blob from the given bucket/folder, using the sandcrawler SHA1 path convention
+ """
+ obj_path = self._blob_path(folder, sha1hex, extension, prefix)
+ if not bucket:
+ bucket = self.default_bucket
+ assert bucket
+ blob = self.mc.get_object(
+ bucket,
+ obj_path,
+ )
+ # TODO: optionally verify SHA-1?
+ return blob
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 86a1c22..b017a82 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -19,6 +19,8 @@ grobid
- file_meta SQL push batch (on conflict update)
"""
+import os
+
from sandcrawler.workers import SandcrawlerWorker
from sandcrawler.db import SandcrawlerPostgresClient
from sandcrawler.minio import SandcrawlerMinioClient
@@ -187,6 +189,13 @@ class PersistGrobidWorker(SandcrawlerWorker):
self.db = SandcrawlerPostgresClient(db_url)
self.cur = self.db.conn.cursor()
self.grobid = GrobidClient()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_only = kwargs.get('s3_only', False)
def process(self, record):
"""
@@ -200,6 +209,7 @@ class PersistGrobidWorker(SandcrawlerWorker):
# enhance with teixml2json metadata, if available
for r in batch:
if r['status_code'] != 200 or not r.get('tei_xml'):
+ self.counts['s3-skip'] += 1
continue
metadata = self.grobid.metadata(r)
if not metadata:
@@ -212,15 +222,25 @@ class PersistGrobidWorker(SandcrawlerWorker):
r['updated'] = metadata['grobid_timestamp']
r['metadata'] = metadata
- grobid_batch = [r['grobid'] for r in batch if r.get('grobid')]
- resp = self.db.insert_grobid(self.cur, batch, on_conflict="update")
- self.counts['insert-grobid'] += resp
-
- file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
- resp = self.db.insert_file_meta(self.cur, file_meta_batch)
- self.counts['insert-file-meta'] += resp
-
- # TODO: minio
+ assert len(r['key']) == 40
+ resp = self.s3.put_blob(
+ folder="grobid",
+ blob=r['tei_xml'],
+ sha1hex=r['key'],
+ extension=".tei.xml",
+ )
+ self.counts['s3-put'] += 1
+
+ if not self.s3_only:
+ grobid_batch = [r['grobid'] for r in batch if r.get('grobid')]
+ resp = self.db.insert_grobid(self.cur, batch, on_conflict="update")
+ self.counts['insert-grobid'] += resp[0]
+ self.counts['update-grobid'] += resp[1]
+
+ file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+ resp = self.db.insert_file_meta(self.cur, file_meta_batch)
+ self.counts['insert-file-meta'] += resp[0]
+ self.counts['update-file-meta'] += resp[1]
self.db.commit()
return []