diff options
Diffstat (limited to 'python/scripts/deliver_gwb_to_disk.py')
-rwxr-xr-x | python/scripts/deliver_gwb_to_disk.py | 154 |
1 files changed, 90 insertions, 64 deletions
diff --git a/python/scripts/deliver_gwb_to_disk.py b/python/scripts/deliver_gwb_to_disk.py index ca19b97..fcaf51f 100755 --- a/python/scripts/deliver_gwb_to_disk.py +++ b/python/scripts/deliver_gwb_to_disk.py @@ -27,64 +27,76 @@ sentry_client = raven.Client() class DeliverGwbDisk: def __init__(self, disk_dir, **kwargs): - self.warc_uri_prefix = kwargs.get('warc_uri_prefix') + self.warc_uri_prefix = kwargs.get("warc_uri_prefix") self.rstore = None self.count = Counter() # /serve/ instead of /download/ doesn't record view count - self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/') + self.petabox_base_url = kwargs.get("petabox_base_url", "http://archive.org/serve/") # gwb library will fall back to reading from /opt/.petabox/webdata.secret - self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', - os.environ.get('PETABOX_WEBDATA_SECRET')) + self.petabox_webdata_secret = kwargs.get( + "petabox_webdata_secret", os.environ.get("PETABOX_WEBDATA_SECRET") + ) self.disk_dir = disk_dir - self.disk_prefix = kwargs.get('disk_prefix', 'pdf/') - self.disk_suffix = kwargs.get('disk_suffix', '.pdf') + self.disk_prefix = kwargs.get("disk_prefix", "pdf/") + self.disk_suffix = kwargs.get("disk_suffix", ".pdf") def fetch_warc_content(self, warc_path, offset, c_size): warc_uri = self.warc_uri_prefix + warc_path if not self.rstore: self.rstore = ResourceStore( - loaderfactory=CDXLoaderFactory(webdata_secret=self.petabox_webdata_secret, - download_base_url=self.petabox_base_url)) + loaderfactory=CDXLoaderFactory( + webdata_secret=self.petabox_webdata_secret, + download_base_url=self.petabox_base_url, + ) + ) try: gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable: return None, dict( status="error", - reason="failed to load file contents from wayback/petabox (ResourceUnavailable)" + reason="failed to load file contents from wayback/petabox (ResourceUnavailable)", ) except ValueError as ve: return None, dict( status="error", - reason="failed to load file contents from wayback/petabox (ValueError: {})". - format(ve)) + reason="failed to load file contents from wayback/petabox (ValueError: {})".format( + ve + ), + ) except EOFError as eofe: return None, dict( status="error", - reason="failed to load file contents from wayback/petabox (EOFError: {})". - format(eofe)) + reason="failed to load file contents from wayback/petabox (EOFError: {})".format( + eofe + ), + ) except TypeError as te: return None, dict( status="error", - reason= - "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)" - .format(te)) + reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format( + te + ), + ) # Note: could consider a generic "except Exception" here, as we get so # many petabox errors. Do want jobs to fail loud and clear when the # whole cluster is down though. if gwb_record.get_status()[0] != 200: - return None, dict(status="error", - reason="archived HTTP response (WARC) was not 200", - warc_status=gwb_record.get_status()[0]) + return None, dict( + status="error", + reason="archived HTTP response (WARC) was not 200", + warc_status=gwb_record.get_status()[0], + ) try: raw_content = gwb_record.open_raw_content().read() except IncompleteRead as ire: return None, dict( status="error", - reason= - "failed to read actual file contents from wayback/petabox (IncompleteRead: {})". - format(ire)) + reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format( + ire + ), + ) return raw_content, None def run(self, manifest_file): @@ -95,47 +107,57 @@ class DeliverGwbDisk: os.makedirs(fpath, exist_ok=True) sys.stderr.write("Starting...\n") for line in manifest_file: - self.count['total'] += 1 - line = line.strip().split('\t') + self.count["total"] += 1 + line = line.strip().split("\t") if len(line) != 2: - self.count['skip-line'] += 1 + self.count["skip-line"] += 1 continue sha1_hex, cdx_json = line[0], line[1] assert len(sha1_hex) == 40 file_cdx = json.loads(cdx_json) # If warc is not item/file.(w)arc.gz form, skip it - if len(file_cdx['warc'].split('/')) != 2: - sys.stderr.write('WARC path not petabox item/file: {}'.format(file_cdx['warc'])) - print("{}\tskip warc\t{}".format(sha1_hex, file_cdx['warc'])) - self.count['skip-warc'] += 1 + if len(file_cdx["warc"].split("/")) != 2: + sys.stderr.write("WARC path not petabox item/file: {}".format(file_cdx["warc"])) + print("{}\tskip warc\t{}".format(sha1_hex, file_cdx["warc"])) + self.count["skip-warc"] += 1 continue # fetch from GWB/petabox via HTTP range-request - blob, status = self.fetch_warc_content(file_cdx['warc'], file_cdx['offset'], - file_cdx['c_size']) + blob, status = self.fetch_warc_content( + file_cdx["warc"], file_cdx["offset"], file_cdx["c_size"] + ) if blob is None and status: - print("{}\terror petabox\t{}\t{}".format(sha1_hex, file_cdx['warc'], - status['reason'])) - self.count['err-petabox-fetch'] += 1 + print( + "{}\terror petabox\t{}\t{}".format( + sha1_hex, file_cdx["warc"], status["reason"] + ) + ) + self.count["err-petabox-fetch"] += 1 continue elif not blob: print("{}\tskip-empty-blob".format(sha1_hex)) - self.count['skip-empty-blob'] += 1 + self.count["skip-empty-blob"] += 1 continue # verify sha1 if sha1_hex != hashlib.sha1(blob).hexdigest(): - #assert sha1_hex == hashlib.sha1(blob).hexdigest() - #sys.stderr.write("{}\terror petabox-mismatch\n".format(sha1_hex)) + # assert sha1_hex == hashlib.sha1(blob).hexdigest() + # sys.stderr.write("{}\terror petabox-mismatch\n".format(sha1_hex)) print("{}\terror petabox-hash-mismatch".format(sha1_hex)) - self.count['err-petabox-hash-mismatch'] += 1 + self.count["err-petabox-hash-mismatch"] += 1 - self.count['petabox-ok'] += 1 + self.count["petabox-ok"] += 1 # save to disk - fpath = "{}/{}{}/{}/{}{}".format(self.disk_dir, self.disk_prefix, sha1_hex[0:2], - sha1_hex[2:4], sha1_hex, self.disk_suffix) - with open(fpath, 'wb') as f: + fpath = "{}/{}{}/{}/{}{}".format( + self.disk_dir, + self.disk_prefix, + sha1_hex[0:2], + sha1_hex[2:4], + sha1_hex, + self.disk_suffix, + ) + with open(fpath, "wb") as f: f.write(blob) print("{}\tsuccess\t{}\t{}".format(sha1_hex, fpath, len(blob))) - self.count['success-disk'] += 1 + self.count["success-disk"] += 1 sys.stderr.write("{}\n".format(self.count)) @@ -143,31 +165,35 @@ class DeliverGwbDisk: def main(): parser = argparse.ArgumentParser() - parser.add_argument('--disk-dir', - required=True, - type=str, - help='local base directory to save into') - parser.add_argument('--disk-prefix', - type=str, - default="pdf/", - help='directory prefix for items created in bucket') - parser.add_argument('--disk-suffix', - type=str, - default=".pdf", - help='file suffix for created files') - parser.add_argument('--warc-uri-prefix', - type=str, - default='https://archive.org/serve/', - help='URI where WARCs can be found') - parser.add_argument('manifest_file', - help="TSV/JSON manifest file", - default=sys.stdin, - type=argparse.FileType('r')) + parser.add_argument( + "--disk-dir", required=True, type=str, help="local base directory to save into" + ) + parser.add_argument( + "--disk-prefix", + type=str, + default="pdf/", + help="directory prefix for items created in bucket", + ) + parser.add_argument( + "--disk-suffix", type=str, default=".pdf", help="file suffix for created files" + ) + parser.add_argument( + "--warc-uri-prefix", + type=str, + default="https://archive.org/serve/", + help="URI where WARCs can be found", + ) + parser.add_argument( + "manifest_file", + help="TSV/JSON manifest file", + default=sys.stdin, + type=argparse.FileType("r"), + ) args = parser.parse_args() worker = DeliverGwbDisk(**args.__dict__) worker.run(args.manifest_file) -if __name__ == '__main__': # pragma: no cover +if __name__ == "__main__": # pragma: no cover main() |