author    Bryan Newbold <bnewbold@archive.org>  2021-10-26 12:54:37 -0700
committer Bryan Newbold <bnewbold@archive.org>  2021-10-26 12:54:37 -0700
commit    05bd7cbcc62588e431c5efd533189e246b2a997e (patch)
tree      abcc707a451e77ea1e8c5ac9a5925b97a4bd139a /python/sandcrawler/ingest_fileset.py
parent    f3f424e42f2f4f383103cf80b30a00cfa6cfc179 (diff)
download  sandcrawler-05bd7cbcc62588e431c5efd533189e246b2a997e.tar.gz
          sandcrawler-05bd7cbcc62588e431c5efd533189e246b2a997e.zip
make fmt
Diffstat (limited to 'python/sandcrawler/ingest_fileset.py')
-rw-r--r--  python/sandcrawler/ingest_fileset.py  101
1 file changed, 60 insertions(+), 41 deletions(-)
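
The commit message points at the repository's formatting target, and the rewrapped import groups and aligned continuation arguments in the diff below are characteristic of yapf. A minimal sketch of that kind of rewrap via yapf's Python API follows; the "pep8" style choice is an assumption for illustration, not necessarily the project's pinned configuration:

# Illustrative sketch: rewrap an over-long import the way the diff below does.
# The "pep8" style here is an assumption; the project may pin its own yapf settings.
from yapf.yapflib.yapf_api import FormatCode

src = (
    "from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, "
    "html_extract_resources, load_adblock_rules\n"
)

# FormatCode returns the formatted source and a flag indicating whether it changed.
formatted, changed = FormatCode(src, style_config="pep8")
print(formatted)
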
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index 11386df..5cbb908 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -1,4 +1,3 @@
-
import gzip
import json
import sys
@@ -14,17 +13,21 @@ from sandcrawler.fileset_platforms import DATASET_PLATFORM_HELPER_TABLE, Fileset
from sandcrawler.fileset_strategies import FILESET_STRATEGY_HELPER_TABLE, FilesetIngestStrategy
from sandcrawler.fileset_types import PlatformRestrictedError, PlatformScopeError
from sandcrawler.html import extract_fulltext_url
-from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules
-from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, ResourceResult, SavePageNowClient,
- SavePageNowError, WaybackClient, WaybackContentError, WaybackError, cdx_to_dict,
+from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio,
+ html_extract_resources, load_adblock_rules)
+from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError,
+ ResourceResult, SavePageNowClient, SavePageNowError, WaybackClient,
+ WaybackContentError, WaybackError, cdx_to_dict,
fix_transfer_encoding)
from sandcrawler.ingest_file import IngestFileWorker
-from sandcrawler.ingest_html import (WebResource, fetch_html_resources, html_extract_body_teixml, html_guess_platform,
+from sandcrawler.ingest_html import (WebResource, fetch_html_resources,
+ html_extract_body_teixml, html_guess_platform,
html_guess_scope, quick_fetch_html_resources)
from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime
from sandcrawler.workers import SandcrawlerWorker
-MAX_BODY_SIZE_BYTES = 128*1024*1024
+MAX_BODY_SIZE_BYTES = 128 * 1024 * 1024
+
class IngestFilesetWorker(IngestFileWorker):
"""
@@ -39,14 +42,13 @@ class IngestFilesetWorker(IngestFileWorker):
checking to see if content has been archived already)
4. summarize status
"""
-
def __init__(self, sink=None, **kwargs):
super().__init__(sink=None, **kwargs)
self.sink = sink
self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE
self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
- self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024)
+ self.max_total_size = kwargs.get('max_total_size', 64 * 1024 * 1024 * 1024)
self.max_file_count = kwargs.get('max_file_count', 200)
self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink')
self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False)
@@ -72,11 +74,12 @@ class IngestFilesetWorker(IngestFileWorker):
raise NotImplementedError("process_existing() not tested or safe yet")
def want(self, request: dict) -> bool:
- if not request.get('ingest_type') in ('dataset',):
+ if not request.get('ingest_type') in ('dataset', ):
return False
return True
- def fetch_resource_iteratively(self, ingest_type: str, base_url: str, force_recrawl: bool) -> dict:
+ def fetch_resource_iteratively(self, ingest_type: str, base_url: str,
+ force_recrawl: bool) -> dict:
"""
This is copypasta from process_file(), should probably refactor.
"""
@@ -174,10 +177,9 @@ class IngestFilesetWorker(IngestFileWorker):
# here we split based on ingest type to try and extract a next hop
html_ish_resource = bool(
"html" in file_meta['mimetype']
- or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
+ or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
or "application/xml" in file_meta['mimetype']
- or "text/xml" in file_meta['mimetype']
- )
+ or "text/xml" in file_meta['mimetype'])
html_biblio = None
html_doc = None
if html_ish_resource and resource.body:
@@ -186,7 +188,8 @@ class IngestFilesetWorker(IngestFileWorker):
html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
if html_biblio:
if not 'html_biblio' in result or html_biblio.title:
- result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True))
+ result['html_biblio'] = json.loads(
+ html_biblio.json(exclude_none=True))
#print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
except ValueError:
pass
@@ -214,7 +217,8 @@ class IngestFilesetWorker(IngestFileWorker):
result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
return result
elif ingest_type == "xml":
- if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
+ if file_meta['mimetype'] not in ("application/xml", "text/xml",
+ "application/jats+xml"):
result['status'] = "wrong-mimetype"
return result
elif ingest_type == "html":
@@ -229,11 +233,10 @@ class IngestFilesetWorker(IngestFileWorker):
result['_resource'] = resource
return result
-
def process(self, request: dict, key: Any = None) -> dict:
ingest_type = request.get('ingest_type')
- if ingest_type not in ("dataset",):
+ if ingest_type not in ("dataset", ):
raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
# parse/clean URL
@@ -250,7 +253,9 @@ class IngestFilesetWorker(IngestFileWorker):
#if existing:
# return self.process_existing(request, existing)
- result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl)
+ result = self.fetch_resource_iteratively(ingest_type,
+ base_url,
+ force_recrawl=force_recrawl)
result['request'] = request
if result.get('status') != None:
result['request'] = request
@@ -323,14 +328,16 @@ class IngestFilesetWorker(IngestFileWorker):
return result
if result['file_count'] > self.max_file_count:
# hard max, to prevent downstream breakage
- if result['file_count'] > 10*1000:
+ if result['file_count'] > 10 * 1000:
result['manifest'] = result['manifest'][:self.max_file_count]
result['status'] = 'too-many-files'
return result
ingest_strategy = platform_helper.chose_strategy(dataset_meta)
result['ingest_strategy'] = ingest_strategy
- print(f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}", file=sys.stderr)
+ print(
+ f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}",
+ file=sys.stderr)
strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy)
if not strategy_helper:
@@ -349,7 +356,8 @@ class IngestFilesetWorker(IngestFileWorker):
if archive_result.bundle_file_meta:
result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta
if archive_result.archiveorg_bundle_path:
- result['fileset_bundle']['archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path
+ result['fileset_bundle'][
+ 'archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path
if archive_result.bundle_resource:
result['fileset_bundle']['terminal'] = dict(
terminal_url=archive_result.bundle_resource.terminal_url,
@@ -357,14 +365,16 @@ class IngestFilesetWorker(IngestFileWorker):
terminal_status_code=archive_result.bundle_resource.terminal_status_code,
)
if archive_result.bundle_resource.cdx:
- result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_resource.cdx)
+ result['fileset_bundle']['cdx'] = cdx_to_dict(
+ archive_result.bundle_resource.cdx)
if archive_result.bundle_resource.revisit_cdx:
- result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_resource.revisit_cdx)
+ result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(
+ archive_result.bundle_resource.revisit_cdx)
if ingest_strategy.endswith('-file'):
result['fileset_file'] = dict()
if archive_result.file_file_meta:
- result['fileset_file']['file_meta'] = file_meta=archive_result.file_file_meta,
+            result['fileset_file']['file_meta'] = archive_result.file_file_meta
if archive_result.file_resource:
result['fileset_file']['terminal'] = dict(
terminal_url=archive_result.file_resource.terminal_url,
@@ -372,16 +382,20 @@ class IngestFilesetWorker(IngestFileWorker):
terminal_status_code=archive_result.file_resource.terminal_status_code,
)
if archive_result.file_resource.cdx:
- result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
+ result['fileset_file']['cdx'] = cdx_to_dict(
+ archive_result.file_resource.cdx)
if archive_result.file_resource.revisit_cdx:
- result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
+ result['fileset_file']['revisit_cdx'] = cdx_to_dict(
+ archive_result.file_resource.revisit_cdx)
if result['status'].startswith('success'):
# check that these are still valid
assert result['file_count'] == len(archive_result.manifest)
- assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size])
+ assert result['total_size'] == sum(
+ [m.size for m in archive_result.manifest if m.size])
- if result['status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta:
+ if result[
+ 'status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta:
file_result = dict(
hit=True,
status='success',
@@ -397,10 +411,13 @@ class IngestFilesetWorker(IngestFileWorker):
if archive_result.file_resource.cdx:
file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
if archive_result.file_resource.revisit_cdx:
- file_result['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
+ file_result['revisit_cdx'] = cdx_to_dict(
+ archive_result.file_resource.revisit_cdx)
file_result['request']['ingest_type'] = request['ingest_type'] + "-file"
# call the super() (ingest_file) version of process_hit()
- info = self.process_file_hit(file_result['request']['ingest_type'], archive_result.file_resource, archive_result.file_file_meta)
+ info = self.process_file_hit(file_result['request']['ingest_type'],
+ archive_result.file_resource,
+ archive_result.file_file_meta)
file_result.update(info)
if self.ingest_file_result_sink:
self.ingest_file_result_sink.push_record(result.copy())
@@ -410,17 +427,19 @@ class IngestFilesetWorker(IngestFileWorker):
if result['status'].startswith('success'):
result['hit'] = True
print("[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format(
- ingest_type,
- result['file_count'],
- result['total_size'],
- ingest_strategy,
- ), file=sys.stderr)
+ ingest_type,
+ result['file_count'],
+ result['total_size'],
+ ingest_strategy,
+ ),
+ file=sys.stderr)
else:
print("[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format(
- ingest_type,
- result['status'],
- result['file_count'],
- result['total_size'],
- ingest_strategy,
- ), file=sys.stderr)
+ ingest_type,
+ result['status'],
+ result['file_count'],
+ result['total_size'],
+ ingest_strategy,
+ ),
+ file=sys.stderr)
return result
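
One detail worth flagging in the hunks above: the membership checks are written against single-element tuples like ('dataset', ), where the trailing comma is what makes the parentheses a tuple at all. The same rule is why the removed fileset_file assignment ending in a bare trailing comma silently stored a one-element tuple. A standalone sketch of the distinction:

# A trailing comma creates a one-element tuple; bare parentheses do not.
ingest_type = "data"
print(ingest_type in ("dataset", ))  # False: tuple membership test
print(ingest_type in ("dataset"))    # True: ("dataset") is just a string, so this is a substring test

value = "x",        # trailing comma: value is the tuple ("x",), a common source of bugs
print(type(value))  # <class 'tuple'>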