path: root/python/sandcrawler/db.py
author    Bryan Newbold <bnewbold@archive.org>  2021-10-27 18:50:17 -0700
committer Bryan Newbold <bnewbold@archive.org>  2021-10-27 18:50:17 -0700
commit    826c7538e091fac14d987a3cd654975da964e240 (patch)
tree      90345b4cabb461c624ca5a218c2fc01dce3055cd /python/sandcrawler/db.py
parent    020037d4714e7ba2ab172c7278494aed0b2148ad (diff)
make fmt (black 21.9b0)
Diffstat (limited to 'python/sandcrawler/db.py')
-rw-r--r--  python/sandcrawler/db.py | 263
1 file changed, 151 insertions(+), 112 deletions(-)
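The whole diff is mechanical reformatting: black normalizes string literals to double quotes, and its "magic trailing comma" behavior explodes over-long signatures and tuple literals one element per line. A minimal sketch of reproducing the change via black's format_str API, assuming the pinned 21.9b0 release; the line_length value here is illustrative, not taken from this repo's config:

    import black

    # single-quoted source, as the pre-commit code was written
    src = "params = dict(url='eq.' + url)\n"
    out = black.format_str(src, mode=black.Mode(line_length=96))
    print(out)  # params = dict(url="eq." + url)

In practice the `make fmt` target named in the commit message presumably just runs the black CLI over the python/ tree.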
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index ee4d3bf..69d2116 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -12,12 +12,12 @@ class SandcrawlerPostgrestClient:
        self.api_url = api_url

    def get_cdx(self, url: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.' + url))
+        resp = requests.get(self.api_url + "/cdx", params=dict(url="eq." + url))
        resp.raise_for_status()
        return resp.json() or None

    def get_grobid(self, sha1: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.' + sha1))
+        resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex="eq." + sha1))
        resp.raise_for_status()
        resp_json = resp.json()
        if resp_json:
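All of the get_* helpers share one PostgREST convention: column filters ride in query parameters with an operator prefix, so ?url=eq.<value> becomes WHERE url = '<value>' server-side, and the response body is always a JSON array. A hedged sketch of the shared pattern; the get_row helper name is hypothetical:

    import requests
    from typing import Optional

    def get_row(api_url: str, table: str, column: str, value: str) -> Optional[dict]:
        # PostgREST filter syntax: ?<column>=eq.<value>
        resp = requests.get(f"{api_url}/{table}", params={column: "eq." + value})
        resp.raise_for_status()
        rows = resp.json()  # always a list, possibly empty
        return rows[0] if rows else None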
@@ -26,7 +26,7 @@ class SandcrawlerPostgrestClient:
            return None

    def get_pdftrio(self, sha1: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.' + sha1))
+        resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex="eq." + sha1))
        resp.raise_for_status()
        resp_json = resp.json()
        if resp_json:
@@ -35,7 +35,7 @@ class SandcrawlerPostgrestClient:
            return None

    def get_pdf_meta(self, sha1: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.' + sha1))
+        resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex="eq." + sha1))
        resp.raise_for_status()
        resp_json = resp.json()
        if resp_json:
@@ -56,7 +56,7 @@ class SandcrawlerPostgrestClient:
            return None

    def get_file_meta(self, sha1: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.' + sha1))
+        resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex="eq." + sha1))
        resp.raise_for_status()
        resp_json = resp.json()
        if resp_json:
@@ -89,7 +89,7 @@ class SandcrawlerPostgrestClient:
            return None

    def get_crossref(self, doi: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.' + doi))
+        resp = requests.get(self.api_url + "/crossref", params=dict(doi="eq." + doi))
        resp.raise_for_status()
        resp_json = resp.json()
        if resp_json:
@@ -117,10 +117,12 @@ class SandcrawlerPostgresClient:
            updates = 0
        return (inserts, updates)

-    def insert_cdx(self,
-                   cur: psycopg2.extensions.cursor,
-                   batch: List[Dict[str, Any]],
-                   on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_cdx(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
        sql = """
            INSERT INTO
            cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
@@ -133,11 +135,21 @@ class SandcrawlerPostgresClient:
            raise NotImplementedError("on_conflict: {}".format(on_conflict))
        sql += " RETURNING xmax;"

-        batch = [d for d in batch if d.get('warc_path')]
+        batch = [d for d in batch if d.get("warc_path")]
        if not batch:
            return (0, 0)
-        rows = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'],
-                 int(d['warc_csize']), int(d['warc_offset'])) for d in batch]
+        rows = [
+            (
+                d["url"],
+                d["datetime"],
+                d["sha1hex"],
+                d["mimetype"],
+                d["warc_path"],
+                int(d["warc_csize"]),
+                int(d["warc_offset"]),
+            )
+            for d in batch
+        ]
        # filter out duplicate rows by key (url, datetime)
        row_dict = dict()
        for b in rows:
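The hunk ends just as the dedup loop begins, so the diff never shows its body. A hedged sketch of how the idiom presumably continues, keyed on the (url, datetime) pair named in the comment; the sample tuples are invented:

    rows = [("http://example.com/a.pdf", "20211027000000", "d1a9...", "application/pdf", "x.warc.gz", 1, 2),
            ("http://example.com/a.pdf", "20211027000000", "d1a9...", "application/pdf", "x.warc.gz", 1, 2)]
    # keep only the last row seen for each (url, datetime) key
    row_dict = dict()
    for b in rows:
        row_dict[(b[0], b[1])] = b
    rows = list(row_dict.values())  # duplicates collapsed: len(rows) == 1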
@@ -146,10 +158,12 @@ class SandcrawlerPostgresClient:
        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
        return self._inserts_and_updates(resp, on_conflict)

-    def insert_file_meta(self,
-                         cur: psycopg2.extensions.cursor,
-                         batch: List[Dict[str, Any]],
-                         on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_file_meta(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
        sql = """
            INSERT INTO
            file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype)
@@ -168,8 +182,10 @@ class SandcrawlerPostgresClient:
        else:
            raise NotImplementedError("on_conflict: {}".format(on_conflict))
        sql += " RETURNING xmax;"
-        rows = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']), d['mimetype'])
-                for d in batch]
+        rows = [
+            (d["sha1hex"], d["sha256hex"], d["md5hex"], int(d["size_bytes"]), d["mimetype"])
+            for d in batch
+        ]
        # filter out duplicate rows by key (sha1hex)
        row_dict = dict()
        for b in rows:
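Every insert method funnels its rows through psycopg2.extras.execute_values with fetch=True against a statement ending in RETURNING xmax;, then hands the result to _inserts_and_updates, which this diff never shows. The trick is that PostgreSQL reports xmax = 0 for freshly inserted rows and non-zero for rows rewritten by ON CONFLICT ... DO UPDATE. A hedged reconstruction of that counter, under those assumptions:

    from typing import List, Tuple

    def _inserts_and_updates(resp: List[Tuple], on_conflict: str) -> Tuple[int, int]:
        # xmax == 0: plain INSERT; xmax != 0: row touched by ON CONFLICT ... DO UPDATE
        resp_codes = [int(r[0]) for r in resp]
        inserts = len([c for c in resp_codes if c == 0])
        updates = len([c for c in resp_codes if c != 0]) if on_conflict == "update" else 0
        return (inserts, updates)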
@@ -178,10 +194,12 @@ class SandcrawlerPostgresClient:
        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
        return self._inserts_and_updates(resp, on_conflict)

-    def insert_grobid(self,
-                      cur: psycopg2.extensions.cursor,
-                      batch: List[Dict[str, Any]],
-                      on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_grobid(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
        sql = """
            INSERT INTO
            grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata)
@@ -203,24 +221,27 @@ class SandcrawlerPostgresClient:
            raise NotImplementedError("on_conflict: {}".format(on_conflict))
        sql += " RETURNING xmax;"
        for r in batch:
-            if r.get('metadata'):
+            if r.get("metadata"):
                # sometimes these are only in metadata; shouldn't pass through
                # though (to save database space)
-                dupe_fields = ('fatcat_release', 'grobid_version')
+                dupe_fields = ("fatcat_release", "grobid_version")
                for k in dupe_fields:
                    if k not in r:
-                        r[k] = r['metadata'].get(k)
-                    r['metadata'].pop(k, None)
-                r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
-        rows = [(
-            d['key'],
-            d.get('grobid_version') or None,
-            d['status_code'],
-            d['status'],
-            d.get('fatcat_release') or None,
-            d.get('updated') or datetime.datetime.now(),
-            d.get('metadata') or None,
-        ) for d in batch]
+                        r[k] = r["metadata"].get(k)
+                    r["metadata"].pop(k, None)
+                r["metadata"] = json.dumps(r["metadata"], sort_keys=True)
+        rows = [
+            (
+                d["key"],
+                d.get("grobid_version") or None,
+                d["status_code"],
+                d["status"],
+                d.get("fatcat_release") or None,
+                d.get("updated") or datetime.datetime.now(),
+                d.get("metadata") or None,
+            )
+            for d in batch
+        ]
        # filter out duplicate rows by key (sha1hex)
        row_dict = dict()
        for b in rows:
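The pre-insert loop above hoists fatcat_release and grobid_version out of the GROBID metadata blob so each lands in its own column instead of being stored twice. An illustrative walk-through with an invented record:

    import json

    r = {
        "key": "d1a9...",  # sha1hex; value invented for illustration
        "status_code": 200,
        "status": "success",
        "metadata": {"grobid_version": "0.7.0", "title": "Some Paper"},
    }
    for k in ("fatcat_release", "grobid_version"):
        if k not in r:
            r[k] = r["metadata"].get(k)
        r["metadata"].pop(k, None)
    r["metadata"] = json.dumps(r["metadata"], sort_keys=True)
    # r["grobid_version"] == "0.7.0"; the JSON blob no longer carries it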
@@ -229,10 +250,9 @@ class SandcrawlerPostgresClient:
        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
        return self._inserts_and_updates(resp, on_conflict)

-    def insert_pdf_meta(self,
-                        cur: psycopg2.extensions.cursor,
-                        rows: List[Tuple],
-                        on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_pdf_meta(
+        self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing"
+    ) -> Tuple[int, int]:
        """
        batch elements are expected to have .to_sql_tuple() method
        """
@@ -269,10 +289,9 @@ class SandcrawlerPostgresClient:
        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
        return self._inserts_and_updates(resp, on_conflict)

-    def insert_html_meta(self,
-                         cur: psycopg2.extensions.cursor,
-                         rows: List[Tuple],
-                         on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_html_meta(
+        self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing"
+    ) -> Tuple[int, int]:
        """
        batch elements are expected to have .to_sql_tuple() method
        """
@@ -306,10 +325,12 @@ class SandcrawlerPostgresClient:
        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
        return self._inserts_and_updates(resp, on_conflict)

-    def insert_pdftrio(self,
-                       cur: psycopg2.extensions.cursor,
-                       batch: List[Dict[str, Any]],
-                       on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_pdftrio(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
        sql = """
            INSERT INTO
            pdftrio (sha1hex, updated, status_code, status, pdftrio_version,
@@ -335,18 +356,21 @@ class SandcrawlerPostgresClient:
        else:
            raise NotImplementedError("on_conflict: {}".format(on_conflict))
        sql += " RETURNING xmax;"
-        rows = [(
-            d['key'],
-            d.get('updated') or datetime.datetime.now(),
-            d['status_code'],
-            d['status'],
-            d.get('versions', {}).get('pdftrio_version') or None,
-            d.get('versions', {}).get('models_date') or None,
-            d.get('ensemble_score'),
-            d.get('bert_score'),
-            d.get('linear_score'),
-            d.get('image_score'),
-        ) for d in batch]
+        rows = [
+            (
+                d["key"],
+                d.get("updated") or datetime.datetime.now(),
+                d["status_code"],
+                d["status"],
+                d.get("versions", {}).get("pdftrio_version") or None,
+                d.get("versions", {}).get("models_date") or None,
+                d.get("ensemble_score"),
+                d.get("bert_score"),
+                d.get("linear_score"),
+                d.get("image_score"),
+            )
+            for d in batch
+        ]
        # filter out duplicate rows by key (sha1hex)
        row_dict = dict()
        for b in rows:
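Note the chained lookups like d.get("versions", {}).get("pdftrio_version"): the {} default makes the second .get safe when a record has no versions key at all. For example:

    d = {"key": "d1a9...", "status_code": 200, "status": "success"}  # no 'versions' key
    version = d.get("versions", {}).get("pdftrio_version") or None   # -> None, no KeyError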
@@ -355,10 +379,12 @@ class SandcrawlerPostgresClient:
        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
        return self._inserts_and_updates(resp, on_conflict)

-    def insert_ingest_request(self,
-                              cur: psycopg2.extensions.cursor,
-                              batch: List[Dict[str, Any]],
-                              on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_ingest_request(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
        sql = """
            INSERT INTO
            ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request)
@@ -372,21 +398,24 @@ class SandcrawlerPostgresClient:
        sql += " RETURNING xmax;"
        for r in batch:
            # in case these fields were already packed into 'request'
-            extra = r.get('request', {})
-            for k in ('ext_ids', 'fatcat_release', 'edit_extra', 'rel'):
+            extra = r.get("request", {})
+            for k in ("ext_ids", "fatcat_release", "edit_extra", "rel"):
                if r.get(k):
                    extra[k] = r[k]
            if extra:
-                r['extra'] = json.dumps(extra, sort_keys=True)
-        rows = [(
-            d['link_source'],
-            d['link_source_id'],
-            d['ingest_type'],
-            d['base_url'],
-            d.get('ingest_request_source'),
-            d.get('release_stage') or None,
-            d.get('extra') or None,
-        ) for d in batch]
+                r["extra"] = json.dumps(extra, sort_keys=True)
+        rows = [
+            (
+                d["link_source"],
+                d["link_source_id"],
+                d["ingest_type"],
+                d["base_url"],
+                d.get("ingest_request_source"),
+                d.get("release_stage") or None,
+                d.get("extra") or None,
+            )
+            for d in batch
+        ]
        # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url)
        row_dict = dict()
        for b in rows:
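This is the mirror image of the GROBID hoisting above: loose request fields get folded into a JSON blob ('extra', seeded from any pre-packed 'request' value) before insert. An illustrative record, with invented values:

    import json

    r = {
        "link_source": "doi",
        "link_source_id": "10.123/abc",
        "ingest_type": "pdf",
        "base_url": "https://example.com/paper.pdf",
        "rel": "publisher",
    }
    extra = r.get("request", {})
    for k in ("ext_ids", "fatcat_release", "edit_extra", "rel"):
        if r.get(k):
            extra[k] = r[k]
    if extra:
        r["extra"] = json.dumps(extra, sort_keys=True)  # '{"rel": "publisher"}'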
@@ -395,10 +424,12 @@ class SandcrawlerPostgresClient:
        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
        return self._inserts_and_updates(resp, on_conflict)

-    def insert_ingest_file_result(self,
-                                  cur: psycopg2.extensions.cursor,
-                                  batch: List[Dict[str, Any]],
-                                  on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_ingest_file_result(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
        sql = """
            INSERT INTO
            ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
@@ -420,16 +451,19 @@ class SandcrawlerPostgresClient:
        else:
            raise NotImplementedError("on_conflict: {}".format(on_conflict))
        sql += " RETURNING xmax;"
-        rows = [(
-            d['ingest_type'],
-            d['base_url'],
-            bool(d['hit']),
-            d['status'],
-            d.get('terminal_url'),
-            d.get('terminal_dt'),
-            d.get('terminal_status_code'),
-            d.get('terminal_sha1hex'),
-        ) for d in batch]
+        rows = [
+            (
+                d["ingest_type"],
+                d["base_url"],
+                bool(d["hit"]),
+                d["status"],
+                d.get("terminal_url"),
+                d.get("terminal_dt"),
+                d.get("terminal_status_code"),
+                d.get("terminal_sha1hex"),
+            )
+            for d in batch
+        ]
        # filter out duplicate rows by key (ingest_type, base_url)
        row_dict = dict()
        for b in rows:
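For context, a hedged sketch of how a caller might drive one of these helpers. The constructor argument, connection string, and batch contents are assumptions, not code from this repo; what is grounded in the diff is that the methods take an open cursor, return an (inserts, updates) pair, and never commit themselves:

    import psycopg2

    db = SandcrawlerPostgresClient("postgres:///sandcrawler")  # constructor args assumed
    batch = [
        {"ingest_type": "pdf", "base_url": "https://example.com/paper.pdf",
         "hit": True, "status": "success"}
    ]
    conn = psycopg2.connect("dbname=sandcrawler")
    with conn.cursor() as cur:
        inserts, updates = db.insert_ingest_file_result(cur, batch, on_conflict="update")
    conn.commit()  # committing is left to the caller
    print("{} inserted, {} updated".format(inserts, updates))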
@@ -438,10 +472,12 @@ class SandcrawlerPostgresClient:
        resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
        return self._inserts_and_updates(resp, on_conflict)

-    def insert_ingest_fileset_platform(self,
-                                       cur: psycopg2.extensions.cursor,
-                                       batch: List[Dict[str, Any]],
-                                       on_conflict: str = "nothing") -> Tuple[int, int]:
+    def insert_ingest_fileset_platform(
+        self,
+        cur: psycopg2.extensions.cursor,
+        batch: List[Dict[str, Any]],
+        on_conflict: str = "nothing",
+    ) -> Tuple[int, int]:
        sql = """
            INSERT INTO
            ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest)
@@ -470,23 +506,26 @@ class SandcrawlerPostgresClient:
        else:
            raise NotImplementedError("on_conflict: {}".format(on_conflict))
        sql += " RETURNING xmax;"
-        rows = [(
-            d['ingest_type'],
-            d['base_url'],
-            bool(d['hit']),
-            d['status'],
-            d.get('platform_name'),
-            d.get('platform_domain'),
-            d.get('platform_id'),
-            d.get('ingest_strategy'),
-            d.get('total_size'),
-            d.get('file_count'),
-            d.get('archiveorg_item_name'),
-            d.get('archiveorg_item_bundle_path'),
-            d.get('web_bundle_url'),
-            d.get('web_bundle_dt'),
-            d.get('manifest'),
-        ) for d in batch]
+        rows = [
+            (
+                d["ingest_type"],
+                d["base_url"],
+                bool(d["hit"]),
+                d["status"],
+                d.get("platform_name"),
+                d.get("platform_domain"),
+                d.get("platform_id"),
+                d.get("ingest_strategy"),
+                d.get("total_size"),
+                d.get("file_count"),
+                d.get("archiveorg_item_name"),
+                d.get("archiveorg_item_bundle_path"),
+                d.get("web_bundle_url"),
+                d.get("web_bundle_dt"),
+                d.get("manifest"),
+            )
+            for d in batch
+        ]
        # filter out duplicate rows by key (ingest_type, base_url)
        row_dict = dict()
        for b in rows:
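Finally, the on_conflict dispatch that brackets every one of these hunks follows one template: start from a plain INSERT, append either DO NOTHING or a DO UPDATE ... SET col = EXCLUDED.col clause, reject anything else, and tack on RETURNING xmax for the insert/update accounting. A generic sketch of that pattern, with illustrative table and column names (each insert_* method hardcodes its own):

    def build_insert_sql(on_conflict: str = "nothing") -> str:
        sql = "INSERT INTO example_table (key_col, val_col) VALUES %s"
        if on_conflict == "nothing":
            sql += " ON CONFLICT DO NOTHING"
        elif on_conflict == "update":
            sql += " ON CONFLICT (key_col) DO UPDATE SET val_col = EXCLUDED.val_col"
        else:
            raise NotImplementedError("on_conflict: {}".format(on_conflict))
        sql += " RETURNING xmax;"
        return sql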