From 05bd7cbcc62588e431c5efd533189e246b2a997e Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 26 Oct 2021 12:54:37 -0700
Subject: make fmt

---
 python/sandcrawler/persist.py | 61 ++++++++++++++++++++++++-------------------
 1 file changed, 34 insertions(+), 27 deletions(-)

(limited to 'python/sandcrawler/persist.py')

diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 66a36bc..44c03f2 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -1,4 +1,3 @@
-
 """
 cdx - read raw CDX, filter
@@ -32,7 +31,6 @@ from sandcrawler.workers import SandcrawlerWorker
 
 
 class PersistCdxWorker(SandcrawlerWorker):
-
     def __init__(self, db_url, **kwargs):
         super().__init__()
         self.db = SandcrawlerPostgresClient(db_url)
@@ -56,8 +54,8 @@ class PersistCdxWorker(SandcrawlerWorker):
         self.db.commit()
         return []
 
-class PersistIngestFileResultWorker(SandcrawlerWorker):
 
+class PersistIngestFileResultWorker(SandcrawlerWorker):
     def __init__(self, db_url, **kwargs):
         super().__init__()
         self.db = SandcrawlerPostgresClient(db_url)
@@ -78,8 +76,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
         # backwards compat hacks; transform request to look like current schema
         if raw.get('ingest_type') == 'file':
             raw['ingest_type'] = 'pdf'
-        if (not raw.get('link_source')
-                and raw.get('base_url')
+        if (not raw.get('link_source') and raw.get('base_url')
                 and raw.get('ext_ids', {}).get('doi')
                 and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])):
             # set link_source(_id) for old ingest requests
@@ -119,7 +116,6 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
         if not request['request']:
             request['request'] = None
         return request
-
 
     def file_result_to_row(self, raw: dict) -> Optional[dict]:
         """
@@ -137,7 +133,8 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
         ingest_type = raw['request'].get('ingest_type')
         if ingest_type == 'file':
             ingest_type = 'pdf'
-        if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', 'dataset-file'):
+        if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset',
+                               'dataset-file'):
             self.counts['skip-ingest-type'] += 1
             return None
         if raw['status'] in ("existing", ):
@@ -153,7 +150,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
         if terminal:
             result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url')
             result['terminal_dt'] = terminal.get('terminal_dt')
-            result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code')
+            result['terminal_status_code'] = terminal.get(
+                'terminal_status_code') or terminal.get('status_code') or terminal.get(
+                    'http_code')
             if result['terminal_status_code']:
                 result['terminal_status_code'] = int(result['terminal_status_code'])
             result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
@@ -215,9 +214,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             'manifest': raw.get('manifest'),
         }
         if result.get('fileset_bundle'):
-            result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get('archiveorg_item_bundle_path')
-            result['web_bundle_url'] = result['fileset_bundle'].get('terminal', {}).get('terminal_url')
-            result['web_bundle_dt'] = result['fileset_bundle'].get('terminal', {}).get('terminal_dt')
+            result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get(
+                'archiveorg_item_bundle_path')
+            result['web_bundle_url'] = result['fileset_bundle'].get('terminal',
+                                                                    {}).get('terminal_url')
+            result['web_bundle_dt'] = result['fileset_bundle'].get('terminal',
+                                                                   {}).get('terminal_dt')
         return result
 
     def push_batch(self, batch):
@@ -243,7 +245,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
 
         # these schemas match, so can just pass through
         cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')]
-        revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')]
+        revisit_cdx_batch = [
+            r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')
+        ]
         cdx_batch.extend(revisit_cdx_batch)
         # filter to full CDX lines, with full warc_paths (not liveweb)
         cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])]
@@ -258,24 +262,31 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             self.counts['insert-file_meta'] += resp[0]
             self.counts['update-file_meta'] += resp[1]
 
-        html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')]
+        html_meta_batch = [
+            self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')
+        ]
         if html_meta_batch:
             resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="update")
             self.counts['insert-html_meta'] += resp[0]
             self.counts['update-html_meta'] += resp[1]
 
-        fileset_platform_batch = [self.result_to_platform_row(raw) for raw in batch if raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')]
+        fileset_platform_batch = [
+            self.result_to_platform_row(raw) for raw in batch if
+            raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')
+        ]
         fileset_platform_batch = [p for p in fileset_platform_batch if p]
         if fileset_platform_batch:
-            resp = self.db.insert_ingest_fileset_platform(self.cur, fileset_platform_batch, on_conflict="update")
+            resp = self.db.insert_ingest_fileset_platform(self.cur,
+                                                          fileset_platform_batch,
+                                                          on_conflict="update")
            self.counts['insert-fileset_platform'] += resp[0]
            self.counts['update-fileset_platform'] += resp[1]
 
         self.db.commit()
         return []
 
-class PersistIngestFilesetWorker(SandcrawlerWorker):
 
+class PersistIngestFilesetWorker(SandcrawlerWorker):
     def __init__(self, db_url, **kwargs):
         super().__init__()
         self.db = SandcrawlerPostgresClient(db_url)
@@ -287,8 +298,8 @@ class PersistIngestFilesetWorker(SandcrawlerWorker):
         """
         raise NotImplementedError
 
-class PersistIngestRequestWorker(PersistIngestFileResultWorker):
 
+class PersistIngestRequestWorker(PersistIngestFileResultWorker):
     def __init__(self, db_url, **kwargs):
         super().__init__(db_url=db_url)
 
@@ -315,8 +326,8 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker):
         self.db.commit()
         return []
 
-class PersistGrobidWorker(SandcrawlerWorker):
 
+class PersistGrobidWorker(SandcrawlerWorker):
     def __init__(self, db_url, **kwargs):
         super().__init__()
         self.grobid = GrobidClient()
@@ -406,7 +417,6 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
 
     This could be refactored into a "Sink" type with an even thinner wrapper.
     """
-
     def __init__(self, output_dir):
         super().__init__()
         self.output_dir = output_dir
@@ -424,7 +434,7 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
         if record.get('status_code') != 200 or not record.get('tei_xml'):
             return False
 
-        assert(len(record['key'])) == 40
+        assert (len(record['key'])) == 40
         p = "{}/{}".format(self.output_dir, self._blob_path(record['key']))
         os.makedirs(os.path.dirname(p), exist_ok=True)
         with open(p, 'w') as f:
@@ -434,7 +444,6 @@
 
 
 class PersistPdfTrioWorker(SandcrawlerWorker):
-
     def __init__(self, db_url, **kwargs):
         super().__init__()
         self.db = SandcrawlerPostgresClient(db_url)
@@ -458,7 +467,10 @@ class PersistPdfTrioWorker(SandcrawlerWorker):
         self.counts['insert-pdftrio'] += resp[0]
         self.counts['update-pdftrio'] += resp[1]
 
-        file_meta_batch = [r['file_meta'] for r in batch if r['pdf_trio']['status'] == "success" and r.get('file_meta')]
+        file_meta_batch = [
+            r['file_meta'] for r in batch
+            if r['pdf_trio']['status'] == "success" and r.get('file_meta')
+        ]
         resp = self.db.insert_file_meta(self.cur, file_meta_batch)
         self.counts['insert-file-meta'] += resp[0]
         self.counts['update-file-meta'] += resp[1]
@@ -473,7 +485,6 @@ class PersistPdfTextWorker(SandcrawlerWorker):
 
     Should keep batch sizes small.
     """
-
     def __init__(self, db_url, **kwargs):
         super().__init__()
         self.s3 = SandcrawlerMinioClient(
@@ -545,7 +556,6 @@ class PersistThumbnailWorker(SandcrawlerWorker):
     This worker *must* be used with raw kakfa mode; thumbnails are *not*
     wrapped in JSON like most sandcrawler kafka messages.
     """
-
    def __init__(self, **kwargs):
         super().__init__()
         self.s3 = SandcrawlerMinioClient(
@@ -583,7 +593,6 @@ class GenericPersistDocWorker(SandcrawlerWorker):
 
     Objects are assumed to be JSON-wrapped strings.
     """
-
     def __init__(self, **kwargs):
         super().__init__()
         self.s3 = SandcrawlerMinioClient(
@@ -624,7 +633,6 @@ class PersistXmlDocWorker(GenericPersistDocWorker):
     Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
     sandcrawler database (SQL).
     """
-
     def __init__(self, **kwargs):
         super().__init__(**kwargs)
         self.s3_extension = kwargs.get('s3_extension', ".jats.xml")
@@ -637,7 +645,6 @@ class PersistHtmlTeiXmlWorker(GenericPersistDocWorker):
     Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
     sandcrawler database (SQL).
    """
-
     def __init__(self, **kwargs):
         super().__init__(**kwargs)
         self.s3_extension = kwargs.get('s3_extension', ".tei.xml")
--
cgit v1.2.3