diff options
Diffstat (limited to 'python/scripts/deliver_gwb_to_s3.py')
-rwxr-xr-x | python/scripts/deliver_gwb_to_s3.py | 148 |
1 files changed, 83 insertions, 65 deletions
diff --git a/python/scripts/deliver_gwb_to_s3.py b/python/scripts/deliver_gwb_to_s3.py index f9b3b19..1f08c4f 100755 --- a/python/scripts/deliver_gwb_to_s3.py +++ b/python/scripts/deliver_gwb_to_s3.py @@ -54,111 +54,128 @@ sentry_client = raven.Client() class DeliverGwbS3: def __init__(self, s3_bucket, **kwargs): - self.warc_uri_prefix = kwargs.get('warc_uri_prefix') + self.warc_uri_prefix = kwargs.get("warc_uri_prefix") self.rstore = None self.count = Counter() # /serve/ instead of /download/ doesn't record view count - self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/') + self.petabox_base_url = kwargs.get("petabox_base_url", "http://archive.org/serve/") # gwb library will fall back to reading from /opt/.petabox/webdata.secret - self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', - os.environ.get('PETABOX_WEBDATA_SECRET')) + self.petabox_webdata_secret = kwargs.get( + "petabox_webdata_secret", os.environ.get("PETABOX_WEBDATA_SECRET") + ) self.s3_bucket = s3_bucket - self.s3_prefix = kwargs.get('s3_prefix', 'pdf/') - self.s3_suffix = kwargs.get('s3_suffix', '.pdf') - self.s3 = boto3.resource('s3') + self.s3_prefix = kwargs.get("s3_prefix", "pdf/") + self.s3_suffix = kwargs.get("s3_suffix", ".pdf") + self.s3 = boto3.resource("s3") self.bucket = self.s3.Bucket(self.s3_bucket) def fetch_warc_content(self, warc_path, offset, c_size): warc_uri = self.warc_uri_prefix + warc_path if not self.rstore: self.rstore = ResourceStore( - loaderfactory=CDXLoaderFactory(webdata_secret=self.petabox_webdata_secret, - download_base_url=self.petabox_base_url)) + loaderfactory=CDXLoaderFactory( + webdata_secret=self.petabox_webdata_secret, + download_base_url=self.petabox_base_url, + ) + ) try: gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable: return None, dict( status="error", - reason="failed to load file contents from wayback/petabox (ResourceUnavailable)" + reason="failed to load file contents from wayback/petabox (ResourceUnavailable)", ) except ValueError as ve: return None, dict( status="error", - reason="failed to load file contents from wayback/petabox (ValueError: {})". - format(ve)) + reason="failed to load file contents from wayback/petabox (ValueError: {})".format( + ve + ), + ) except EOFError as eofe: return None, dict( status="error", - reason="failed to load file contents from wayback/petabox (EOFError: {})". - format(eofe)) + reason="failed to load file contents from wayback/petabox (EOFError: {})".format( + eofe + ), + ) except TypeError as te: return None, dict( status="error", - reason= - "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)" - .format(te)) + reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format( + te + ), + ) # Note: could consider a generic "except Exception" here, as we get so # many petabox errors. Do want jobs to fail loud and clear when the # whole cluster is down though. if gwb_record.get_status()[0] != 200: - return None, dict(status="error", - reason="archived HTTP response (WARC) was not 200", - warc_status=gwb_record.get_status()[0]) + return None, dict( + status="error", + reason="archived HTTP response (WARC) was not 200", + warc_status=gwb_record.get_status()[0], + ) try: raw_content = gwb_record.open_raw_content().read() except IncompleteRead as ire: return None, dict( status="error", - reason= - "failed to read actual file contents from wayback/petabox (IncompleteRead: {})". - format(ire)) + reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format( + ire + ), + ) return raw_content, None def run(self, manifest_file): sys.stderr.write("Starting...\n") for line in manifest_file: - self.count['total'] += 1 - line = line.strip().split('\t') + self.count["total"] += 1 + line = line.strip().split("\t") if len(line) != 2: - self.count['skip-line'] += 1 + self.count["skip-line"] += 1 continue sha1_hex, cdx_json = line[0], line[1] assert len(sha1_hex) == 40 file_cdx = json.loads(cdx_json) # If warc is not item/file.(w)arc.gz form, skip it - if len(file_cdx['warc'].split('/')) != 2: - sys.stderr.write('WARC path not petabox item/file: {}'.format(file_cdx['warc'])) - print("{}\tskip warc\t{}".format(sha1_hex, file_cdx['warc'])) - self.count['skip-warc'] += 1 + if len(file_cdx["warc"].split("/")) != 2: + sys.stderr.write("WARC path not petabox item/file: {}".format(file_cdx["warc"])) + print("{}\tskip warc\t{}".format(sha1_hex, file_cdx["warc"])) + self.count["skip-warc"] += 1 continue # fetch from GWB/petabox via HTTP range-request - blob, status = self.fetch_warc_content(file_cdx['warc'], file_cdx['offset'], - file_cdx['c_size']) + blob, status = self.fetch_warc_content( + file_cdx["warc"], file_cdx["offset"], file_cdx["c_size"] + ) if blob is None and status: - print("{}\terror petabox\t{}\t{}".format(sha1_hex, file_cdx['warc'], - status['reason'])) - self.count['err-petabox-fetch'] += 1 + print( + "{}\terror petabox\t{}\t{}".format( + sha1_hex, file_cdx["warc"], status["reason"] + ) + ) + self.count["err-petabox-fetch"] += 1 continue elif not blob: print("{}\tskip-empty-blob".format(sha1_hex)) - self.count['skip-empty-blob'] += 1 + self.count["skip-empty-blob"] += 1 continue # verify sha1 if sha1_hex != hashlib.sha1(blob).hexdigest(): - #assert sha1_hex == hashlib.sha1(blob).hexdigest() - #sys.stderr.write("{}\terror petabox-mismatch\n".format(sha1_hex)) + # assert sha1_hex == hashlib.sha1(blob).hexdigest() + # sys.stderr.write("{}\terror petabox-mismatch\n".format(sha1_hex)) print("{}\terror petabox-hash-mismatch".format(sha1_hex)) - self.count['err-petabox-hash-mismatch'] += 1 + self.count["err-petabox-hash-mismatch"] += 1 - self.count['petabox-ok'] += 1 + self.count["petabox-ok"] += 1 # upload to AWS S3 - obj = self.bucket.put_object(Key="{}{}/{}{}".format(self.s3_prefix, sha1_hex[0:4], - sha1_hex, self.s3_suffix), - Body=blob) + obj = self.bucket.put_object( + Key="{}{}/{}{}".format(self.s3_prefix, sha1_hex[0:4], sha1_hex, self.s3_suffix), + Body=blob, + ) print("{}\tsuccess\t{}\t{}".format(sha1_hex, obj.key, len(blob))) - self.count['success-s3'] += 1 + self.count["success-s3"] += 1 sys.stderr.write("{}\n".format(self.count)) @@ -166,31 +183,32 @@ class DeliverGwbS3: def main(): parser = argparse.ArgumentParser() - parser.add_argument('--s3-bucket', - required=True, - type=str, - help='AWS S3 bucket to upload into') - parser.add_argument('--s3-prefix', - type=str, - default="pdf/", - help='key prefix for items created in bucket') - parser.add_argument('--s3-suffix', - type=str, - default=".pdf", - help='file suffix for created objects') - parser.add_argument('--warc-uri-prefix', - type=str, - default='https://archive.org/serve/', - help='URI where WARCs can be found') - parser.add_argument('manifest_file', - help="TSV/JSON manifest file", - default=sys.stdin, - type=argparse.FileType('r')) + parser.add_argument( + "--s3-bucket", required=True, type=str, help="AWS S3 bucket to upload into" + ) + parser.add_argument( + "--s3-prefix", type=str, default="pdf/", help="key prefix for items created in bucket" + ) + parser.add_argument( + "--s3-suffix", type=str, default=".pdf", help="file suffix for created objects" + ) + parser.add_argument( + "--warc-uri-prefix", + type=str, + default="https://archive.org/serve/", + help="URI where WARCs can be found", + ) + parser.add_argument( + "manifest_file", + help="TSV/JSON manifest file", + default=sys.stdin, + type=argparse.FileType("r"), + ) args = parser.parse_args() worker = DeliverGwbS3(**args.__dict__) worker.run(args.manifest_file) -if __name__ == '__main__': # pragma: no cover +if __name__ == "__main__": # pragma: no cover main() |