aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2019-12-24 16:41:09 -0800
committerBryan Newbold <bnewbold@archive.org>2020-01-02 18:12:58 -0800
commit03b04aabc9d9b63ff54a80f52590b619aee06159 (patch)
tree6121f29f9234fcb3a41a62120d614e9a2420542c /python/sandcrawler
parenta8cd91e6f6fb6dafac35f8c239113b55b2230b13 (diff)
downloadsandcrawler-03b04aabc9d9b63ff54a80f52590b619aee06159.tar.gz
sandcrawler-03b04aabc9d9b63ff54a80f52590b619aee06159.zip
start work on DB connector and minio client
Diffstat (limited to 'python/sandcrawler')
-rw-r--r--python/sandcrawler/db.py141
-rw-r--r--python/sandcrawler/minio.py59
2 files changed, 200 insertions, 0 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
new file mode 100644
index 0000000..104920b
--- /dev/null
+++ b/python/sandcrawler/db.py
@@ -0,0 +1,141 @@
+
+import json
+import psycopg2
+import psycopg2.extras
+import requests
+
class SandcrawlerPostgrestClient:
    """Thin read-only client for the sandcrawler PostgREST HTTP API.

    Each getter hits one PostgREST-exposed table and unwraps the JSON
    list response into a single row dict (or None when no row matches).
    """

    def __init__(self, api_url="http://aitio.us.archive.org:3030", **kwargs):
        # BUG FIX: this was previously assigned to self.api_uri, while every
        # method below reads self.api_url, raising AttributeError on use.
        self.api_url = api_url

    def _get_one_by_sha1(self, table_path, sha1):
        """Fetch a single row keyed by sha1hex from `table_path`.

        Returns the first matching row dict, or None if the result set is
        empty. Raises requests.HTTPError on non-2xx responses.
        """
        resp = requests.get(self.api_url + table_path,
                            params=dict(sha1hex='eq.' + sha1))
        resp.raise_for_status()
        rows = resp.json()
        return rows[0] if rows else None

    def get_cdx(self, url):
        """Return the list of CDX rows exactly matching `url`, or None."""
        resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.' + url))
        resp.raise_for_status()
        # PostgREST returns a JSON list; an empty list means "not found"
        return resp.json() or None

    def get_grobid(self, sha1):
        """Return the grobid row for a SHA-1 hex digest, or None."""
        return self._get_one_by_sha1("/grobid", sha1)

    def get_file_meta(self, sha1):
        """Return the file_meta row for a SHA-1 hex digest, or None."""
        return self._get_one_by_sha1("/file_meta", sha1)
+
class SandcrawlerPostgresClient:
    """Direct psycopg2 client for batch inserts into the sandcrawler DB.

    Every insert_* method takes an already-open cursor plus a list of row
    dicts, and performs a single multi-row INSERT via
    psycopg2.extras.execute_values(). Conflict handling defaults to
    "DO NOTHING"; callers may pass a different `on_conflict` clause body.
    Transaction control is left to the caller (see commit()).
    """

    def __init__(self, db_url, **kwargs):
        self.conn = psycopg2.connect(db_url)

    def cursor(self):
        """Return a fresh cursor on the underlying connection."""
        return self.conn.cursor()

    def commit(self):
        """Commit the current transaction."""
        return self.conn.commit()

    def insert_cdx(self, cur, batch, on_conflict="NOTHING"):
        """Insert CDX rows; rows lacking a warc_offset are skipped.

        Returns 0 when nothing was insertable; otherwise returns None
        (row counting is not yet implemented).
        """
        sql = """
            INSERT INTO
            cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
            VALUES %s
            ON CONFLICT ON CONSTRAINT cdx_pkey DO {}
            RETURNING 1;
        """.format(on_conflict)
        # only rows actually captured into a WARC can be recorded
        batch = [d for d in batch if d.get('warc_offset')]
        if not batch:
            return 0
        rows = [(d['url'],
                 d['datetime'],
                 d['sha1hex'],
                 d['mimetype'],
                 d['warc_path'],
                 d['warc_csize'],
                 d['warc_offset'])
                for d in batch]
        psycopg2.extras.execute_values(cur, sql, rows)

    def insert_file_meta(self, cur, batch, on_conflict="NOTHING"):
        """Insert file_meta rows (hashes, size, mimetype) keyed by sha1hex."""
        sql = """
            INSERT INTO
            file_meta (sha1hex, sha256hex, md5hex, size_bytes, mimetype)
            VALUES %s
            ON CONFLICT (sha1hex) DO {};
        """.format(on_conflict)
        rows = [(d['sha1hex'],
                 d['sha256hex'],
                 d['md5hex'],
                 int(d['size_bytes']),
                 d['mimetype'])
                for d in batch]
        psycopg2.extras.execute_values(cur, sql, rows)

    def insert_grobid(self, cur, batch, on_conflict="NOTHING"):
        """Insert GROBID extraction results.

        Mutates each row dict in-place: `metadata` is serialized to a
        canonical (sorted-keys) JSON string before insert.
        """
        sql = """
            INSERT INTO
            grobid (sha1hex, grobid_version, status_code, status, fatcat_release, metadata)
            VALUES %s
            ON CONFLICT (sha1hex) DO {};
        """.format(on_conflict)
        for r in batch:
            if r.get('metadata'):
                r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
        # NOTE(review): the sha1hex column is populated from d['key'] —
        # presumably the upstream record keys rows by sha1; confirm against
        # the producer of these dicts.
        rows = [(d['key'],
                 d.get('grobid_version') or None,
                 d['status_code'],
                 d['status'],
                 d.get('fatcat_release') or None,
                 d.get('metadata') or None)
                for d in batch]
        psycopg2.extras.execute_values(cur, sql, rows)

    def insert_ingest_request(self, cur, batch, on_conflict="NOTHING"):
        """Insert ingest requests.

        Mutates each row dict in-place: `metadata` is serialized to a
        canonical JSON string before insert.
        """
        sql = """
            INSERT INTO
            ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request, metadata)
            VALUES %s
            ON CONFLICT ON CONSTRAINT ingest_request_pkey DO {};
        """.format(on_conflict)
        for r in batch:
            if r.get('metadata'):
                r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
        # BUG FIX: the INSERT names eight columns, but the value tuples only
        # contained seven — the serialized `metadata` was never included,
        # which would make execute_values fail on every call. Append it.
        rows = [(d['link_source'],
                 d['link_source_id'],
                 d['ingest_type'],
                 d['base_url'],
                 d.get('ingest_request_source'),
                 d.get('release_stage') or None,
                 d.get('request') or None,
                 d.get('metadata') or None)
                for d in batch]
        psycopg2.extras.execute_values(cur, sql, rows)

    def insert_ingest_file_result(self, cur, batch, on_conflict="NOTHING"):
        """Insert ingest results (hit/miss plus terminal crawl info)."""
        # NOTE(review): "ON CONFLICT DO {}" has no conflict target, which is
        # only valid SQL for DO NOTHING; a DO UPDATE on_conflict value would
        # need an explicit target — confirm before passing one.
        sql = """
            INSERT INTO
            ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
            VALUES %s
            ON CONFLICT DO {};
        """.format(on_conflict)
        rows = [(d['ingest_type'],
                 d['base_url'],
                 bool(d['hit']),
                 d['status'],
                 d.get('terminal_url'),
                 d.get('terminal_dt'),
                 d.get('terminal_status_code'),
                 d.get('terminal_sha1hex'))
                for d in batch]
        psycopg2.extras.execute_values(cur, sql, rows)
diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py
new file mode 100644
index 0000000..e6ebe41
--- /dev/null
+++ b/python/sandcrawler/minio.py
@@ -0,0 +1,59 @@
+
import hashlib
import io
import os

import minio
+
+
class SandcrawlerMinioClient(object):
    """Minio (S3-compatible) client for storing blobs by SHA-1 path."""

    def __init__(self, host, access_key, secret_key, default_bucket=None):
        """
        host is minio connection string (host:port)
        access and secret key are as expected
        default_bucket can be supplied so that it doesn't need to be repeated for each function call

        Example config:

            host="localhost:9000",
            access_key=os.environ['MINIO_ACCESS_KEY'],
            secret_key=os.environ['MINIO_SECRET_KEY'],
        """
        self.mc = minio.Minio(
            host,
            access_key=access_key,
            secret_key=secret_key,
            secure=False,
        )
        self.default_bucket = default_bucket

    def upload_blob(self, folder, blob, sha1hex=None, extension="", prefix="", bucket=None):
        """Upload `blob` and return the (bucket, object_path) it was stored at.

        blob should be bytes (str is UTF-8 encoded first).
        sha1hex is assumed to be the SHA-1 of the blob itself; if not
        supplied it is calculated here. Objects are stored under a
        top-level folder, then two levels of sub-directory derived from
        the sha1, with the sha1 (plus optional extension) as filename.
        """
        import hashlib  # BUG FIX: was used below but never imported
        import io

        if type(blob) == str:
            blob = blob.encode('utf-8')
        assert type(blob) == bytes
        if not sha1hex:
            h = hashlib.sha1()
            h.update(blob)
            sha1hex = h.hexdigest()
        obj_path = "{}{}/{}/{}/{}{}".format(
            prefix,
            folder,
            sha1hex[0:2],
            sha1hex[2:4],
            sha1hex,
            extension,
        )
        if not bucket:
            bucket = self.default_bucket
        # BUG FIX: previously always wrote to self.default_bucket, silently
        # ignoring an explicit `bucket` argument. Also wrap the bytes in a
        # BytesIO stream, as minio's put_object expects a readable object.
        self.mc.put_object(
            bucket,
            obj_path,
            io.BytesIO(blob),
            len(blob),
        )
        return (bucket, obj_path)