author    Bryan Newbold <bnewbold@archive.org>  2021-10-26 12:54:37 -0700
committer Bryan Newbold <bnewbold@archive.org>  2021-10-26 12:54:37 -0700
commit    05bd7cbcc62588e431c5efd533189e246b2a997e (patch)
tree      abcc707a451e77ea1e8c5ac9a5925b97a4bd139a /python/sandcrawler/persist.py
parent    f3f424e42f2f4f383103cf80b30a00cfa6cfc179 (diff)
download  sandcrawler-05bd7cbcc62588e431c5efd533189e246b2a997e.tar.gz
          sandcrawler-05bd7cbcc62588e431c5efd533189e246b2a997e.zip
make fmt
Diffstat (limited to 'python/sandcrawler/persist.py')
-rw-r--r--  python/sandcrawler/persist.py | 61
1 file changed, 34 insertions, 27 deletions
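
The `make fmt` target itself is not part of this diff. A minimal sketch of an equivalent invocation, assuming the formatter is yapf (the hanging-indent style throughout the hunks below matches yapf's defaults rather than black's):

    from yapf.yapflib.yapf_api import FormatFile

    # Reformat one file in place; FormatFile returns a 3-tuple whose last
    # element says whether anything changed. With in_place=True the
    # reformatted text is written back and the first element is None.
    _, encoding, changed = FormatFile("python/sandcrawler/persist.py",
                                      in_place=True)
    print("reformatted" if changed else "already formatted")
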
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 66a36bc..44c03f2 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -1,4 +1,3 @@
-
"""
cdx
- read raw CDX, filter
@@ -32,7 +31,6 @@ from sandcrawler.workers import SandcrawlerWorker
class PersistCdxWorker(SandcrawlerWorker):
-
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -56,8 +54,8 @@ class PersistCdxWorker(SandcrawlerWorker):
self.db.commit()
return []
-class PersistIngestFileResultWorker(SandcrawlerWorker):
+class PersistIngestFileResultWorker(SandcrawlerWorker):
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -78,8 +76,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
# backwards compat hacks; transform request to look like current schema
if raw.get('ingest_type') == 'file':
raw['ingest_type'] = 'pdf'
- if (not raw.get('link_source')
- and raw.get('base_url')
+ if (not raw.get('link_source') and raw.get('base_url')
and raw.get('ext_ids', {}).get('doi')
and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])):
# set link_source(_id) for old ingest requests
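
This hunk only reflows the long condition. For readability, here is the backwards-compat transform it sits in as straight Python; the two assignments after the final comment are a plausible reconstruction (the diff cuts off at the comment), not confirmed by this patch:

    def fix_old_ingest_request(raw: dict) -> dict:
        # 'file' was renamed to 'pdf' in the current schema
        if raw.get('ingest_type') == 'file':
            raw['ingest_type'] = 'pdf'
        doi = raw.get('ext_ids', {}).get('doi')
        if (not raw.get('link_source') and raw.get('base_url') and doi
                and raw['base_url'] == "https://doi.org/{}".format(doi)):
            # set link_source(_id) for old ingest requests
            raw['link_source'] = 'doi'   # reconstructed, see note above
            raw['link_source_id'] = doi  # reconstructed, see note above
        return raw
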
@@ -119,7 +116,6 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if not request['request']:
request['request'] = None
return request
-
def file_result_to_row(self, raw: dict) -> Optional[dict]:
"""
@@ -137,7 +133,8 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
ingest_type = raw['request'].get('ingest_type')
if ingest_type == 'file':
ingest_type = 'pdf'
- if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', 'dataset-file'):
+ if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset',
+ 'dataset-file'):
self.counts['skip-ingest-type'] += 1
return None
if raw['status'] in ("existing", ):
@@ -153,7 +150,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if terminal:
result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url')
result['terminal_dt'] = terminal.get('terminal_dt')
- result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code')
+ result['terminal_status_code'] = terminal.get(
+ 'terminal_status_code') or terminal.get('status_code') or terminal.get(
+ 'http_code')
if result['terminal_status_code']:
result['terminal_status_code'] = int(result['terminal_status_code'])
result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
@@ -215,9 +214,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
'manifest': raw.get('manifest'),
}
if result.get('fileset_bundle'):
- result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get('archiveorg_item_bundle_path')
- result['web_bundle_url'] = result['fileset_bundle'].get('terminal', {}).get('terminal_url')
- result['web_bundle_dt'] = result['fileset_bundle'].get('terminal', {}).get('terminal_dt')
+ result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get(
+ 'archiveorg_item_bundle_path')
+ result['web_bundle_url'] = result['fileset_bundle'].get('terminal',
+ {}).get('terminal_url')
+ result['web_bundle_dt'] = result['fileset_bundle'].get('terminal',
+ {}).get('terminal_dt')
return result
def push_batch(self, batch):
@@ -243,7 +245,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
# these schemas match, so can just pass through
cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')]
- revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')]
+ revisit_cdx_batch = [
+ r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')
+ ]
cdx_batch.extend(revisit_cdx_batch)
# filter to full CDX lines, with full warc_paths (not liveweb)
cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])]
@@ -258,24 +262,31 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.counts['insert-file_meta'] += resp[0]
self.counts['update-file_meta'] += resp[1]
- html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')]
+ html_meta_batch = [
+ self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')
+ ]
if html_meta_batch:
resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="update")
self.counts['insert-html_meta'] += resp[0]
self.counts['update-html_meta'] += resp[1]
- fileset_platform_batch = [self.result_to_platform_row(raw) for raw in batch if raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')]
+ fileset_platform_batch = [
+ self.result_to_platform_row(raw) for raw in batch if
+ raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')
+ ]
fileset_platform_batch = [p for p in fileset_platform_batch if p]
if fileset_platform_batch:
- resp = self.db.insert_ingest_fileset_platform(self.cur, fileset_platform_batch, on_conflict="update")
+ resp = self.db.insert_ingest_fileset_platform(self.cur,
+ fileset_platform_batch,
+ on_conflict="update")
self.counts['insert-fileset_platform'] += resp[0]
self.counts['update-fileset_platform'] += resp[1]
self.db.commit()
return []
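
Every sub-batch in push_batch above follows the same shape: filter the batch into per-table row lists, upsert with on_conflict="update", then tally insert/update counts from the pair the DB client returns. A hypothetical helper (not in the codebase) distilling that repeated tail:

    def upsert_and_count(counts, cur, insert_fn, rows, table):
        # Shared tail of each sub-batch: skip empty lists, upsert, and
        # tally the (n_inserted, n_updated) pair the client returns.
        if not rows:
            return
        n_inserted, n_updated = insert_fn(cur, rows, on_conflict="update")
        counts['insert-' + table] += n_inserted
        counts['update-' + table] += n_updated

Called as, e.g., upsert_and_count(self.counts, self.cur, self.db.insert_html_meta, html_meta_batch, 'html_meta').
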
-class PersistIngestFilesetWorker(SandcrawlerWorker):
+class PersistIngestFilesetWorker(SandcrawlerWorker):
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -287,8 +298,8 @@ class PersistIngestFilesetWorker(SandcrawlerWorker):
"""
raise NotImplementedError
-class PersistIngestRequestWorker(PersistIngestFileResultWorker):
+class PersistIngestRequestWorker(PersistIngestFileResultWorker):
def __init__(self, db_url, **kwargs):
super().__init__(db_url=db_url)
@@ -315,8 +326,8 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker):
self.db.commit()
return []
-class PersistGrobidWorker(SandcrawlerWorker):
+class PersistGrobidWorker(SandcrawlerWorker):
def __init__(self, db_url, **kwargs):
super().__init__()
self.grobid = GrobidClient()
@@ -406,7 +417,6 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
This could be refactored into a "Sink" type with an even thinner wrapper.
"""
-
def __init__(self, output_dir):
super().__init__()
self.output_dir = output_dir
@@ -424,7 +434,7 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
if record.get('status_code') != 200 or not record.get('tei_xml'):
return False
- assert(len(record['key'])) == 40
+ assert (len(record['key'])) == 40
p = "{}/{}".format(self.output_dir, self._blob_path(record['key']))
os.makedirs(os.path.dirname(p), exist_ok=True)
with open(p, 'w') as f:
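
The assert above guards that record['key'] is a 40-character SHA-1 hex digest, which _blob_path (not shown in this diff) turns into a sharded on-disk path. A plausible implementation, assuming the common two-level hex-prefix layout used to keep directories small:

    def _blob_path(self, sha1hex: str, extension: str = ".tei.xml") -> str:
        # e.g. "ab/cd/abcdef...0123.tei.xml": shard on the first two hex
        # byte-pairs so no single directory holds millions of files
        return "{}/{}/{}{}".format(sha1hex[0:2], sha1hex[2:4], sha1hex,
                                   extension)
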
@@ -434,7 +444,6 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
class PersistPdfTrioWorker(SandcrawlerWorker):
-
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -458,7 +467,10 @@ class PersistPdfTrioWorker(SandcrawlerWorker):
self.counts['insert-pdftrio'] += resp[0]
self.counts['update-pdftrio'] += resp[1]
- file_meta_batch = [r['file_meta'] for r in batch if r['pdf_trio']['status'] == "success" and r.get('file_meta')]
+ file_meta_batch = [
+ r['file_meta'] for r in batch
+ if r['pdf_trio']['status'] == "success" and r.get('file_meta')
+ ]
resp = self.db.insert_file_meta(self.cur, file_meta_batch)
self.counts['insert-file-meta'] += resp[0]
self.counts['update-file-meta'] += resp[1]
@@ -473,7 +485,6 @@ class PersistPdfTextWorker(SandcrawlerWorker):
Should keep batch sizes small.
"""
-
def __init__(self, db_url, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
@@ -545,7 +556,6 @@ class PersistThumbnailWorker(SandcrawlerWorker):
This worker *must* be used with raw kafka mode; thumbnails are *not*
wrapped in JSON like most sandcrawler kafka messages.
"""
-
def __init__(self, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
@@ -583,7 +593,6 @@ class GenericPersistDocWorker(SandcrawlerWorker):
Objects are assumed to be JSON-wrapped strings.
"""
-
def __init__(self, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
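
GenericPersistDocWorker's docstring says records arrive as JSON-wrapped strings, and the two subclasses below differ only in the S3 extension they set. A sketch of what the shared process step could look like under those assumptions; doc_key, s3_folder, and the put_blob signature are illustrative, not taken from this diff:

    import json

    class GenericPersistDocSketch:
        def __init__(self, s3, doc_key, s3_folder, s3_extension):
            self.s3 = s3
            self.doc_key = doc_key            # JSON field holding the doc (assumed)
            self.s3_folder = s3_folder        # blob-store prefix (assumed)
            self.s3_extension = s3_extension  # e.g. ".jats.xml", as set below
            self.counts = {'s3-put': 0}

        def process(self, record, key=None):
            # unwrap the JSON envelope described in the docstring above
            if isinstance(record, str):
                record = json.loads(record)
            self.s3.put_blob(                 # signature assumed, see lead-in
                folder=self.s3_folder,
                blob=record[self.doc_key].encode('utf-8'),
                sha1hex=record['sha1hex'],    # content-addressed by SHA-1
                extension=self.s3_extension,
            )
            self.counts['s3-put'] += 1
            return record
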
@@ -624,7 +633,6 @@ class PersistXmlDocWorker(GenericPersistDocWorker):
Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
sandcrawler database (SQL).
"""
-
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.s3_extension = kwargs.get('s3_extension', ".jats.xml")
@@ -637,7 +645,6 @@ class PersistHtmlTeiXmlWorker(GenericPersistDocWorker):
Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
sandcrawler database (SQL).
"""
-
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.s3_extension = kwargs.get('s3_extension', ".tei.xml")