# Provenance: contents of python/sandcrawler/db.py as added (new file) by git
# commit 03b04aabc9d9b63ff54a80f52590b619aee06159
#   From: Bryan Newbold
#   Date: Tue, 24 Dec 2019 16:41:09 -0800
#   Subject: start work on DB connector and minio client
# Recovered from a whitespace-collapsed cgit (v1.2.3) patch view; the diff
# markers were stripped and the defects noted inline below were fixed.
"""Clients for reading from and writing to the sandcrawler SQL database.

Two clients are provided:

* ``SandcrawlerPostgrestClient`` -- read-only lookups over an HTTP API.
* ``SandcrawlerPostgresClient`` -- batch inserts over a direct psycopg2
  connection.
"""

import json
import psycopg2
import psycopg2.extras
import requests


def _json_or_none(value):
    """Return *value* serialized as JSON with sorted keys, or None if falsy.

    Serializing into a fresh string (instead of overwriting the field on the
    caller's dict) keeps the insert methods from mutating their input, so
    re-submitting the same batch does not double-encode the field.
    """
    if not value:
        return None
    return json.dumps(value, sort_keys=True)


class SandcrawlerPostgrestClient:
    """Read-only lookups against the sandcrawler HTTP API.

    Each lookup is a GET request using an ``eq.<value>`` filter parameter
    (the filter syntax used by PostgREST).
    """

    def __init__(self, api_url="http://aitio.us.archive.org:3030", **kwargs):
        """Remember the base URL of the API; extra keyword args are ignored."""
        # Fix: the original stored only ``api_uri`` while every getter read
        # ``api_url``, so all lookups raised AttributeError.
        self.api_url = api_url
        # Kept as an alias in case external code reads the old attribute name.
        self.api_uri = api_url

    def _get_rows(self, path, **filters):
        """GET ``api_url + path`` with *filters* and return the decoded JSON.

        Raises ``requests.HTTPError`` (via ``raise_for_status``) on a non-2xx
        response.
        """
        resp = requests.get(self.api_url + path, params=filters)
        resp.raise_for_status()
        return resp.json()

    def get_cdx(self, url):
        """Return the decoded ``/cdx`` response for *url*, or None if empty."""
        return self._get_rows("/cdx", url='eq.' + url) or None

    def get_grobid(self, sha1):
        """Return the first ``/grobid`` row for *sha1*, or None if no rows."""
        rows = self._get_rows("/grobid", sha1hex='eq.' + sha1)
        return rows[0] if rows else None

    def get_file_meta(self, sha1):
        """Return the first ``/file_meta`` row for *sha1*, or None if no rows."""
        rows = self._get_rows("/file_meta", sha1hex='eq.' + sha1)
        return rows[0] if rows else None


class SandcrawlerPostgresClient:
    """Batch inserts into the sandcrawler database over psycopg2.

    The ``insert_*`` methods take a cursor (see ``cursor()``) and a batch of
    dicts, and run a single multi-row ``INSERT`` through
    ``psycopg2.extras.execute_values``. None of them commit; call
    ``commit()`` afterwards.

    NOTE: ``on_conflict`` is interpolated verbatim into the SQL text after
    ``ON CONFLICT ... DO``. It must come from trusted code, never from
    external input.
    """

    def __init__(self, db_url, **kwargs):
        """Open a connection to *db_url*; extra keyword args are ignored."""
        self.conn = psycopg2.connect(db_url)

    def cursor(self):
        """Return a new cursor on the underlying connection."""
        return self.conn.cursor()

    def commit(self):
        """Commit the current transaction on the underlying connection."""
        return self.conn.commit()

    def insert_cdx(self, cur, batch, on_conflict="NOTHING"):
        """Insert CDX rows, skipping records with a falsy ``warc_offset``.

        Returns 0 when nothing is left to insert after filtering; otherwise
        returns None (the rows produced by ``RETURNING`` are not fetched).
        """
        sql = """
            INSERT INTO
            cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
            VALUES %s
            ON CONFLICT ON CONSTRAINT cdx_pkey DO {}
            RETURNING 1;
        """.format(on_conflict)
        batch = [d for d in batch if d.get('warc_offset')]
        if not batch:
            return 0
        rows = [(d['url'],
                 d['datetime'],
                 d['sha1hex'],
                 d['mimetype'],
                 d['warc_path'],
                 d['warc_csize'],
                 d['warc_offset'])
                for d in batch]
        psycopg2.extras.execute_values(cur, sql, rows)

    def insert_file_meta(self, cur, batch, on_conflict="NOTHING"):
        """Insert file metadata rows; ``size_bytes`` is coerced with ``int()``."""
        sql = """
            INSERT INTO
            file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype)
            VALUES %s
            ON CONFLICT (sha1hex) DO {};
        """.format(on_conflict)
        rows = [(d['sha1hex'],
                 d['sha256hex'],
                 d['md5hex'],
                 int(d['size_bytes']),
                 d['mimetype'])
                for d in batch]
        psycopg2.extras.execute_values(cur, sql, rows)

    def insert_grobid(self, cur, batch, on_conflict="NOTHING"):  # XXX
        """Insert GROBID result rows.

        The ``sha1hex`` column is read from each record's ``'key'`` field.
        ``metadata`` is stored as a JSON string (sorted keys), or NULL when
        missing/empty. The input dicts are not modified.
        """
        sql = """
            INSERT INTO
            grobid (sha1hex, grobid_version, status_code, status, fatcat_release, metadata)
            VALUES %s
            ON CONFLICT (sha1hex) DO {};
        """.format(on_conflict)
        rows = [(d['key'],
                 d.get('grobid_version') or None,
                 d['status_code'],
                 d['status'],
                 d.get('fatcat_release') or None,
                 _json_or_none(d.get('metadata')),
                 )
                for d in batch]
        psycopg2.extras.execute_values(cur, sql, rows)

    def insert_ingest_request(self, cur, batch, on_conflict="NOTHING"):
        """Insert ingest request rows.

        ``metadata`` is stored as a JSON string (sorted keys), or NULL when
        missing/empty. The input dicts are not modified.
        """
        sql = """
            INSERT INTO
            ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request, metadata)
            VALUES %s
            ON CONFLICT ON CONSTRAINT ingest_request_pkey DO {};
        """.format(on_conflict)
        rows = [(d['link_source'],
                 d['link_source_id'],
                 d['ingest_type'],
                 d['base_url'],
                 d.get('ingest_request_source'),
                 d.get('release_stage') or None,
                 # NOTE(review): passed through as-is; presumably already a
                 # string (psycopg2 does not adapt plain dicts) -- confirm.
                 d.get('request') or None,
                 # Fix: the statement names 8 target columns but the original
                 # tuple supplied only 7 values, omitting ``metadata``.
                 _json_or_none(d.get('metadata')),
                 )
                for d in batch]
        psycopg2.extras.execute_values(cur, sql, rows)

    def insert_ingest_file_result(self, cur, batch, on_conflict="NOTHING"):
        """Insert ingest file result rows; ``hit`` is coerced with ``bool()``.

        The statement has no conflict target, which PostgreSQL only permits
        for ``DO NOTHING``; a ``DO UPDATE`` action needs a target added here.
        """
        sql = """
            INSERT INTO
            ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
            VALUES %s
            ON CONFLICT DO {};
        """.format(on_conflict)
        rows = [(d['ingest_type'],
                 d['base_url'],
                 bool(d['hit']),
                 d['status'],
                 d.get('terminal_url'),
                 d.get('terminal_dt'),
                 d.get('terminal_status_code'),
                 d.get('terminal_sha1hex'),
                 )
                for d in batch]
        psycopg2.extras.execute_values(cur, sql, rows)