-rw-r--r--  python/sandcrawler/db.py      | 215
-rw-r--r--  python/sandcrawler/persist.py |   6
2 files changed, 124 insertions(+), 97 deletions(-)
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 360add9..3ca2657 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -1,6 +1,6 @@
import datetime
import json
-from typing import Optional
+from typing import Any, Dict, List, Optional, Tuple
import psycopg2
import psycopg2.extras
@@ -8,38 +8,38 @@ import requests
class SandcrawlerPostgrestClient:
- def __init__(self, api_url="http://wbgrp-svc506.us.archive.org:3030", **kwargs):
+ def __init__(self, api_url: str = "http://wbgrp-svc506.us.archive.org:3030", **kwargs):
self.api_url = api_url
- def get_cdx(self, url):
+ def get_cdx(self, url: str) -> Optional[dict]:
resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.' + url))
resp.raise_for_status()
return resp.json() or None
- def get_grobid(self, sha1):
+ def get_grobid(self, sha1: str) -> Optional[dict]:
resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
- resp = resp.json()
- if resp:
- return resp[0]
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
else:
return None
- def get_pdftrio(self, sha1):
+ def get_pdftrio(self, sha1: str) -> Optional[dict]:
resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
- resp = resp.json()
- if resp:
- return resp[0]
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
else:
return None
- def get_pdf_meta(self, sha1):
+ def get_pdf_meta(self, sha1: str) -> Optional[dict]:
resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
- resp = resp.json()
- if resp:
- return resp[0]
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
else:
return None
@@ -55,12 +55,12 @@ class SandcrawlerPostgrestClient:
else:
return None
- def get_file_meta(self, sha1):
+ def get_file_meta(self, sha1: str) -> Optional[dict]:
resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
- resp = resp.json()
- if resp:
- return resp[0]
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
else:
return None
@@ -88,12 +88,12 @@ class SandcrawlerPostgrestClient:
else:
return None
- def get_crossref(self, doi):
+ def get_crossref(self, doi: str) -> Optional[dict]:
resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.' + doi))
resp.raise_for_status()
- resp = resp.json()
- if resp:
- return resp[0]
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
else:
return None
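
All of the Postgrest getters above share one pattern: PostgREST exposes each table at /<table>, encodes row filters as query parameters with an operator prefix (eq. for equality), and always returns a JSON array, hence the resp_json[0] unwrapping. A minimal sketch of that pattern; the helper name get_one and the example value are hypothetical:

# Minimal sketch of the PostgREST lookup pattern used by the getters above.
from typing import Any, Dict, Optional

import requests

API_URL = "http://wbgrp-svc506.us.archive.org:3030"

def get_one(table: str, column: str, value: str) -> Optional[Dict[str, Any]]:
    # PostgREST filter syntax: ?<column>=eq.<value>
    resp = requests.get(API_URL + "/" + table, params={column: "eq." + value})
    resp.raise_for_status()
    rows = resp.json()  # always a JSON array; empty means no match
    return rows[0] if rows else None

# e.g. get_one("grobid", "sha1hex", "a" * 40)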
@@ -102,22 +102,25 @@ class SandcrawlerPostgresClient:
def __init__(self, db_url, **kwargs):
self.conn = psycopg2.connect(db_url)
- def cursor(self):
+ def cursor(self) -> psycopg2.extensions.cursor:
return self.conn.cursor()
- def commit(self):
- return self.conn.commit()
+ def commit(self) -> None:
+ self.conn.commit()
- def _inserts_and_updates(self, resp, on_conflict):
- resp = [int(r[0]) for r in resp]
- inserts = len([r for r in resp if r == 0])
+ def _inserts_and_updates(self, resp: List[Tuple[Any]], on_conflict: str):
+ resp_codes = [int(r[0]) for r in resp]
+ inserts = len([r for r in resp_codes if r == 0])
if on_conflict == "update":
- updates = len([r for r in resp if r != 0])
+ updates = len([r for r in resp_codes if r != 0])
else:
updates = 0
return (inserts, updates)
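
_inserts_and_updates relies on a PostgreSQL detail: each insert statement below ends with RETURNING xmax, and xmax is 0 for a freshly inserted row but nonzero for a row rewritten by ON CONFLICT ... DO UPDATE. A self-contained sketch of the trick; the table name and DSN are placeholders:

# In PostgreSQL, RETURNING xmax lets one batched statement report
# how many rows were inserted vs. conflict-updated.
import psycopg2
import psycopg2.extras

sql = """
    INSERT INTO example (key, value) VALUES %s
    ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value
    RETURNING xmax;
"""
conn = psycopg2.connect("dbname=sandcrawler_example")
with conn.cursor() as cur:
    resp = psycopg2.extras.execute_values(
        cur, sql, [("a", 1), ("b", 2)], page_size=250, fetch=True)
    codes = [int(r[0]) for r in resp]
    inserts = len([c for c in codes if c == 0])   # xmax == 0: new row
    updates = len(codes) - inserts                # xmax != 0: rewritten row
conn.commit()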
- def insert_cdx(self, cur, batch, on_conflict="nothing"):
+ def insert_cdx(self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing"):
sql = """
INSERT INTO
cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
@@ -133,17 +136,20 @@ class SandcrawlerPostgresClient:
batch = [d for d in batch if d.get('warc_path')]
if not batch:
return (0, 0)
- batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'],
- int(d['warc_csize']), int(d['warc_offset'])) for d in batch]
+ rows = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'],
+ int(d['warc_csize']), int(d['warc_offset'])) for d in batch]
# filter out duplicate rows by key (url, datetime)
- batch_dict = dict()
- for b in batch:
- batch_dict[(b[0], b[1])] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[(b[0], b[1])] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
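
The dict pass above exists because a single INSERT ... ON CONFLICT statement may not affect the same row twice ("ON CONFLICT DO UPDATE command cannot affect row a second time"), so rows sharing the conflict key (url, datetime) are collapsed before the batch insert, last occurrence winning. A sketch with illustrative values only:

# In-batch dedup by conflict key, as used throughout this file.
rows = [
    ("http://example.com/a.pdf", "20200101000000", "a" * 40,
     "application/pdf", "EXAMPLE-0001.warc.gz", 1000, 0),
    ("http://example.com/a.pdf", "20200101000000", "a" * 40,
     "application/pdf", "EXAMPLE-0002.warc.gz", 1200, 500),
]
row_dict = {(r[0], r[1]): r for r in rows}
rows = list(row_dict.values())
assert len(rows) == 1  # the duplicate (url, datetime) pair was collapsed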
- def insert_file_meta(self, cur, batch, on_conflict="nothing"):
+ def insert_file_meta(self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing"):
sql = """
INSERT INTO
file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype)
@@ -162,17 +168,20 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']),
- d['mimetype']) for d in batch]
+ rows = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']), d['mimetype'])
+ for d in batch]
# filter out duplicate rows by key (sha1hex)
- batch_dict = dict()
- for b in batch:
- batch_dict[b[0]] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[b[0]] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_grobid(self, cur, batch, on_conflict="nothing"):
+ def insert_grobid(self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing"):
sql = """
INSERT INTO
grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata)
@@ -203,7 +212,7 @@ class SandcrawlerPostgresClient:
r[k] = r['metadata'].get(k)
r['metadata'].pop(k, None)
r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
- batch = [(
+ rows = [(
d['key'],
d.get('grobid_version') or None,
d['status_code'],
@@ -213,14 +222,17 @@ class SandcrawlerPostgresClient:
d.get('metadata') or None,
) for d in batch]
# filter out duplicate rows by key (sha1hex)
- batch_dict = dict()
- for b in batch:
- batch_dict[b[0]] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[b[0]] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
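
The hunk above also shows part of a preprocessing loop (r[k] = r['metadata'].get(k) ...) that hoists selected fields out of the GROBID metadata blob into dedicated columns before the remainder is serialized with stable key order. A sketch, assuming fatcat_release is one hoisted key, which matches the row tuple built below it:

# Hoist chosen fields out of the metadata blob, then serialize the rest
# deterministically. Key names here are assumed, not confirmed.
import json

r = {"metadata": {"fatcat_release": "release_abc123", "title": "Example"}}
for k in ("fatcat_release",):
    r[k] = r["metadata"].get(k)
    r["metadata"].pop(k, None)
r["metadata"] = json.dumps(r["metadata"], sort_keys=True)
# r == {"metadata": '{"title": "Example"}', "fatcat_release": "release_abc123"}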
- def insert_pdf_meta(self, cur, batch, on_conflict="nothing"):
+ def insert_pdf_meta(self,
+ cur: psycopg2.extensions.cursor,
+ rows: List[Tuple[Any]],
+ on_conflict: str = "nothing"):
"""
batch elements are expected to have .to_sql_tuple() method
"""
@@ -249,16 +261,18 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [d.to_sql_tuple() for d in batch]
# filter out duplicate rows by key (sha1hex)
- batch_dict = dict()
- for b in batch:
- batch_dict[b[0]] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[b[0]] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_html_meta(self, cur, batch, on_conflict="nothing"):
+ def insert_html_meta(self,
+ cur: psycopg2.extensions.cursor,
+ rows: List[Tuple[Any]],
+ on_conflict: str = "nothing"):
"""
batch elements are expected to have .to_sql_tuple() method
"""
@@ -284,16 +298,18 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [d.to_sql_tuple() for d in batch]
# filter out duplicate rows by key (sha1hex)
- batch_dict = dict()
- for b in batch:
- batch_dict[b[0]] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[b[0]] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
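
Note the signature change: insert_pdf_meta and insert_html_meta now accept pre-serialized rows rather than model objects, moving the .to_sql_tuple() call out to callers (see the persist.py hunks below). A hypothetical stand-in for such a model object; the actual sandcrawler classes and fields will differ:

# Caller-side serialization contract for the rows-based insert methods.
class HtmlMetaRow:
    def __init__(self, sha1hex: str, status: str):
        self.sha1hex = sha1hex
        self.status = status

    def to_sql_tuple(self):
        # element 0 must be the conflict key (sha1hex), since the dedup
        # pass above indexes rows by b[0]
        return (self.sha1hex, self.status)

html_meta_batch = [HtmlMetaRow("a" * 40, "success")]
rows = [d.to_sql_tuple() for d in html_meta_batch]
# db.insert_html_meta(cur, rows, on_conflict="update")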
- def insert_pdftrio(self, cur, batch, on_conflict="nothing"):
+ def insert_pdftrio(self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing"):
sql = """
INSERT INTO
pdftrio (sha1hex, updated, status_code, status, pdftrio_version,
@@ -319,7 +335,7 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(
+ rows = [(
d['key'],
d.get('updated') or datetime.datetime.now(),
d['status_code'],
@@ -332,14 +348,17 @@ class SandcrawlerPostgresClient:
d.get('image_score'),
) for d in batch]
# filter out duplicate rows by key (sha1hex)
- batch_dict = dict()
- for b in batch:
- batch_dict[b[0]] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[b[0]] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_ingest_request(self, cur, batch, on_conflict="nothing"):
+ def insert_ingest_request(self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing"):
sql = """
INSERT INTO
ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request)
@@ -359,7 +378,7 @@ class SandcrawlerPostgresClient:
extra[k] = r[k]
if extra:
r['extra'] = json.dumps(extra, sort_keys=True)
- batch = [(
+ rows = [(
d['link_source'],
d['link_source_id'],
d['ingest_type'],
@@ -369,14 +388,17 @@ class SandcrawlerPostgresClient:
d.get('extra') or None,
) for d in batch]
# filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url)
- batch_dict = dict()
- for b in batch:
- batch_dict[(b[0], b[1], b[2], b[3])] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[(b[0], b[1], b[2], b[3])] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
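
The extra[k] = r[k] lines in the hunk above fold request fields that lack a dedicated column into one deterministic JSON blob. A sketch; the set of column names is inferred from the INSERT statement, not confirmed:

# Pack non-column request fields into a sorted-key JSON "extra" blob.
import json

r = {"link_source": "doi", "link_source_id": "10.123/abc",
     "ingest_type": "pdf", "base_url": "https://example.com/paper.pdf",
     "oa_status": "gold"}
columns = {"link_source", "link_source_id", "ingest_type", "base_url",
           "ingest_request_source", "release_stage"}
extra = {k: v for k, v in r.items() if k not in columns}
if extra:
    r["extra"] = json.dumps(extra, sort_keys=True)  # {"oa_status": "gold"}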
- def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"):
+ def insert_ingest_file_result(self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing"):
sql = """
INSERT INTO
ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
@@ -398,7 +420,7 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(
+ rows = [(
d['ingest_type'],
d['base_url'],
bool(d['hit']),
@@ -409,14 +431,17 @@ class SandcrawlerPostgresClient:
d.get('terminal_sha1hex'),
) for d in batch]
# filter out duplicate rows by key (ingest_type, base_url)
- batch_dict = dict()
- for b in batch:
- batch_dict[(b[0], b[1])] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[(b[0], b[1])] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_ingest_fileset_platform(self, cur, batch, on_conflict="nothing"):
+ def insert_ingest_fileset_platform(self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing"):
sql = """
INSERT INTO
ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest)
@@ -445,7 +470,7 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(
+ rows = [(
d['ingest_type'],
d['base_url'],
bool(d['hit']),
@@ -463,9 +488,9 @@ class SandcrawlerPostgresClient:
d.get('manifest'),
) for d in batch]
# filter out duplicate rows by key (ingest_type, base_url)
- batch_dict = dict()
- for b in batch:
- batch_dict[(b[0], b[1])] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[(b[0], b[1])] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
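
Taken together, a hedged end-to-end usage sketch of the retyped client. The DSN and batch contents are placeholders, and insert_ingest_request is assumed to tolerate a dict carrying only the required keys:

# Cursor, batch insert, commit; returns an (inserts, updates) pair.
db = SandcrawlerPostgresClient("postgres://sandcrawler@localhost/sandcrawler")
cur = db.cursor()
batch = [{
    "link_source": "doi",
    "link_source_id": "10.123/abc",
    "ingest_type": "pdf",
    "base_url": "https://example.com/paper.pdf",
}]
inserts, updates = db.insert_ingest_request(cur, batch, on_conflict="nothing")
db.commit()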
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index b714bc7..bb76e54 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -266,7 +266,8 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')
]
if html_meta_batch:
- resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="update")
+ rows = [d.to_sql_tuple() for d in html_meta_batch]
+ resp = self.db.insert_html_meta(self.cur, rows, on_conflict="update")
self.counts['insert-html_meta'] += resp[0]
self.counts['update-html_meta'] += resp[1]
@@ -534,7 +535,8 @@ class PersistPdfTextWorker(SandcrawlerWorker):
self.counts['s3-put'] += 1
if not self.s3_only:
- resp = self.db.insert_pdf_meta(self.cur, parsed_batch, on_conflict="update")
+ rows = [r.to_sql_tuple() for r in parsed_batch]
+ resp = self.db.insert_pdf_meta(self.cur, rows, on_conflict="update")
self.counts['insert-pdf-meta'] += resp[0]
self.counts['update-pdf-meta'] += resp[1]
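
For context, the worker-side accounting that consumes the (inserts, updates) pair, reduced to a standalone sketch; the real workers keep self.counts on SandcrawlerWorker, and a plain Counter is assumed here:

# Fold the (inserts, updates) result into per-table counters.
import collections

counts = collections.Counter()

def record_pdf_meta(resp):
    inserts, updates = resp
    counts["insert-pdf-meta"] += inserts
    counts["update-pdf-meta"] += updates

record_pdf_meta((3, 1))  # e.g. 3 new rows, 1 conflict-updated row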