author     Bryan Newbold <bnewbold@archive.org>    2021-10-27 18:50:17 -0700
committer  Bryan Newbold <bnewbold@archive.org>    2021-10-27 18:50:17 -0700
commit     826c7538e091fac14d987a3cd654975da964e240 (patch)
tree       90345b4cabb461c624ca5a218c2fc01dce3055cd /python/sandcrawler/persist.py
parent     020037d4714e7ba2ab172c7278494aed0b2148ad (diff)
download   sandcrawler-826c7538e091fac14d987a3cd654975da964e240.tar.gz
           sandcrawler-826c7538e091fac14d987a3cd654975da964e240.zip
make fmt (black 21.9b0)
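This commit only re-formats the file with black 21.9b0: single-quoted strings become double-quoted and long statements are wrapped at black's default 88-character line length; there are no behavior changes. As a hedged sketch (assuming the black package is installed; format_str() and Mode() are part of black's Python API), the same quote normalization can be reproduced like this:

    import black

    # One of the lines touched by this diff, before formatting:
    src = "self.counts['total'] += len(batch)\n"
    formatted = black.format_str(src, mode=black.Mode())
    print(formatted, end="")
    # prints: self.counts["total"] += len(batch)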
Diffstat (limited to 'python/sandcrawler/persist.py')
-rw-r--r--  python/sandcrawler/persist.py  450
1 file changed, 238 insertions(+), 212 deletions(-)
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 8ec5979..f7954b1 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -41,14 +41,14 @@ class PersistCdxWorker(SandcrawlerWorker):
raise NotImplementedError
def push_batch(self, batch: list) -> list:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
# filter to full CDX lines, no liveweb
- cdx_batch = [r for r in batch if r.get('warc_path') and ("/" in r['warc_path'])]
+ cdx_batch = [r for r in batch if r.get("warc_path") and ("/" in r["warc_path"])]
resp = self.db.insert_cdx(self.cur, cdx_batch)
if len(cdx_batch) < len(batch):
- self.counts['skip'] += len(batch) - len(cdx_batch)
- self.counts['insert-cdx'] += resp[0]
- self.counts['update-cdx'] += resp[1]
+ self.counts["skip"] += len(batch) - len(cdx_batch)
+ self.counts["insert-cdx"] += resp[0]
+ self.counts["update-cdx"] += resp[1]
self.db.commit()
return []
@@ -70,47 +70,52 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if there is a problem with conversion, return None
"""
# backwards compat hacks; transform request to look like current schema
- if raw.get('ingest_type') == 'file':
- raw['ingest_type'] = 'pdf'
- if (not raw.get('link_source') and raw.get('base_url')
- and raw.get('ext_ids', {}).get('doi')
- and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])):
+ if raw.get("ingest_type") == "file":
+ raw["ingest_type"] = "pdf"
+ if (
+ not raw.get("link_source")
+ and raw.get("base_url")
+ and raw.get("ext_ids", {}).get("doi")
+ and raw["base_url"] == "https://doi.org/{}".format(raw["ext_ids"]["doi"])
+ ):
# set link_source(_id) for old ingest requests
- raw['link_source'] = 'doi'
- raw['link_source_id'] = raw['ext_ids']['doi']
- if (not raw.get('link_source')
- and raw.get('ingest_request_source', '').startswith('savepapernow')
- and raw.get('fatcat', {}).get('release_ident')):
+ raw["link_source"] = "doi"
+ raw["link_source_id"] = raw["ext_ids"]["doi"]
+ if (
+ not raw.get("link_source")
+ and raw.get("ingest_request_source", "").startswith("savepapernow")
+ and raw.get("fatcat", {}).get("release_ident")
+ ):
# set link_source(_id) for old ingest requests
- raw['link_source'] = 'spn'
- raw['link_source_id'] = raw['fatcat']['release_ident']
+ raw["link_source"] = "spn"
+ raw["link_source_id"] = raw["fatcat"]["release_ident"]
- for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'):
+ for k in ("ingest_type", "base_url", "link_source", "link_source_id"):
if k not in raw:
- self.counts['skip-request-fields'] += 1
+ self.counts["skip-request-fields"] += 1
return None
- if raw['ingest_type'] not in ('pdf', 'xml', 'html'):
- self.counts['skip-ingest-type'] += 1
+ if raw["ingest_type"] not in ("pdf", "xml", "html"):
+ self.counts["skip-ingest-type"] += 1
return None
request = {
- 'ingest_type': raw['ingest_type'],
- 'base_url': raw['base_url'],
- 'link_source': raw['link_source'],
- 'link_source_id': raw['link_source_id'],
- 'ingest_request_source': raw.get('ingest_request_source'),
- 'request': {},
+ "ingest_type": raw["ingest_type"],
+ "base_url": raw["base_url"],
+ "link_source": raw["link_source"],
+ "link_source_id": raw["link_source_id"],
+ "ingest_request_source": raw.get("ingest_request_source"),
+ "request": {},
}
# extra/optional fields
- if raw.get('release_stage'):
- request['release_stage'] = raw['release_stage']
- if raw.get('fatcat', {}).get('release_ident'):
- request['request']['release_ident'] = raw['fatcat']['release_ident']
- for k in ('ext_ids', 'edit_extra', 'rel'):
+ if raw.get("release_stage"):
+ request["release_stage"] = raw["release_stage"]
+ if raw.get("fatcat", {}).get("release_ident"):
+ request["request"]["release_ident"] = raw["fatcat"]["release_ident"]
+ for k in ("ext_ids", "edit_extra", "rel"):
if raw.get(k):
- request['request'][k] = raw[k]
+ request["request"][k] = raw[k]
# if this dict is empty, trim it to save DB space
- if not request['request']:
- request['request'] = None
+ if not request["request"]:
+ request["request"] = None
return request
def file_result_to_row(self, raw: dict) -> Optional[dict]:
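To make the request_to_row() backwards-compat handling in the hunk above concrete, here is a hedged before/after sketch; the DOI and URL are invented for illustration, and only the field names come from the code.

    # Old-style ingest request, as it might have been enqueued before the
    # schema change (identifiers are made up for illustration).
    raw = {
        "ingest_type": "file",  # rewritten to "pdf"
        "base_url": "https://doi.org/10.1234/example",
        "ext_ids": {"doi": "10.1234/example"},
    }
    # request_to_row(raw) would return roughly:
    # {
    #     "ingest_type": "pdf",
    #     "base_url": "https://doi.org/10.1234/example",
    #     "link_source": "doi",
    #     "link_source_id": "10.1234/example",
    #     "ingest_request_source": None,
    #     "request": {"ext_ids": {"doi": "10.1234/example"}},
    # }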
@@ -119,59 +124,68 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if there is a problem with conversion, return None and set skip count
"""
- for k in ('request', 'hit', 'status'):
+ for k in ("request", "hit", "status"):
if k not in raw:
- self.counts['skip-result-fields'] += 1
+ self.counts["skip-result-fields"] += 1
return None
- if 'base_url' not in raw['request']:
- self.counts['skip-result-fields'] += 1
+ if "base_url" not in raw["request"]:
+ self.counts["skip-result-fields"] += 1
return None
- ingest_type = raw['request'].get('ingest_type')
- if ingest_type == 'file':
- ingest_type = 'pdf'
- if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset',
- 'dataset-file'):
- self.counts['skip-ingest-type'] += 1
+ ingest_type = raw["request"].get("ingest_type")
+ if ingest_type == "file":
+ ingest_type = "pdf"
+ if ingest_type not in (
+ "pdf",
+ "xml",
+ "html",
+ "component",
+ "src",
+ "dataset",
+ "dataset-file",
+ ):
+ self.counts["skip-ingest-type"] += 1
return None
- if raw['status'] in ("existing", ):
- self.counts['skip-existing'] += 1
+ if raw["status"] in ("existing",):
+ self.counts["skip-existing"] += 1
return None
result = {
- 'ingest_type': ingest_type,
- 'base_url': raw['request']['base_url'],
- 'hit': raw['hit'],
- 'status': raw['status'],
+ "ingest_type": ingest_type,
+ "base_url": raw["request"]["base_url"],
+ "hit": raw["hit"],
+ "status": raw["status"],
}
- terminal = raw.get('terminal')
+ terminal = raw.get("terminal")
if terminal:
- result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url')
- result['terminal_dt'] = terminal.get('terminal_dt')
- result['terminal_status_code'] = terminal.get(
- 'terminal_status_code') or terminal.get('status_code') or terminal.get(
- 'http_code')
- if result['terminal_status_code']:
- result['terminal_status_code'] = int(result['terminal_status_code'])
- result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
- if len(result['terminal_url']) > 2048:
+ result["terminal_url"] = terminal.get("terminal_url") or terminal.get("url")
+ result["terminal_dt"] = terminal.get("terminal_dt")
+ result["terminal_status_code"] = (
+ terminal.get("terminal_status_code")
+ or terminal.get("status_code")
+ or terminal.get("http_code")
+ )
+ if result["terminal_status_code"]:
+ result["terminal_status_code"] = int(result["terminal_status_code"])
+ result["terminal_sha1hex"] = terminal.get("terminal_sha1hex")
+ if len(result["terminal_url"]) > 2048:
# postgresql13 doesn't like extremely large URLs in b-tree index
- self.counts['skip-huge-url'] += 1
+ self.counts["skip-huge-url"] += 1
return None
return result
def result_to_html_meta(self, record: dict) -> Optional[HtmlMetaRow]:
- html_body = record.get('html_body')
- file_meta = record.get('file_meta')
+ html_body = record.get("html_body")
+ file_meta = record.get("file_meta")
if not (file_meta and html_body):
return None
return HtmlMetaRow(
sha1hex=file_meta["sha1hex"],
- status=record.get('status'),
- scope=record.get('scope'),
- has_teixml=bool(html_body and html_body['status'] == 'success'),
+ status=record.get("status"),
+ scope=record.get("scope"),
+ has_teixml=bool(html_body and html_body["status"] == "success"),
has_thumbnail=False, # TODO
- word_count=(html_body and html_body.get('word_count')) or None,
- biblio=record.get('html_biblio'),
- resources=record.get('html_resources'),
+ word_count=(html_body and html_body.get("word_count")) or None,
+ biblio=record.get("html_biblio"),
+ resources=record.get("html_resources"),
)
def result_to_platform_row(self, raw: dict) -> Optional[dict]:
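For reference, a minimal sketch of the HtmlMetaRow produced by result_to_html_meta() above; all values are invented, and only the field names are taken from the constructor call in the diff (HtmlMetaRow itself is defined elsewhere in sandcrawler, not in this file).

    row = HtmlMetaRow(
        sha1hex="a" * 40,                 # from file_meta
        status="success",
        scope="article-fulltext",         # hypothetical scope value
        has_teixml=True,
        has_thumbnail=False,              # still a TODO in the code above
        word_count=4200,
        biblio={"title": "Example Article"},
        resources=[],
    )
    # rows like this are later serialized with to_sql_tuple() and written via
    # db.insert_html_meta(cur, rows, on_conflict="update")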
@@ -180,46 +194,49 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if there is a problem with conversion, return None and set skip count
"""
- for k in ('request', 'hit', 'status'):
+ for k in ("request", "hit", "status"):
if k not in raw:
return None
- if 'base_url' not in raw['request']:
+ if "base_url" not in raw["request"]:
return None
- ingest_type = raw['request'].get('ingest_type')
- if ingest_type not in ('dataset'):
+ ingest_type = raw["request"].get("ingest_type")
+ if ingest_type not in ("dataset"):
return None
- if raw['status'] in ("existing", ):
+ if raw["status"] in ("existing",):
return None
- if not raw.get('platform_name'):
+ if not raw.get("platform_name"):
return None
result = {
- 'ingest_type': ingest_type,
- 'base_url': raw['request']['base_url'],
- 'hit': raw['hit'],
- 'status': raw['status'],
- 'platform_name': raw.get('platform_name'),
- 'platform_domain': raw.get('platform_domain'),
- 'platform_id': raw.get('platform_id'),
- 'ingest_strategy': raw.get('ingest_strategy'),
- 'total_size': raw.get('total_size'),
- 'file_count': raw.get('file_count'),
- 'archiveorg_item_name': raw.get('archiveorg_item_name'),
- 'archiveorg_item_bundle_path': None,
- 'web_bundle_url': None,
- 'web_bundle_dt': None,
- 'manifest': raw.get('manifest'),
+ "ingest_type": ingest_type,
+ "base_url": raw["request"]["base_url"],
+ "hit": raw["hit"],
+ "status": raw["status"],
+ "platform_name": raw.get("platform_name"),
+ "platform_domain": raw.get("platform_domain"),
+ "platform_id": raw.get("platform_id"),
+ "ingest_strategy": raw.get("ingest_strategy"),
+ "total_size": raw.get("total_size"),
+ "file_count": raw.get("file_count"),
+ "archiveorg_item_name": raw.get("archiveorg_item_name"),
+ "archiveorg_item_bundle_path": None,
+ "web_bundle_url": None,
+ "web_bundle_dt": None,
+ "manifest": raw.get("manifest"),
}
- if result.get('fileset_bundle'):
- result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get(
- 'archiveorg_item_bundle_path')
- result['web_bundle_url'] = result['fileset_bundle'].get('terminal',
- {}).get('terminal_url')
- result['web_bundle_dt'] = result['fileset_bundle'].get('terminal',
- {}).get('terminal_dt')
+ if result.get("fileset_bundle"):
+ result["archiveorg_item_bundle_path"] = result["fileset_bundle"].get(
+ "archiveorg_item_bundle_path"
+ )
+ result["web_bundle_url"] = (
+ result["fileset_bundle"].get("terminal", {}).get("terminal_url")
+ )
+ result["web_bundle_dt"] = (
+ result["fileset_bundle"].get("terminal", {}).get("terminal_dt")
+ )
return result
def push_batch(self, batch: List[Any]) -> List[Any]:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
if not batch:
return []
@@ -228,60 +245,62 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
results = [r for r in results_unfiltered if r]
irequests_unfiltered = [
- self.request_to_row(raw['request']) for raw in batch if raw.get('request')
+ self.request_to_row(raw["request"]) for raw in batch if raw.get("request")
]
irequests = [
- r for r in irequests_unfiltered if r and r['ingest_type'] != 'dataset-file'
+ r for r in irequests_unfiltered if r and r["ingest_type"] != "dataset-file"
]
if irequests:
resp = self.db.insert_ingest_request(self.cur, irequests)
- self.counts['insert-requests'] += resp[0]
- self.counts['update-requests'] += resp[1]
+ self.counts["insert-requests"] += resp[0]
+ self.counts["update-requests"] += resp[1]
if results:
resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update")
- self.counts['insert-results'] += resp[0]
- self.counts['update-results'] += resp[1]
+ self.counts["insert-results"] += resp[0]
+ self.counts["update-results"] += resp[1]
# these schemas match, so can just pass through
- cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')]
+ cdx_batch = [r["cdx"] for r in batch if r.get("hit") and r.get("cdx")]
revisit_cdx_batch = [
- r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')
+ r["revisit_cdx"] for r in batch if r.get("hit") and r.get("revisit_cdx")
]
cdx_batch.extend(revisit_cdx_batch)
# filter to full CDX lines, with full warc_paths (not liveweb)
- cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])]
+ cdx_batch = [r for r in cdx_batch if r.get("warc_path") and ("/" in r["warc_path"])]
if cdx_batch:
resp = self.db.insert_cdx(self.cur, cdx_batch)
- self.counts['insert-cdx'] += resp[0]
- self.counts['update-cdx'] += resp[1]
+ self.counts["insert-cdx"] += resp[0]
+ self.counts["update-cdx"] += resp[1]
- file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
+ file_meta_batch = [r["file_meta"] for r in batch if r.get("hit") and r.get("file_meta")]
if file_meta_batch:
resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="nothing")
- self.counts['insert-file_meta'] += resp[0]
- self.counts['update-file_meta'] += resp[1]
+ self.counts["insert-file_meta"] += resp[0]
+ self.counts["update-file_meta"] += resp[1]
html_meta_batch = [
- self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')
+ self.result_to_html_meta(r) for r in batch if r.get("hit") and r.get("html_body")
]
if html_meta_batch:
rows = [d.to_sql_tuple() for d in html_meta_batch if d]
resp = self.db.insert_html_meta(self.cur, rows, on_conflict="update")
- self.counts['insert-html_meta'] += resp[0]
- self.counts['update-html_meta'] += resp[1]
+ self.counts["insert-html_meta"] += resp[0]
+ self.counts["update-html_meta"] += resp[1]
fileset_platform_batch_all = [
- self.result_to_platform_row(raw) for raw in batch if
- raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')
+ self.result_to_platform_row(raw)
+ for raw in batch
+ if raw.get("request", {}).get("ingest_type") == "dataset"
+ and raw.get("platform_name")
]
fileset_platform_batch: List[Dict] = [p for p in fileset_platform_batch_all if p]
if fileset_platform_batch:
- resp = self.db.insert_ingest_fileset_platform(self.cur,
- fileset_platform_batch,
- on_conflict="update")
- self.counts['insert-fileset_platform'] += resp[0]
- self.counts['update-fileset_platform'] += resp[1]
+ resp = self.db.insert_ingest_fileset_platform(
+ self.cur, fileset_platform_batch, on_conflict="update"
+ )
+ self.counts["insert-fileset_platform"] += resp[0]
+ self.counts["update-fileset_platform"] += resp[1]
self.db.commit()
return []
@@ -307,7 +326,7 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker):
raise NotImplementedError
def push_batch(self, batch: list) -> list:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
if not batch:
return []
@@ -317,8 +336,8 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker):
if irequests:
resp = self.db.insert_ingest_request(self.cur, irequests)
- self.counts['insert-requests'] += resp[0]
- self.counts['update-requests'] += resp[1]
+ self.counts["insert-requests"] += resp[0]
+ self.counts["update-requests"] += resp[1]
self.db.commit()
return []
@@ -329,13 +348,13 @@ class PersistGrobidWorker(SandcrawlerWorker):
super().__init__()
self.grobid = GrobidClient()
self.s3 = SandcrawlerMinioClient(
- host_url=kwargs.get('s3_url', 'localhost:9000'),
- access_key=kwargs['s3_access_key'],
- secret_key=kwargs['s3_secret_key'],
- default_bucket=kwargs['s3_bucket'],
+ host_url=kwargs.get("s3_url", "localhost:9000"),
+ access_key=kwargs["s3_access_key"],
+ secret_key=kwargs["s3_secret_key"],
+ default_bucket=kwargs["s3_bucket"],
)
- self.s3_only = kwargs.get('s3_only', False)
- self.db_only = kwargs.get('db_only', False)
+ self.s3_only = kwargs.get("s3_only", False)
+ self.db_only = kwargs.get("db_only", False)
assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed"
if not self.s3_only:
self.db: Optional[SandcrawlerPostgresClient] = SandcrawlerPostgresClient(db_url)
@@ -349,58 +368,58 @@ class PersistGrobidWorker(SandcrawlerWorker):
raise NotImplementedError
def push_batch(self, batch: list) -> list:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
# filter out bad "missing status_code" timeout rows
- missing = [r for r in batch if not r.get('status_code')]
+ missing = [r for r in batch if not r.get("status_code")]
if missing:
- self.counts['skip-missing-status'] += len(missing)
- batch = [r for r in batch if r.get('status_code')]
+ self.counts["skip-missing-status"] += len(missing)
+ batch = [r for r in batch if r.get("status_code")]
for r in batch:
- if r['status_code'] != 200 or not r.get('tei_xml'):
- self.counts['s3-skip-status'] += 1
- if r.get('error_msg'):
- r['metadata'] = {'error_msg': r['error_msg'][:500]}
+ if r["status_code"] != 200 or not r.get("tei_xml"):
+ self.counts["s3-skip-status"] += 1
+ if r.get("error_msg"):
+ r["metadata"] = {"error_msg": r["error_msg"][:500]}
continue
- assert len(r['key']) == 40
+ assert len(r["key"]) == 40
if not self.db_only:
self.s3.put_blob(
folder="grobid",
- blob=r['tei_xml'],
- sha1hex=r['key'],
+ blob=r["tei_xml"],
+ sha1hex=r["key"],
extension=".tei.xml",
)
- self.counts['s3-put'] += 1
+ self.counts["s3-put"] += 1
# enhance with teixml2json metadata, if available
try:
metadata = self.grobid.metadata(r)
except xml.etree.ElementTree.ParseError as xml_e:
- r['status'] = 'bad-grobid-xml'
- r['metadata'] = {'error_msg': str(xml_e)[:1024]}
+ r["status"] = "bad-grobid-xml"
+ r["metadata"] = {"error_msg": str(xml_e)[:1024]}
continue
if not metadata:
continue
- for k in ('fatcat_release', 'grobid_version'):
+ for k in ("fatcat_release", "grobid_version"):
r[k] = metadata.pop(k, None)
- if r.get('fatcat_release'):
- r['fatcat_release'] = r['fatcat_release'].replace('release_', '')
- if metadata.get('grobid_timestamp'):
- r['updated'] = metadata['grobid_timestamp']
- r['metadata'] = metadata
+ if r.get("fatcat_release"):
+ r["fatcat_release"] = r["fatcat_release"].replace("release_", "")
+ if metadata.get("grobid_timestamp"):
+ r["updated"] = metadata["grobid_timestamp"]
+ r["metadata"] = metadata
if not self.s3_only:
assert self.db and self.cur
resp = self.db.insert_grobid(self.cur, batch, on_conflict="update")
- self.counts['insert-grobid'] += resp[0]
- self.counts['update-grobid'] += resp[1]
+ self.counts["insert-grobid"] += resp[0]
+ self.counts["update-grobid"] += resp[1]
- file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+ file_meta_batch = [r["file_meta"] for r in batch if r.get("file_meta")]
resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
- self.counts['insert-file-meta'] += resp[0]
- self.counts['update-file-meta'] += resp[1]
+ self.counts["insert-file-meta"] += resp[0]
+ self.counts["update-file-meta"] += resp[1]
self.db.commit()
@@ -413,6 +432,7 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
This could be refactored into a "Sink" type with an even thinner wrapper.
"""
+
def __init__(self, output_dir: str):
super().__init__()
self.output_dir = output_dir
@@ -428,14 +448,14 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
def process(self, record: Any, key: Optional[str] = None) -> Any:
- if record.get('status_code') != 200 or not record.get('tei_xml'):
+ if record.get("status_code") != 200 or not record.get("tei_xml"):
return False
- assert (len(record['key'])) == 40
- p = "{}/{}".format(self.output_dir, self._blob_path(record['key']))
+ assert (len(record["key"])) == 40
+ p = "{}/{}".format(self.output_dir, self._blob_path(record["key"]))
os.makedirs(os.path.dirname(p), exist_ok=True)
- with open(p, 'w') as f:
- f.write(record.pop('tei_xml'))
- self.counts['written'] += 1
+ with open(p, "w") as f:
+ f.write(record.pop("tei_xml"))
+ self.counts["written"] += 1
return record
@@ -450,24 +470,25 @@ class PersistPdfTrioWorker(SandcrawlerWorker):
raise NotImplementedError
def push_batch(self, batch: list) -> list:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
- batch = [r for r in batch if 'pdf_trio' in r and r['pdf_trio'].get('status_code')]
+ batch = [r for r in batch if "pdf_trio" in r and r["pdf_trio"].get("status_code")]
for r in batch:
# copy key (sha1hex) into sub-object
- r['pdf_trio']['key'] = r['key']
- pdftrio_batch = [r['pdf_trio'] for r in batch]
+ r["pdf_trio"]["key"] = r["key"]
+ pdftrio_batch = [r["pdf_trio"] for r in batch]
resp = self.db.insert_pdftrio(self.cur, pdftrio_batch, on_conflict="update")
- self.counts['insert-pdftrio'] += resp[0]
- self.counts['update-pdftrio'] += resp[1]
+ self.counts["insert-pdftrio"] += resp[0]
+ self.counts["update-pdftrio"] += resp[1]
file_meta_batch = [
- r['file_meta'] for r in batch
- if r['pdf_trio']['status'] == "success" and r.get('file_meta')
+ r["file_meta"]
+ for r in batch
+ if r["pdf_trio"]["status"] == "success" and r.get("file_meta")
]
resp = self.db.insert_file_meta(self.cur, file_meta_batch)
- self.counts['insert-file-meta'] += resp[0]
- self.counts['update-file-meta'] += resp[1]
+ self.counts["insert-file-meta"] += resp[0]
+ self.counts["update-file-meta"] += resp[1]
self.db.commit()
return []
@@ -479,16 +500,17 @@ class PersistPdfTextWorker(SandcrawlerWorker):
Should keep batch sizes small.
"""
+
def __init__(self, db_url: str, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
- host_url=kwargs.get('s3_url', 'localhost:9000'),
- access_key=kwargs['s3_access_key'],
- secret_key=kwargs['s3_secret_key'],
- default_bucket=kwargs['s3_bucket'],
+ host_url=kwargs.get("s3_url", "localhost:9000"),
+ access_key=kwargs["s3_access_key"],
+ secret_key=kwargs["s3_secret_key"],
+ default_bucket=kwargs["s3_bucket"],
)
- self.s3_only = kwargs.get('s3_only', False)
- self.db_only = kwargs.get('db_only', False)
+ self.s3_only = kwargs.get("s3_only", False)
+ self.db_only = kwargs.get("db_only", False)
assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed"
if not self.s3_only:
self.db: Optional[SandcrawlerPostgresClient] = SandcrawlerPostgresClient(db_url)
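A hedged construction sketch for PersistPdfTextWorker (PersistGrobidWorker above reads the same keyword arguments); the connection string and credentials are placeholders, and the defaults noted in comments mirror the kwargs.get() calls in __init__.

    worker = PersistPdfTextWorker(
        db_url="postgresql:///sandcrawler",  # placeholder DB connection string
        s3_url="localhost:9000",             # default if omitted
        s3_access_key="minioadmin",          # placeholder credentials
        s3_secret_key="minioadmin",
        s3_bucket="sandcrawler",             # placeholder bucket name
        s3_only=False,                       # s3_only and db_only are mutually exclusive
        db_only=False,
    )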
@@ -502,17 +524,17 @@ class PersistPdfTextWorker(SandcrawlerWorker):
raise NotImplementedError
def push_batch(self, batch: list) -> list:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
parsed_batch = []
for r in batch:
parsed_batch.append(PdfExtractResult.from_pdftext_dict(r))
for r in parsed_batch:
- if r.status != 'success' or not r.text:
- self.counts['s3-skip-status'] += 1
+ if r.status != "success" or not r.text:
+ self.counts["s3-skip-status"] += 1
if r.error_msg:
- r.metadata = {'error_msg': r.error_msg[:500]}
+ r.metadata = {"error_msg": r.error_msg[:500]}
continue
assert len(r.sha1hex) == 40
@@ -523,19 +545,19 @@ class PersistPdfTextWorker(SandcrawlerWorker):
sha1hex=r.sha1hex,
extension=".txt",
)
- self.counts['s3-put'] += 1
+ self.counts["s3-put"] += 1
if not self.s3_only:
assert self.db and self.cur
rows = [r.to_sql_tuple() for r in parsed_batch]
resp = self.db.insert_pdf_meta(self.cur, rows, on_conflict="update")
- self.counts['insert-pdf-meta'] += resp[0]
- self.counts['update-pdf-meta'] += resp[1]
+ self.counts["insert-pdf-meta"] += resp[0]
+ self.counts["update-pdf-meta"] += resp[1]
file_meta_batch = [r.file_meta for r in parsed_batch if r.file_meta]
resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
- self.counts['insert-file-meta'] += resp[0]
- self.counts['update-file-meta'] += resp[1]
+ self.counts["insert-file-meta"] += resp[0]
+ self.counts["update-file-meta"] += resp[1]
self.db.commit()
@@ -550,16 +572,17 @@ class PersistThumbnailWorker(SandcrawlerWorker):
This worker *must* be used with raw kakfa mode; thumbnails are *not*
wrapped in JSON like most sandcrawler kafka messages.
"""
+
def __init__(self, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
- host_url=kwargs.get('s3_url', 'localhost:9000'),
- access_key=kwargs['s3_access_key'],
- secret_key=kwargs['s3_secret_key'],
- default_bucket=kwargs['s3_bucket'],
+ host_url=kwargs.get("s3_url", "localhost:9000"),
+ access_key=kwargs["s3_access_key"],
+ secret_key=kwargs["s3_secret_key"],
+ default_bucket=kwargs["s3_bucket"],
)
- self.s3_extension = kwargs.get('s3_extension', ".jpg")
- self.s3_folder = kwargs.get('s3_folder', "pdf")
+ self.s3_extension = kwargs.get("s3_extension", ".jpg")
+ self.s3_folder = kwargs.get("s3_folder", "pdf")
def process(self, record: Any, key: Optional[str] = None) -> Any:
"""
@@ -569,7 +592,7 @@ class PersistThumbnailWorker(SandcrawlerWorker):
assert isinstance(record, bytes)
blob: bytes = record
if isinstance(key, bytes):
- key = key.decode('utf-8')
+ key = key.decode("utf-8")
assert key is not None and len(key) == 40 and isinstance(key, str)
assert len(blob) >= 50
@@ -579,7 +602,7 @@ class PersistThumbnailWorker(SandcrawlerWorker):
sha1hex=key,
extension=self.s3_extension,
)
- self.counts['s3-put'] += 1
+ self.counts["s3-put"] += 1
class GenericPersistDocWorker(SandcrawlerWorker):
@@ -588,39 +611,40 @@ class GenericPersistDocWorker(SandcrawlerWorker):
Objects are assumed to be JSON-wrapped strings.
"""
+
def __init__(self, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
- host_url=kwargs.get('s3_url', 'localhost:9000'),
- access_key=kwargs['s3_access_key'],
- secret_key=kwargs['s3_secret_key'],
- default_bucket=kwargs['s3_bucket'],
+ host_url=kwargs.get("s3_url", "localhost:9000"),
+ access_key=kwargs["s3_access_key"],
+ secret_key=kwargs["s3_secret_key"],
+ default_bucket=kwargs["s3_bucket"],
)
- self.s3_extension = kwargs.get('s3_extension', ".unknown")
- self.s3_folder = kwargs.get('s3_folder', "unknown")
+ self.s3_extension = kwargs.get("s3_extension", ".unknown")
+ self.s3_folder = kwargs.get("s3_folder", "unknown")
self.doc_key = "unknown"
def process(self, record: Any, key: Optional[str] = None) -> Any:
- if record.get('status') != 'success' or not record.get(self.doc_key):
+ if record.get("status") != "success" or not record.get(self.doc_key):
return
assert key is not None
if isinstance(key, bytes):
- key_str = key.decode('utf-8')
+ key_str = key.decode("utf-8")
elif isinstance(key, str):
key_str = key
assert len(key_str) == 40
- if 'sha1hex' in record:
- assert key_str == record['sha1hex']
+ if "sha1hex" in record:
+ assert key_str == record["sha1hex"]
self.s3.put_blob(
folder=self.s3_folder,
- blob=record[self.doc_key].encode('utf-8'),
+ blob=record[self.doc_key].encode("utf-8"),
sha1hex=key_str,
extension=self.s3_extension,
)
- self.counts['s3-put'] += 1
+ self.counts["s3-put"] += 1
class PersistXmlDocWorker(GenericPersistDocWorker):
@@ -628,10 +652,11 @@ class PersistXmlDocWorker(GenericPersistDocWorker):
Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
sandcrawler database (SQL).
"""
+
def __init__(self, **kwargs):
super().__init__(**kwargs)
- self.s3_extension = kwargs.get('s3_extension', ".jats.xml")
- self.s3_folder = kwargs.get('s3_folder', "xml_doc")
+ self.s3_extension = kwargs.get("s3_extension", ".jats.xml")
+ self.s3_folder = kwargs.get("s3_folder", "xml_doc")
self.doc_key = "jats_xml"
@@ -640,8 +665,9 @@ class PersistHtmlTeiXmlWorker(GenericPersistDocWorker):
Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
sandcrawler database (SQL).
"""
+
def __init__(self, **kwargs):
super().__init__(**kwargs)
- self.s3_extension = kwargs.get('s3_extension', ".tei.xml")
- self.s3_folder = kwargs.get('s3_folder', "html_body")
+ self.s3_extension = kwargs.get("s3_extension", ".tei.xml")
+ self.s3_folder = kwargs.get("s3_folder", "html_body")
self.doc_key = "tei_xml"
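PersistXmlDocWorker and PersistHtmlTeiXmlWorker above follow the same pattern: subclass GenericPersistDocWorker and override the S3 extension, the S3 folder, and the JSON key that holds the document body. A hypothetical further subclass (names invented, not part of sandcrawler) would look like:

    class PersistExampleDocWorker(GenericPersistDocWorker):
        """
        Hypothetical subclass, shown only to illustrate the extension pattern;
        it does not exist in sandcrawler.
        """

        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self.s3_extension = kwargs.get("s3_extension", ".example.xml")
            self.s3_folder = kwargs.get("s3_folder", "example_doc")
            self.doc_key = "example_xml"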