Diffstat (limited to 'python/sandcrawler/db.py')
-rw-r--r--  python/sandcrawler/db.py  549
1 file changed, 395 insertions(+), 154 deletions(-)
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 066e53b..f9018ec 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -1,51 +1,58 @@
-
-import json
import datetime
-from typing import Optional
+import json
+from typing import Any, Dict, List, Optional, Tuple
import psycopg2
import psycopg2.extras
-import requests
-class SandcrawlerPostgrestClient:
+from .misc import requests_retry_session
- def __init__(self, api_url="http://aitio.us.archive.org:3030", **kwargs):
+
+class SandcrawlerPostgrestClient:
+ def __init__(self, api_url: str = "http://wbgrp-svc506.us.archive.org:3030", **kwargs):
self.api_url = api_url
+ self.http_session = requests_retry_session()
- def get_cdx(self, url):
- resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url))
+ def get_cdx(self, url: str) -> Optional[dict]:
+ resp = self.http_session.get(self.api_url + "/cdx", params=dict(url="eq." + url))
resp.raise_for_status()
return resp.json() or None
- def get_grobid(self, sha1):
- resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1))
+ def get_grobid(self, sha1: str) -> Optional[dict]:
+ resp = self.http_session.get(
+ self.api_url + "/grobid", params=dict(sha1hex="eq." + sha1)
+ )
resp.raise_for_status()
- resp = resp.json()
- if resp:
- return resp[0]
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
else:
return None
- def get_pdftrio(self, sha1):
- resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.'+sha1))
+ def get_pdftrio(self, sha1: str) -> Optional[dict]:
+ resp = self.http_session.get(
+ self.api_url + "/pdftrio", params=dict(sha1hex="eq." + sha1)
+ )
resp.raise_for_status()
- resp = resp.json()
- if resp:
- return resp[0]
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
else:
return None
- def get_pdf_meta(self, sha1):
- resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.'+sha1))
+ def get_pdf_meta(self, sha1: str) -> Optional[dict]:
+ resp = self.http_session.get(
+ self.api_url + "/pdf_meta", params=dict(sha1hex="eq." + sha1)
+ )
resp.raise_for_status()
- resp = resp.json()
- if resp:
- return resp[0]
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
else:
return None
def get_html_meta(self, sha1hex: str) -> Optional[dict]:
- resp = requests.get(
+ resp = self.http_session.get(
self.api_url + "/html_meta",
params=dict(sha1hex=f"eq.{sha1hex}"),
)
@@ -56,17 +63,19 @@ class SandcrawlerPostgrestClient:
else:
return None
- def get_file_meta(self, sha1):
- resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
+ def get_file_meta(self, sha1: str) -> Optional[dict]:
+ resp = self.http_session.get(
+ self.api_url + "/file_meta", params=dict(sha1hex="eq." + sha1)
+ )
resp.raise_for_status()
- resp = resp.json()
- if resp:
- return resp[0]
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
else:
return None
def get_ingest_file_result(self, ingest_type: str, url: str) -> Optional[dict]:
- resp = requests.get(
+ resp = self.http_session.get(
self.api_url + "/ingest_file_result",
params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"),
)
@@ -77,27 +86,76 @@ class SandcrawlerPostgrestClient:
else:
return None
-class SandcrawlerPostgresClient:
+ def get_ingest_fileset_platform(self, ingest_type: str, url: str) -> Optional[dict]:
+ resp = self.http_session.get(
+ self.api_url + "/ingest_fileset_platform",
+ params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"),
+ )
+ resp.raise_for_status()
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
+ else:
+ return None
+
+ def get_crossref(self, doi: str) -> Optional[dict]:
+ resp = self.http_session.get(self.api_url + "/crossref", params=dict(doi=f"eq.{doi}"))
+ resp.raise_for_status()
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
+ else:
+ return None
+
+ def get_crossref_with_refs(self, doi: str) -> Optional[dict]:
+ resp = self.http_session.get(
+ self.api_url + "/crossref_with_refs", params=dict(doi=f"eq.{doi}")
+ )
+ resp.raise_for_status()
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
+ else:
+ return None
- def __init__(self, db_url, **kwargs):
+ def get_grobid_refs(self, source: str, source_id: str) -> Optional[dict]:
+ resp = self.http_session.get(
+ self.api_url + "/grobid_refs",
+ params=dict(source=f"eq.{source}", source_id=f"eq.{source_id}"),
+ )
+ resp.raise_for_status()
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
+ else:
+ return None
+
+
+class SandcrawlerPostgresClient:
+ def __init__(self, db_url: str, **kwargs):
self.conn = psycopg2.connect(db_url)
- def cursor(self):
+ def cursor(self) -> psycopg2.extensions.cursor:
return self.conn.cursor()
- def commit(self):
- return self.conn.commit()
+ def commit(self) -> None:
+ self.conn.commit()
- def _inserts_and_updates(self, resp, on_conflict):
- resp = [int(r[0]) for r in resp]
- inserts = len([r for r in resp if r == 0])
+ def _inserts_and_updates(self, resp: List[Tuple], on_conflict: str) -> Tuple[int, int]:
+ resp_codes = [int(r[0]) for r in resp]
+ inserts = len([r for r in resp_codes if r == 0])
if on_conflict == "update":
- updates = len([r for r in resp if r != 0])
+ updates = len([r for r in resp_codes if r != 0])
else:
updates = 0
return (inserts, updates)
- def insert_cdx(self, cur, batch, on_conflict="nothing"):
+ def insert_cdx(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
@@ -110,26 +168,35 @@ class SandcrawlerPostgresClient:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [d for d in batch if d.get('warc_path')]
+ batch = [d for d in batch if d.get("warc_path")]
if not batch:
return (0, 0)
- batch = [(d['url'],
- d['datetime'],
- d['sha1hex'],
- d['mimetype'],
- d['warc_path'],
- int(d['warc_csize']),
- int(d['warc_offset']))
- for d in batch]
+ rows = [
+ (
+ d["url"],
+ d["datetime"],
+ d["sha1hex"],
+ d["mimetype"],
+ d["warc_path"],
+ int(d["warc_csize"]),
+ int(d["warc_offset"]),
+ )
+ for d in batch
+ ]
# filter out duplicate rows by key (url, datetime)
- batch_dict = dict()
- for b in batch:
- batch_dict[(b[0], b[1])] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[(b[0], b[1])] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_file_meta(self, cur, batch, on_conflict="nothing"):
+ def insert_file_meta(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype)
@@ -148,21 +215,24 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(d['sha1hex'],
- d['sha256hex'],
- d['md5hex'],
- int(d['size_bytes']),
- d['mimetype'])
- for d in batch]
+ rows = [
+ (d["sha1hex"], d["sha256hex"], d["md5hex"], int(d["size_bytes"]), d["mimetype"])
+ for d in batch
+ ]
# filter out duplicate rows by key (sha1hex)
- batch_dict = dict()
- for b in batch:
- batch_dict[b[0]] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[b[0]] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_grobid(self, cur, batch, on_conflict="nothing"):
+ def insert_grobid(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata)
@@ -184,33 +254,39 @@ class SandcrawlerPostgresClient:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
for r in batch:
- if r.get('metadata'):
+ if r.get("metadata"):
# sometimes these are only in metadata; shouldn't pass through
# though (to save database space)
- dupe_fields = ('fatcat_release', 'grobid_version')
+ dupe_fields = ("fatcat_release", "grobid_version")
for k in dupe_fields:
- if not k in r:
- r[k] = r['metadata'].get(k)
- r['metadata'].pop(k, None)
- r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
- batch = [(d['key'],
- d.get('grobid_version') or None,
- d['status_code'],
- d['status'],
- d.get('fatcat_release') or None,
- d.get('updated') or datetime.datetime.now(),
- d.get('metadata') or None ,
- )
- for d in batch]
+ if k not in r:
+ r[k] = r["metadata"].get(k)
+ r["metadata"].pop(k, None)
+ r["metadata"] = json.dumps(r["metadata"], sort_keys=True)
+ now = datetime.datetime.now()
+ rows = [
+ (
+ d["key"],
+ d.get("grobid_version") or None,
+ d["status_code"],
+ d["status"],
+ d.get("fatcat_release") or None,
+ d.get("updated") or now,
+ d.get("metadata") or None,
+ )
+ for d in batch
+ ]
# filter out duplicate rows by key (sha1hex)
- batch_dict = dict()
- for b in batch:
- batch_dict[b[0]] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[b[0]] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_pdf_meta(self, cur, batch, on_conflict="nothing"):
+ def insert_pdf_meta(
+ self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing"
+ ) -> Tuple[int, int]:
"""
batch elements are expected to have .to_sql_tuple() method
"""
@@ -239,16 +315,17 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [d.to_sql_tuple() for d in batch]
# filter out duplicate rows by key (sha1hex)
- batch_dict = dict()
- for b in batch:
- batch_dict[b[0]] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[b[0]] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_html_meta(self, cur, batch, on_conflict="nothing"):
+ def insert_html_meta(
+ self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing"
+ ) -> Tuple[int, int]:
"""
batch elements are expected to have .to_sql_tuple() method
"""
@@ -274,16 +351,20 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [d.to_sql_tuple() for d in batch]
# filter out duplicate rows by key (sha1hex)
- batch_dict = dict()
- for b in batch:
- batch_dict[b[0]] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[b[0]] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_pdftrio(self, cur, batch, on_conflict="nothing"):
+ def insert_pdftrio(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
pdftrio (sha1hex, updated, status_code, status, pdftrio_version,
@@ -309,29 +390,36 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [
+ now = datetime.datetime.now()
+ rows = [
(
- d['key'],
- d.get('updated') or datetime.datetime.now(),
- d['status_code'],
- d['status'],
- d.get('versions', {}).get('pdftrio_version') or None,
- d.get('versions', {}).get('models_date') or None,
- d.get('ensemble_score'),
- d.get('bert_score'),
- d.get('linear_score'),
- d.get('image_score'),
+ d["key"],
+ d.get("updated") or now,
+ d["status_code"],
+ d["status"],
+ d.get("versions", {}).get("pdftrio_version") or None,
+ d.get("versions", {}).get("models_date") or None,
+ d.get("ensemble_score"),
+ d.get("bert_score"),
+ d.get("linear_score"),
+ d.get("image_score"),
)
- for d in batch]
+ for d in batch
+ ]
# filter out duplicate rows by key (sha1hex)
- batch_dict = dict()
- for b in batch:
- batch_dict[b[0]] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[b[0]] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_ingest_request(self, cur, batch, on_conflict="nothing"):
+ def insert_ingest_request(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request)
@@ -345,35 +433,43 @@ class SandcrawlerPostgresClient:
sql += " RETURNING xmax;"
for r in batch:
# in case these fields were already packed into 'request'
- extra = r.get('request', {})
- for k in ('ext_ids', 'fatcat_release', 'edit_extra', 'rel'):
+ extra = r.get("request", {})
+ for k in ("ext_ids", "fatcat_release", "edit_extra", "rel"):
if r.get(k):
extra[k] = r[k]
if extra:
- r['extra'] = json.dumps(extra, sort_keys=True)
- batch = [(d['link_source'],
- d['link_source_id'],
- d['ingest_type'],
- d['base_url'],
- d.get('ingest_request_source'),
- d.get('release_stage') or None,
- d.get('extra') or None,
- )
- for d in batch]
+ r["extra"] = json.dumps(extra, sort_keys=True)
+ rows = [
+ (
+ d["link_source"],
+ d["link_source_id"],
+ d["ingest_type"],
+ d["base_url"],
+ d.get("ingest_request_source"),
+ d.get("release_stage") or None,
+ d.get("extra") or None,
+ )
+ for d in batch
+ ]
# filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url)
- batch_dict = dict()
- for b in batch:
- batch_dict[(b[0], b[1], b[2], b[3])] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[(b[0], b[1], b[2], b[3])] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"):
+ def insert_ingest_file_result(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
VALUES %s
- ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO
+ ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO
"""
if on_conflict.lower() == "nothing":
sql += " NOTHING"
@@ -390,20 +486,165 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(d['ingest_type'],
- d['base_url'],
- bool(d['hit']),
- d['status'],
- d.get('terminal_url'),
- d.get('terminal_dt'),
- d.get('terminal_status_code'),
- d.get('terminal_sha1hex'),
- )
- for d in batch]
+ rows = [
+ (
+ d["ingest_type"],
+ d["base_url"],
+ bool(d["hit"]),
+ d["status"],
+ d.get("terminal_url"),
+ d.get("terminal_dt"),
+ d.get("terminal_status_code"),
+ d.get("terminal_sha1hex"),
+ )
+ for d in batch
+ ]
+ # filter out duplicate rows by key (ingest_type, base_url)
+ row_dict = dict()
+ for b in rows:
+ row_dict[(b[0], b[1])] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_ingest_fileset_platform(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
+ sql = """
+ INSERT INTO
+ ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest)
+ VALUES %s
+            ON CONFLICT ON CONSTRAINT ingest_fileset_platform_pkey DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ updated=now(),
+ hit=EXCLUDED.hit,
+ status=EXCLUDED.status,
+ platform_name=EXCLUDED.platform_name,
+ platform_domain=EXCLUDED.platform_domain,
+ platform_id=EXCLUDED.platform_id,
+ ingest_strategy=EXCLUDED.ingest_strategy,
+ total_size=EXCLUDED.total_size,
+ file_count=EXCLUDED.file_count,
+ archiveorg_item_name=EXCLUDED.archiveorg_item_name,
+ archiveorg_item_bundle_path=EXCLUDED.archiveorg_item_bundle_path,
+ web_bundle_url=EXCLUDED.web_bundle_url,
+ web_bundle_dt=EXCLUDED.web_bundle_dt,
+ manifest=EXCLUDED.manifest
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ rows = [
+ (
+ d["ingest_type"],
+ d["base_url"],
+ bool(d["hit"]),
+ d["status"],
+ d.get("platform_name"),
+ d.get("platform_domain"),
+ d.get("platform_id"),
+ d.get("ingest_strategy"),
+ d.get("total_size"),
+ d.get("file_count"),
+ d.get("archiveorg_item_name"),
+ d.get("archiveorg_item_bundle_path"),
+ d.get("web_bundle_url"),
+ d.get("web_bundle_dt"),
+ d.get("manifest"),
+ )
+ for d in batch
+ ]
# filter out duplicate rows by key (ingest_type, base_url)
- batch_dict = dict()
- for b in batch:
- batch_dict[(b[0], b[1])] = b
- batch = list(batch_dict.values())
- resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ row_dict = dict()
+ for b in rows:
+ row_dict[(b[0], b[1])] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_crossref(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "update",
+ ) -> Tuple[int, int]:
+ sql = """
+ INSERT INTO
+ crossref (doi, indexed, record)
+ VALUES %s
+ ON CONFLICT (doi) DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ indexed=EXCLUDED.indexed,
+ record=EXCLUDED.record
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ rows = [
+ (
+ d["doi"],
+ d.get("indexed") or None,
+ json.dumps(d["record"], sort_keys=True),
+ )
+ for d in batch
+ ]
+        # filter out duplicate rows by key (doi)
+ row_dict = dict()
+ for b in rows:
+ row_dict[b[0]] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_grobid_refs(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "update",
+ ) -> Tuple[int, int]:
+ sql = """
+ INSERT INTO
+ grobid_refs (source, source_id, source_ts, updated, refs_json)
+ VALUES %s
+ ON CONFLICT (source, source_id) DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ source_ts=EXCLUDED.source_ts,
+ updated=EXCLUDED.updated,
+ refs_json=EXCLUDED.refs_json
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ now = datetime.datetime.now()
+ rows = [
+ (
+ d["source"],
+ d["source_id"],
+ d.get("source_ts") or None,
+ d.get("updated") or now,
+ json.dumps(d["refs_json"], sort_keys=True),
+ )
+ for d in batch
+ ]
+        # filter out duplicate rows by key (source, source_id)
+ row_dict = dict()
+ for b in rows:
+ row_dict[(b[0], b[1])] = b
+ rows = list(row_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
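
The get_* helpers added above all share one HTTP pattern: GET the PostgREST table endpoint with a single column filter using the "eq." operator, then return the first row of the JSON array (or None). A minimal standalone sketch of that pattern, using plain requests rather than the requests_retry_session() helper the client imports from sandcrawler.misc; the API URL is only the default from the diff, and the table/column names are whatever the caller passes:

from typing import Any, Dict, Optional

import requests

API_URL = "http://wbgrp-svc506.us.archive.org:3030"

def get_one(table: str, column: str, value: str) -> Optional[Dict[str, Any]]:
    # e.g. GET {API_URL}/grobid?sha1hex=eq.<sha1>
    resp = requests.get(f"{API_URL}/{table}", params={column: f"eq.{value}"})
    resp.raise_for_status()
    rows = resp.json()
    return rows[0] if rows else None

# roughly equivalent to client.get_grobid(sha1):
# record = get_one("grobid", "sha1hex", sha1)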
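On the Postgres side, every insert_* method ends its statement with RETURNING xmax and passes fetch=True to execute_values(); _inserts_and_updates() then counts xmax == 0 as a fresh insert and any non-zero value as a row rewritten by ON CONFLICT ... DO UPDATE. A minimal sketch of that counting trick in isolation, assuming a local database and a hypothetical example_kv table with a single-column primary key:

import psycopg2
import psycopg2.extras

def upsert_and_count(conn, batch):
    # batch is a list of (k, v) tuples; example_kv(k TEXT PRIMARY KEY, v TEXT)
    sql = """
        INSERT INTO example_kv (k, v)
        VALUES %s
        ON CONFLICT (k) DO UPDATE SET v=EXCLUDED.v
        RETURNING xmax;
    """
    with conn.cursor() as cur:
        resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
    # xmax is 0 for freshly inserted rows, non-zero for rows updated via ON CONFLICT
    codes = [int(r[0]) for r in resp]
    inserts = len([c for c in codes if c == 0])
    return (inserts, len(codes) - inserts)

# usage:
# conn = psycopg2.connect("dbname=sandcrawler")
# print(upsert_and_count(conn, [("a", "1"), ("b", "2")]))
# conn.commit()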
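Each insert_* method also dedupes its batch on the table's key columns before calling execute_values(), keeping only the last row per key; otherwise a single ON CONFLICT ... DO UPDATE statement could try to affect the same row twice, which Postgres rejects. A small standalone version of that step (the key-column indices are hypothetical and depend on the table):

from typing import List, Tuple

def dedupe_rows(rows: List[Tuple], key_cols: Tuple[int, ...]) -> List[Tuple]:
    # later rows win, matching the dict-overwrite behavior in the methods above
    row_dict = {}
    for r in rows:
        row_dict[tuple(r[i] for i in key_cols)] = r
    return list(row_dict.values())

# e.g. the cdx batch is keyed on (url, datetime), columns 0 and 1:
# rows = dedupe_rows(rows, (0, 1))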