From 03b04aabc9d9b63ff54a80f52590b619aee06159 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 24 Dec 2019 16:41:09 -0800 Subject: start work on DB connector and minio client --- python/sandcrawler/db.py | 141 ++++++++++++++++++++++++++++++++++++++++++++ python/sandcrawler/minio.py | 59 ++++++++++++++++++ 2 files changed, 200 insertions(+) create mode 100644 python/sandcrawler/db.py create mode 100644 python/sandcrawler/minio.py diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py new file mode 100644 index 0000000..104920b --- /dev/null +++ b/python/sandcrawler/db.py @@ -0,0 +1,141 @@ + +import json +import psycopg2 +import psycopg2.extras +import requests + +class SandcrawlerPostgrestClient: + + def __init__(self, api_url="http://aitio.us.archive.org:3030", **kwargs): + self.api_uri = api_url + + def get_cdx(self, url): + resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url)) + resp.raise_for_status() + return resp.json() or None + + def get_grobid(self, sha1): + resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1)) + resp.raise_for_status() + resp = resp.json() + if resp: + return resp[0] + else: + return None + + def get_file_meta(self, sha1): + resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1)) + resp.raise_for_status() + resp = resp.json() + if resp: + return resp[0] + else: + return None + +class SandcrawlerPostgresClient: + + def __init__(self, db_url, **kwargs): + self.conn = psycopg2.connect(db_url) + + def cursor(self): + return self.conn.cursor() + + def commit(self): + return self.conn.commit() + + def insert_cdx(self, cur, batch, on_conflict="NOTHING"): + sql = """ + INSERT INTO + cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) + VALUES %s + ON CONFLICT ON CONSTRAINT cdx_pkey DO {} + RETURNING 1; + """.format(on_conflict) + batch = [d for d in batch if d.get('warc_offset')] + if not batch: + return 0 + batch = [(d['url'], + d['datetime'], + d['sha1hex'], + d['mimetype'], + d['warc_path'], + d['warc_csize'], + d['warc_offset']) + for d in batch] + res = psycopg2.extras.execute_values(cur, sql, batch) # fetch=True + #return len(res) + + def insert_file_meta(self, cur, batch, on_conflict="NOTHING"): + sql = """ + INSERT INTO + file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype) + VALUES %s + ON CONFLICT (sha1hex) DO {}; + """.format(on_conflict) + batch = [(d['sha1hex'], + d['sha256hex'], + d['md5hex'], + int(d['size_bytes']), + d['mimetype']) + for d in batch] + res = psycopg2.extras.execute_values(cur, sql, batch) + + def insert_grobid(self, cur, batch, on_conflict="NOTHING"): # XXX + sql = """ + INSERT INTO + grobid (sha1hex, grobid_version, status_code, status, fatcat_release, metadata) + VALUES %s + ON CONFLICT (sha1hex) DO {}; + """.format(on_conflict) + for r in batch: + if r.get('metadata'): + r['metadata'] = json.dumps(r['metadata'], sort_keys=True) + batch = [(d['key'], + d.get('grobid_version') or None, + d['status_code'], + d['status'], + d.get('fatcat_release') or None, + d.get('metadata') or None , + ) + for d in batch] + res = psycopg2.extras.execute_values(cur, sql, batch) + + def insert_ingest_request(self, cur, batch, on_conflict="NOTHING"): + sql = """ + INSERT INTO + ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request, metadata) + VALUES %s + ON CONFLICT ON CONSTRAINT ingest_request_pkey DO {}; + """.format(on_conflict) + for r in batch: + if r.get('metadata'): + r['metadata'] = json.dumps(r['metadata'], sort_keys=True) + batch = [(d['link_source'], + d['link_source_id'], + d['ingest_type'], + d['base_url'], + d.get('ingest_request_source'), + d.get('release_stage') or None, + d.get('request') or None, + ) + for d in batch] + res = psycopg2.extras.execute_values(cur, sql, batch) + + def insert_ingest_file_result(self, cur, batch, on_conflict="NOTHING"): + sql = """ + INSERT INTO + ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex) + VALUES %s + ON CONFLICT DO {}; + """.format(on_conflict) + batch = [(d['ingest_type'], + d['base_url'], + bool(d['hit']), + d['status'], + d.get('terminal_url'), + d.get('terminal_dt'), + d.get('terminal_status_code'), + d.get('terminal_sha1hex'), + ) + for d in batch] + res = psycopg2.extras.execute_values(cur, sql, batch) diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py new file mode 100644 index 0000000..e6ebe41 --- /dev/null +++ b/python/sandcrawler/minio.py @@ -0,0 +1,59 @@ + +import os +import minio + + +class SandcrawlerMinioClient(object): + + def __init__(self, host, access_key, secret_key, default_bucket=None): + """ + host is minio connection string (host:port) + access and secret key are as expected + default_bucket can be supplied so that it doesn't need to be repeated for each function call + + Example config: + + host="localhost:9000", + access_key=os.environ['MINIO_ACCESS_KEY'], + secret_key=os.environ['MINIO_SECRET_KEY'], + """ + self.mc = minio.Minio( + host, + access_key=access_key, + secret_key=secret_key, + secure=False, + ) + self.default_bucket = default_bucket + + def upload_blob(self, folder, blob, sha1hex=None, extension="", prefix="", bucket=None): + """ + blob should be bytes + sha1hex is assumed to be sha1 of the blob itself; if not supplied it will be calculated + Uploads blob to path in the given bucket. Files are stored in a top-level + folder, then in two levels of sub-directory based on sha1, then the + filename is SHA1 with an optional file extension. + """ + if type(blob) == str: + blob = blob.encode('utf-8') + assert type(blob) == bytes + if not sha1hex: + h = hashlib.sha1() + h.update(blob) + sha1hex = h.hexdigest() + obj_path = "{}{}/{}/{}/{}{}".format( + prefix, + folder, + sha1hex[0:2], + sha1hex[2:4], + sha1hex, + extension, + ) + if not bucket: + bucket = self.default_bucket + self.mc.put_object( + self.default_bucket, + obj_path, + blob, + len(blob), + ) + return (bucket, obj_path) -- cgit v1.2.3