aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/db.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/db.py')
-rw-r--r--python/sandcrawler/db.py141
1 files changed, 141 insertions, 0 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
new file mode 100644
index 0000000..104920b
--- /dev/null
+++ b/python/sandcrawler/db.py
@@ -0,0 +1,141 @@
+
+import json
+import psycopg2
+import psycopg2.extras
+import requests
+
+class SandcrawlerPostgrestClient:
+
+ def __init__(self, api_url="http://aitio.us.archive.org:3030", **kwargs):
+ self.api_uri = api_url
+
+ def get_cdx(self, url):
+ resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url))
+ resp.raise_for_status()
+ return resp.json() or None
+
+ def get_grobid(self, sha1):
+ resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1))
+ resp.raise_for_status()
+ resp = resp.json()
+ if resp:
+ return resp[0]
+ else:
+ return None
+
+ def get_file_meta(self, sha1):
+ resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
+ resp.raise_for_status()
+ resp = resp.json()
+ if resp:
+ return resp[0]
+ else:
+ return None
+
+class SandcrawlerPostgresClient:
+
+ def __init__(self, db_url, **kwargs):
+ self.conn = psycopg2.connect(db_url)
+
+ def cursor(self):
+ return self.conn.cursor()
+
+ def commit(self):
+ return self.conn.commit()
+
+ def insert_cdx(self, cur, batch, on_conflict="NOTHING"):
+ sql = """
+ INSERT INTO
+ cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
+ VALUES %s
+ ON CONFLICT ON CONSTRAINT cdx_pkey DO {}
+ RETURNING 1;
+ """.format(on_conflict)
+ batch = [d for d in batch if d.get('warc_offset')]
+ if not batch:
+ return 0
+ batch = [(d['url'],
+ d['datetime'],
+ d['sha1hex'],
+ d['mimetype'],
+ d['warc_path'],
+ d['warc_csize'],
+ d['warc_offset'])
+ for d in batch]
+ res = psycopg2.extras.execute_values(cur, sql, batch) # fetch=True
+ #return len(res)
+
+ def insert_file_meta(self, cur, batch, on_conflict="NOTHING"):
+ sql = """
+ INSERT INTO
+ file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype)
+ VALUES %s
+ ON CONFLICT (sha1hex) DO {};
+ """.format(on_conflict)
+ batch = [(d['sha1hex'],
+ d['sha256hex'],
+ d['md5hex'],
+ int(d['size_bytes']),
+ d['mimetype'])
+ for d in batch]
+ res = psycopg2.extras.execute_values(cur, sql, batch)
+
+ def insert_grobid(self, cur, batch, on_conflict="NOTHING"): # XXX
+ sql = """
+ INSERT INTO
+ grobid (sha1hex, grobid_version, status_code, status, fatcat_release, metadata)
+ VALUES %s
+ ON CONFLICT (sha1hex) DO {};
+ """.format(on_conflict)
+ for r in batch:
+ if r.get('metadata'):
+ r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
+ batch = [(d['key'],
+ d.get('grobid_version') or None,
+ d['status_code'],
+ d['status'],
+ d.get('fatcat_release') or None,
+ d.get('metadata') or None ,
+ )
+ for d in batch]
+ res = psycopg2.extras.execute_values(cur, sql, batch)
+
+ def insert_ingest_request(self, cur, batch, on_conflict="NOTHING"):
+ sql = """
+ INSERT INTO
+ ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request, metadata)
+ VALUES %s
+ ON CONFLICT ON CONSTRAINT ingest_request_pkey DO {};
+ """.format(on_conflict)
+ for r in batch:
+ if r.get('metadata'):
+ r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
+ batch = [(d['link_source'],
+ d['link_source_id'],
+ d['ingest_type'],
+ d['base_url'],
+ d.get('ingest_request_source'),
+ d.get('release_stage') or None,
+ d.get('request') or None,
+ )
+ for d in batch]
+ res = psycopg2.extras.execute_values(cur, sql, batch)
+
+ def insert_ingest_file_result(self, cur, batch, on_conflict="NOTHING"):
+ sql = """
+ INSERT INTO
+ ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
+ VALUES %s
+ ON CONFLICT DO {};
+ """.format(on_conflict)
+ batch = [(d['ingest_type'],
+ d['base_url'],
+ bool(d['hit']),
+ d['status'],
+ d.get('terminal_url'),
+ d.get('terminal_dt'),
+ d.get('terminal_status_code'),
+ d.get('terminal_sha1hex'),
+ )
+ for d in batch]
+ res = psycopg2.extras.execute_values(cur, sql, batch)