diff options
Diffstat (limited to 'python')
59 files changed, 1582 insertions, 1225 deletions
diff --git a/python/grobid2json.py b/python/grobid2json.py index b4bfe2b..0d47f36 100755 --- a/python/grobid2json.py +++ b/python/grobid2json.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ NB: adapted to work as a library for PDF extraction. Will probably be re-written eventually to be correct, complete, and robust; this is just a @@ -76,9 +75,7 @@ def all_authors(elem: Optional[ET.Element]) -> List[Dict[str, Any]]: def journal_info(elem: ET.Element) -> Dict[str, Any]: journal = dict() journal["name"] = elem.findtext(".//{%s}monogr/{%s}title" % (ns, ns)) - journal["publisher"] = elem.findtext( - ".//{%s}publicationStmt/{%s}publisher" % (ns, ns) - ) + journal["publisher"] = elem.findtext(".//{%s}publicationStmt/{%s}publisher" % (ns, ns)) if journal["publisher"] == "": journal["publisher"] = None journal["issn"] = elem.findtext('.//{%s}idno[@type="ISSN"]' % ns) @@ -145,9 +142,7 @@ def teixml2json(content: AnyStr, encumbered: bool = True) -> Dict[str, Any]: info["grobid_version"] = application_tag.attrib["version"].strip() info["grobid_timestamp"] = application_tag.attrib["when"].strip() info["title"] = header.findtext(".//{%s}analytic/{%s}title" % (ns, ns)) - info["authors"] = all_authors( - header.find(".//{%s}sourceDesc/{%s}biblStruct" % (ns, ns)) - ) + info["authors"] = all_authors(header.find(".//{%s}sourceDesc/{%s}biblStruct" % (ns, ns))) info["journal"] = journal_info(header) date = header.find('.//{%s}date[@type="published"]' % ns) info["date"] = (date is not None) and date.attrib.get("when") @@ -207,8 +202,7 @@ def main() -> None: # pragma no cover json.dumps( teixml2json(content, encumbered=(not args.no_encumbered)), sort_keys=True, - ) - ) + )) if __name__ == "__main__": # pragma no cover diff --git a/python/grobid_tool.py b/python/grobid_tool.py index 0084330..4ba9540 100755 --- a/python/grobid_tool.py +++ b/python/grobid_tool.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ These are generally for running one-off tasks from the command line. Output might go to stdout, or might go to Kafka topic. @@ -30,6 +29,7 @@ def run_extract_json(args): pusher = JsonLinePusher(worker, args.json_file) pusher.run() + def run_extract_cdx(args): grobid_client = GrobidClient(host_url=args.grobid_host) wayback_client = WaybackClient() @@ -53,6 +53,7 @@ def run_extract_cdx(args): ) pusher.run() + def run_extract_zipfile(args): grobid_client = GrobidClient(host_url=args.grobid_host) if args.jobs > 1: @@ -65,6 +66,7 @@ def run_extract_zipfile(args): pusher = ZipfilePusher(worker, args.zip_file) pusher.run() + def run_transform(args): grobid_client = GrobidClient() for line in args.json_file: @@ -82,52 +84,54 @@ def run_transform(args): def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--kafka-mode', - action='store_true', - help="send output to Kafka (not stdout)") + action='store_true', + help="send output to Kafka (not stdout)") parser.add_argument('--kafka-hosts', - default="localhost:9092", - help="list of Kafka brokers (host/port) to use") + default="localhost:9092", + help="list of Kafka brokers (host/port) to use") parser.add_argument('--kafka-env', - default="dev", - help="Kafka topic namespace to use (eg, prod, qa, dev)") - parser.add_argument('-j', '--jobs', - default=8, type=int, - help="parallelism for batch CPU jobs") + default="dev", + help="Kafka topic namespace to use (eg, prod, qa, dev)") + parser.add_argument('-j', + '--jobs', + default=8, + type=int, + help="parallelism for batch CPU jobs") parser.add_argument('--grobid-host', - default="http://grobid.qa.fatcat.wiki", - help="GROBID API host/port") + default="http://grobid.qa.fatcat.wiki", + help="GROBID API host/port") subparsers = parser.add_subparsers() - sub_extract_json = subparsers.add_parser('extract-json', + sub_extract_json = subparsers.add_parser( + 'extract-json', help="for each JSON line with CDX info, fetches PDF and does GROBID extraction") sub_extract_json.set_defaults(func=run_extract_json) sub_extract_json.add_argument('json_file', - help="JSON file to import from (or '-' for stdin)", - type=argparse.FileType('r')) + help="JSON file to import from (or '-' for stdin)", + type=argparse.FileType('r')) - sub_extract_cdx = subparsers.add_parser('extract-cdx', - help="for each CDX line, fetches PDF and does GROBID extraction") + sub_extract_cdx = subparsers.add_parser( + 'extract-cdx', help="for each CDX line, fetches PDF and does GROBID extraction") sub_extract_cdx.set_defaults(func=run_extract_cdx) sub_extract_cdx.add_argument('cdx_file', - help="CDX file to import from (or '-' for stdin)", - type=argparse.FileType('r')) + help="CDX file to import from (or '-' for stdin)", + type=argparse.FileType('r')) - sub_extract_zipfile = subparsers.add_parser('extract-zipfile', + sub_extract_zipfile = subparsers.add_parser( + 'extract-zipfile', help="opens zipfile, iterates over PDF files inside and does GROBID extract for each") sub_extract_zipfile.set_defaults(func=run_extract_zipfile) - sub_extract_zipfile.add_argument('zip_file', - help="zipfile with PDFs to extract", - type=str) + sub_extract_zipfile.add_argument('zip_file', help="zipfile with PDFs to extract", type=str) sub_transform = subparsers.add_parser('transform') sub_transform.set_defaults(func=run_transform) sub_transform.add_argument('--metadata-only', - action='store_true', - help="Only pass through bibliographic metadata, not fulltext") - sub_transform.add_argument('json_file', + action='store_true', + help="Only pass through bibliographic metadata, not fulltext") + sub_transform.add_argument( + 'json_file', help="convert TEI-XML to JSON. Input is JSON lines with tei_xml field", type=argparse.FileType('r')) @@ -140,10 +144,10 @@ def main(): if args.kafka_mode: produce_topic = "sandcrawler-{}.grobid-output-pg".format(args.kafka_env) print("Running in kafka output mode, publishing to {}\n".format(produce_topic)) - args.sink = KafkaCompressSink(kafka_hosts=args.kafka_hosts, - produce_topic=produce_topic) + args.sink = KafkaCompressSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic) args.func(args) + if __name__ == '__main__': main() diff --git a/python/ia_pdf_match.py b/python/ia_pdf_match.py index 137110c..c3d9c16 100755 --- a/python/ia_pdf_match.py +++ b/python/ia_pdf_match.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ Input is IA item metadata JSON. Ouput is insertable fatcat "match" JSON @@ -81,21 +80,23 @@ def parse(obj): 'size': int(pdf_file['size']), 'mimetype': 'application/pdf', 'urls': [ - "https://archive.org/download/{}/{}".format( - obj['metadata']['identifier'], - pdf_file['name']), + "https://archive.org/download/{}/{}".format(obj['metadata']['identifier'], + pdf_file['name']), ], 'cdx': [], 'dois': [], } if extid_type == 'doi': - match['dois'] = [extid,] + match['dois'] = [ + extid, + ] else: match[extid_type] = extid return match + def run(): for line in sys.stdin: if not line: @@ -105,5 +106,6 @@ def run(): if match: print(json.dumps(match, sort_keys=True)) + if __name__ == '__main__': run() diff --git a/python/ingest_tool.py b/python/ingest_tool.py index c0ef5aa..305c3a8 100755 --- a/python/ingest_tool.py +++ b/python/ingest_tool.py @@ -18,7 +18,9 @@ def run_single_ingest(args): ) if args.force_recrawl: request['force_recrawl'] = True - if request['ingest_type'] in ['dataset',]: + if request['ingest_type'] in [ + 'dataset', + ]: ingester = IngestFilesetWorker( try_spn2=not args.no_spn2, ingest_file_result_stdout=True, @@ -32,75 +34,71 @@ def run_single_ingest(args): print(json.dumps(result, sort_keys=True)) return result + def run_requests(args): # TODO: switch to using JsonLinePusher file_worker = IngestFileWorker( try_spn2=not args.no_spn2, html_quick_mode=args.html_quick_mode, ) - fileset_worker = IngestFilesetWorker( - try_spn2=not args.no_spn2, - ) + fileset_worker = IngestFilesetWorker(try_spn2=not args.no_spn2, ) for l in args.json_file: request = json.loads(l.strip()) - if request['ingest_type'] in ['dataset',]: + if request['ingest_type'] in [ + 'dataset', + ]: result = fileset_worker.process(request) else: result = file_worker.process(request) print(json.dumps(result, sort_keys=True)) + def run_api(args): port = 8083 print("Listening on localhost:{}".format(port)) server = HTTPServer(('', port), IngestFileRequestHandler) server.serve_forever() + def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) subparsers = parser.add_subparsers() - sub_single= subparsers.add_parser('single', - help="ingests a single base URL") + sub_single = subparsers.add_parser('single', help="ingests a single base URL") sub_single.set_defaults(func=run_single_ingest) sub_single.add_argument('ingest_type', - default="pdf", - help="type of ingest (pdf, html, etc)") + default="pdf", + help="type of ingest (pdf, html, etc)") sub_single.add_argument('--release-id', - help="(optional) existing release ident to match to") - sub_single.add_argument('--doi', - help="(optional) existing release DOI to match to") + help="(optional) existing release ident to match to") + sub_single.add_argument('--doi', help="(optional) existing release DOI to match to") sub_single.add_argument('--force-recrawl', - action='store_true', - help="ignore GWB history and use SPNv2 to re-crawl") - sub_single.add_argument('--no-spn2', - action='store_true', - help="don't use live web (SPNv2)") + action='store_true', + help="ignore GWB history and use SPNv2 to re-crawl") + sub_single.add_argument('--no-spn2', action='store_true', help="don't use live web (SPNv2)") sub_single.add_argument('--html-quick-mode', - action='store_true', - help="don't fetch individual sub-resources, just use CDX") - sub_single.add_argument('url', - help="URL of paper to fetch") + action='store_true', + help="don't fetch individual sub-resources, just use CDX") + sub_single.add_argument('url', help="URL of paper to fetch") - sub_requests = subparsers.add_parser('requests', - help="takes a series of ingest requests (JSON, per line) and runs each") + sub_requests = subparsers.add_parser( + 'requests', help="takes a series of ingest requests (JSON, per line) and runs each") sub_requests.add_argument('--no-spn2', - action='store_true', - help="don't use live web (SPNv2)") + action='store_true', + help="don't use live web (SPNv2)") sub_requests.add_argument('--html-quick-mode', - action='store_true', - help="don't fetch individual sub-resources, just use CDX") + action='store_true', + help="don't fetch individual sub-resources, just use CDX") sub_requests.set_defaults(func=run_requests) sub_requests.add_argument('json_file', - help="JSON file (request per line) to import from (or stdin)", - default=sys.stdin, type=argparse.FileType('r')) + help="JSON file (request per line) to import from (or stdin)", + default=sys.stdin, + type=argparse.FileType('r')) - sub_api = subparsers.add_parser('api', - help="starts a simple HTTP server that processes ingest requests") + sub_api = subparsers.add_parser( + 'api', help="starts a simple HTTP server that processes ingest requests") sub_api.set_defaults(func=run_api) - sub_api.add_argument('--port', - help="HTTP port to listen on", - default=8033, type=int) + sub_api.add_argument('--port', help="HTTP port to listen on", default=8033, type=int) args = parser.parse_args() if not args.__dict__.get("func"): @@ -109,5 +107,6 @@ def main(): args.func(args) + if __name__ == '__main__': main() diff --git a/python/pdfextract_tool.py b/python/pdfextract_tool.py index 89ecf1c..717b743 100755 --- a/python/pdfextract_tool.py +++ b/python/pdfextract_tool.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ KNOWN ISSUE: thumbnails are not published to kafka in multi-processing mode """ @@ -20,10 +19,13 @@ def run_extract_json(args): multi_worker = MultiprocessWrapper(worker, args.sink) pusher = JsonLinePusher(multi_worker, args.json_file, batch_size=args.jobs) else: - worker = PdfExtractWorker(wayback_client, sink=args.sink, thumbnail_sink=args.thumbnail_sink) + worker = PdfExtractWorker(wayback_client, + sink=args.sink, + thumbnail_sink=args.thumbnail_sink) pusher = JsonLinePusher(worker, args.json_file) pusher.run() + def run_extract_cdx(args): wayback_client = WaybackClient() if args.jobs > 1: @@ -37,7 +39,9 @@ def run_extract_cdx(args): batch_size=args.jobs, ) else: - worker = PdfExtractWorker(wayback_client, sink=args.sink, thumbnail_sink=args.thumbnail_sink) + worker = PdfExtractWorker(wayback_client, + sink=args.sink, + thumbnail_sink=args.thumbnail_sink) pusher = CdxLinePusher( worker, args.cdx_file, @@ -46,6 +50,7 @@ def run_extract_cdx(args): ) pusher.run() + def run_extract_zipfile(args): if args.jobs > 1: print("multi-processing: {}".format(args.jobs), file=sys.stderr) @@ -57,6 +62,7 @@ def run_extract_zipfile(args): pusher = ZipfilePusher(worker, args.zip_file) pusher.run() + def run_single(args): worker = PdfExtractBlobWorker(sink=args.sink, thumbnail_sink=args.thumbnail_sink) with open(args.pdf_file, 'rb') as pdf_file: @@ -67,51 +73,48 @@ def run_single(args): args.thumbnail_sink.finish() - def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--kafka-mode', - action='store_true', - help="send output to Kafka (not stdout)") + action='store_true', + help="send output to Kafka (not stdout)") parser.add_argument('--kafka-hosts', - default="localhost:9092", - help="list of Kafka brokers (host/port) to use") + default="localhost:9092", + help="list of Kafka brokers (host/port) to use") parser.add_argument('--kafka-env', - default="dev", - help="Kafka topic namespace to use (eg, prod, qa, dev)") - parser.add_argument('-j', '--jobs', - default=8, type=int, - help="parallelism for batch CPU jobs") + default="dev", + help="Kafka topic namespace to use (eg, prod, qa, dev)") + parser.add_argument('-j', + '--jobs', + default=8, + type=int, + help="parallelism for batch CPU jobs") subparsers = parser.add_subparsers() - sub_extract_json = subparsers.add_parser('extract-json', + sub_extract_json = subparsers.add_parser( + 'extract-json', help="for each JSON line with CDX info, fetches PDF and does PDF extraction") sub_extract_json.set_defaults(func=run_extract_json) sub_extract_json.add_argument('json_file', - help="JSON file to import from (or '-' for stdin)", - type=argparse.FileType('r')) + help="JSON file to import from (or '-' for stdin)", + type=argparse.FileType('r')) - sub_extract_cdx = subparsers.add_parser('extract-cdx', - help="for each CDX line, fetches PDF and does PDF extraction") + sub_extract_cdx = subparsers.add_parser( + 'extract-cdx', help="for each CDX line, fetches PDF and does PDF extraction") sub_extract_cdx.set_defaults(func=run_extract_cdx) sub_extract_cdx.add_argument('cdx_file', - help="CDX file to import from (or '-' for stdin)", - type=argparse.FileType('r')) + help="CDX file to import from (or '-' for stdin)", + type=argparse.FileType('r')) - sub_extract_zipfile = subparsers.add_parser('extract-zipfile', + sub_extract_zipfile = subparsers.add_parser( + 'extract-zipfile', help="opens zipfile, iterates over PDF files inside and does PDF extract for each") sub_extract_zipfile.set_defaults(func=run_extract_zipfile) - sub_extract_zipfile.add_argument('zip_file', - help="zipfile with PDFs to extract", - type=str) + sub_extract_zipfile.add_argument('zip_file', help="zipfile with PDFs to extract", type=str) - sub_single = subparsers.add_parser('single', - help="opens single PDF and extracts it") + sub_single = subparsers.add_parser('single', help="opens single PDF and extracts it") sub_single.set_defaults(func=run_single) - sub_single.add_argument('pdf_file', - help="single PDF to extract", - type=str) + sub_single.add_argument('pdf_file', help="single PDF to extract", type=str) args = parser.parse_args() if not args.__dict__.get("func"): @@ -123,17 +126,18 @@ def main(): if args.kafka_mode: text_topic = "sandcrawler-{}.pdf-text".format(args.kafka_env) thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.kafka_env) - args.sink = KafkaCompressSink(kafka_hosts=args.kafka_hosts, - produce_topic=text_topic) + args.sink = KafkaCompressSink(kafka_hosts=args.kafka_hosts, produce_topic=text_topic) args.thumbnail_sink = KafkaSink(kafka_hosts=args.kafka_hosts, - produce_topic=thumbnail_topic) + produce_topic=thumbnail_topic) print("Running in kafka output mode, publishing to {} and {}\n".format( - text_topic, thumbnail_topic), file=sys.stderr) + text_topic, thumbnail_topic), + file=sys.stderr) else: args.sink = None args.thumbnail_sink = None args.func(args) + if __name__ == '__main__': main() diff --git a/python/pdftrio_tool.py b/python/pdftrio_tool.py index e195bc7..9316313 100755 --- a/python/pdftrio_tool.py +++ b/python/pdftrio_tool.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ Basically just a copy of grobid_tool.py, but for PDF classification instead of text extraction. @@ -21,19 +20,29 @@ def run_classify_pdf_json(args): pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host) wayback_client = WaybackClient() if args.jobs > 1: - worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=None, mode=args.pdftrio_mode) + worker = PdfTrioWorker(pdftrio_client, + wayback_client, + sink=None, + mode=args.pdftrio_mode) multi_worker = MultiprocessWrapper(worker, args.sink) pusher = JsonLinePusher(multi_worker, args.json_file, batch_size=args.jobs) else: - worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=args.sink, mode=args.pdftrio_mode) + worker = PdfTrioWorker(pdftrio_client, + wayback_client, + sink=args.sink, + mode=args.pdftrio_mode) pusher = JsonLinePusher(worker, args.json_file) pusher.run() + def run_classify_pdf_cdx(args): pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host) wayback_client = WaybackClient() if args.jobs > 1: - worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=None, mode=args.pdftrio_mode) + worker = PdfTrioWorker(pdftrio_client, + wayback_client, + sink=None, + mode=args.pdftrio_mode) multi_worker = MultiprocessWrapper(worker, args.sink) pusher = CdxLinePusher( multi_worker, @@ -43,7 +52,10 @@ def run_classify_pdf_cdx(args): batch_size=args.jobs, ) else: - worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=args.sink, mode=args.pdftrio_mode) + worker = PdfTrioWorker(pdftrio_client, + wayback_client, + sink=args.sink, + mode=args.pdftrio_mode) pusher = CdxLinePusher( worker, args.cdx_file, @@ -52,6 +64,7 @@ def run_classify_pdf_cdx(args): ) pusher.run() + def run_classify_pdf_zipfile(args): pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host) worker = PdfTrioBlobWorker(pdftrio_client, sink=args.sink, mode=args.pdftrio_mode) @@ -60,48 +73,53 @@ def run_classify_pdf_zipfile(args): def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--kafka-mode', - action='store_true', - help="send output to Kafka (not stdout)") + action='store_true', + help="send output to Kafka (not stdout)") parser.add_argument('--kafka-hosts', - default="localhost:9092", - help="list of Kafka brokers (host/port) to use") + default="localhost:9092", + help="list of Kafka brokers (host/port) to use") parser.add_argument('--kafka-env', - default="dev", - help="Kafka topic namespace to use (eg, prod, qa, dev)") - parser.add_argument('-j', '--jobs', - default=8, type=int, - help="parallelism for batch CPU jobs") + default="dev", + help="Kafka topic namespace to use (eg, prod, qa, dev)") + parser.add_argument('-j', + '--jobs', + default=8, + type=int, + help="parallelism for batch CPU jobs") parser.add_argument('--pdftrio-host', - default="http://pdftrio.qa.fatcat.wiki", - help="pdftrio API host/port") + default="http://pdftrio.qa.fatcat.wiki", + help="pdftrio API host/port") parser.add_argument('--pdftrio-mode', - default="auto", - help="which classification mode to use") + default="auto", + help="which classification mode to use") subparsers = parser.add_subparsers() - sub_classify_pdf_json = subparsers.add_parser('classify-pdf-json', + sub_classify_pdf_json = subparsers.add_parser( + 'classify-pdf-json', help="for each JSON line with CDX info, fetches PDF and does pdftrio classify_pdfion") sub_classify_pdf_json.set_defaults(func=run_classify_pdf_json) sub_classify_pdf_json.add_argument('json_file', - help="JSON file to import from (or '-' for stdin)", - type=argparse.FileType('r')) + help="JSON file to import from (or '-' for stdin)", + type=argparse.FileType('r')) - sub_classify_pdf_cdx = subparsers.add_parser('classify-pdf-cdx', + sub_classify_pdf_cdx = subparsers.add_parser( + 'classify-pdf-cdx', help="for each CDX line, fetches PDF and does pdftrio classify_pdfion") sub_classify_pdf_cdx.set_defaults(func=run_classify_pdf_cdx) sub_classify_pdf_cdx.add_argument('cdx_file', - help="CDX file to import from (or '-' for stdin)", - type=argparse.FileType('r')) + help="CDX file to import from (or '-' for stdin)", + type=argparse.FileType('r')) - sub_classify_pdf_zipfile = subparsers.add_parser('classify-pdf-zipfile', - help="opens zipfile, iterates over PDF files inside and does pdftrio classify_pdf for each") + sub_classify_pdf_zipfile = subparsers.add_parser( + 'classify-pdf-zipfile', + help= + "opens zipfile, iterates over PDF files inside and does pdftrio classify_pdf for each") sub_classify_pdf_zipfile.set_defaults(func=run_classify_pdf_zipfile) sub_classify_pdf_zipfile.add_argument('zip_file', - help="zipfile with PDFs to classify", - type=str) + help="zipfile with PDFs to classify", + type=str) args = parser.parse_args() if not args.__dict__.get("func"): @@ -112,10 +130,10 @@ def main(): if args.kafka_mode: produce_topic = "sandcrawler-{}.pdftrio-output".format(args.kafka_env) print("Running in kafka output mode, publishing to {}\n".format(produce_topic)) - args.sink = KafkaSink(kafka_hosts=args.kafka_hosts, - produce_topic=produce_topic) + args.sink = KafkaSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic) args.func(args) + if __name__ == '__main__': main() diff --git a/python/persist_tool.py b/python/persist_tool.py index d52f7c1..9160db6 100755 --- a/python/persist_tool.py +++ b/python/persist_tool.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ Commands for backfilling content from bulk files into postgresql and s3 (seaweedfs). @@ -16,9 +15,7 @@ from sandcrawler.persist import * def run_cdx(args): - worker = PersistCdxWorker( - db_url=args.db_url, - ) + worker = PersistCdxWorker(db_url=args.db_url, ) filter_mimetypes = ['application/pdf'] if args.no_mimetype_filter: filter_mimetypes = None @@ -32,6 +29,7 @@ def run_cdx(args): ) pusher.run() + def run_grobid(args): worker = PersistGrobidWorker( db_url=args.db_url, @@ -49,24 +47,22 @@ def run_grobid(args): ) pusher.run() + def run_grobid_disk(args): """ Writes XML to individual files on disk, and also prints non-XML metadata to stdout as JSON, which can be redirected to a separate file. """ - worker = PersistGrobidDiskWorker( - output_dir=args.output_dir, - ) + worker = PersistGrobidDiskWorker(output_dir=args.output_dir, ) pusher = JsonLinePusher( worker, args.json_file, ) pusher.run() + def run_pdftrio(args): - worker = PersistPdfTrioWorker( - db_url=args.db_url, - ) + worker = PersistPdfTrioWorker(db_url=args.db_url, ) pusher = JsonLinePusher( worker, args.json_file, @@ -74,6 +70,7 @@ def run_pdftrio(args): ) pusher.run() + def run_pdftext(args): worker = PersistPdfTextWorker( db_url=args.db_url, @@ -91,10 +88,9 @@ def run_pdftext(args): ) pusher.run() + def run_ingest_file_result(args): - worker = PersistIngestFileResultWorker( - db_url=args.db_url, - ) + worker = PersistIngestFileResultWorker(db_url=args.db_url, ) pusher = JsonLinePusher( worker, args.json_file, @@ -102,10 +98,9 @@ def run_ingest_file_result(args): ) pusher.run() + def run_ingest_request(args): - worker = PersistIngestRequestWorker( - db_url=args.db_url, - ) + worker = PersistIngestRequestWorker(db_url=args.db_url, ) pusher = JsonLinePusher( worker, args.json_file, @@ -113,92 +108,95 @@ def run_ingest_request(args): ) pusher.run() + def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--db-url', - help="postgresql database connection string", - default="postgres:///sandcrawler") - parser.add_argument('--s3-url', - help="S3 (seaweedfs) backend URL", - default="localhost:9000") + help="postgresql database connection string", + default="postgres:///sandcrawler") + parser.add_argument('--s3-url', help="S3 (seaweedfs) backend URL", default="localhost:9000") parser.add_argument('--s3-access-key', - help="S3 (seaweedfs) credential", - default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_ACCESS_KEY')) + help="S3 (seaweedfs) credential", + default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') + or os.environ.get('MINIO_ACCESS_KEY')) parser.add_argument('--s3-secret-key', - help="S3 (seaweedfs) credential", - default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_SECRET_KEY')) + help="S3 (seaweedfs) credential", + default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') + or os.environ.get('MINIO_SECRET_KEY')) parser.add_argument('--s3-bucket', - help="S3 (seaweedfs) bucket to persist into", - default="sandcrawler-dev") + help="S3 (seaweedfs) bucket to persist into", + default="sandcrawler-dev") subparsers = parser.add_subparsers() - sub_cdx = subparsers.add_parser('cdx', - help="backfill a CDX file into postgresql cdx table") + sub_cdx = subparsers.add_parser('cdx', help="backfill a CDX file into postgresql cdx table") sub_cdx.set_defaults(func=run_cdx) sub_cdx.add_argument('cdx_file', - help="CDX file to import from (or '-' for stdin)", - type=argparse.FileType('r')) - sub_cdx.add_argument('--no-mimetype-filter', + help="CDX file to import from (or '-' for stdin)", + type=argparse.FileType('r')) + sub_cdx.add_argument( + '--no-mimetype-filter', action='store_true', help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)") - sub_grobid = subparsers.add_parser('grobid', - help="backfill a grobid JSON ('pg') dump into postgresql and s3 (seaweedfs)") + sub_grobid = subparsers.add_parser( + 'grobid', help="backfill a grobid JSON ('pg') dump into postgresql and s3 (seaweedfs)") sub_grobid.set_defaults(func=run_grobid) sub_grobid.add_argument('json_file', - help="grobid file to import from (or '-' for stdin)", - type=argparse.FileType('r')) + help="grobid file to import from (or '-' for stdin)", + type=argparse.FileType('r')) sub_grobid.add_argument('--s3-only', - action='store_true', - help="only upload TEI-XML to S3 (don't write to database)") - sub_grobid.add_argument('--db-only', + action='store_true', + help="only upload TEI-XML to S3 (don't write to database)") + sub_grobid.add_argument( + '--db-only', action='store_true', help="only write status to sandcrawler-db (don't save TEI-XML to S3)") - sub_pdftext = subparsers.add_parser('pdftext', + sub_pdftext = subparsers.add_parser( + 'pdftext', help="backfill a pdftext JSON ('pg') dump into postgresql and s3 (seaweedfs)") sub_pdftext.set_defaults(func=run_pdftext) sub_pdftext.add_argument('json_file', - help="pdftext file to import from (or '-' for stdin)", - type=argparse.FileType('r')) + help="pdftext file to import from (or '-' for stdin)", + type=argparse.FileType('r')) sub_pdftext.add_argument('--s3-only', - action='store_true', - help="only upload TEI-XML to S3 (don't write to database)") - sub_pdftext.add_argument('--db-only', + action='store_true', + help="only upload TEI-XML to S3 (don't write to database)") + sub_pdftext.add_argument( + '--db-only', action='store_true', help="only write status to sandcrawler-db (don't save TEI-XML to S3)") sub_grobid_disk = subparsers.add_parser('grobid-disk', - help="dump GRBOID output to (local) files on disk") + help="dump GRBOID output to (local) files on disk") sub_grobid_disk.set_defaults(func=run_grobid_disk) sub_grobid_disk.add_argument('json_file', - help="grobid file to import from (or '-' for stdin)", - type=argparse.FileType('r')) - sub_grobid_disk.add_argument('output_dir', - help="base directory to output into", - type=str) + help="grobid file to import from (or '-' for stdin)", + type=argparse.FileType('r')) + sub_grobid_disk.add_argument('output_dir', help="base directory to output into", type=str) - sub_pdftrio = subparsers.add_parser('pdftrio', + sub_pdftrio = subparsers.add_parser( + 'pdftrio', help="backfill a pdftrio JSON ('pg') dump into postgresql and s3 (seaweedfs)") sub_pdftrio.set_defaults(func=run_pdftrio) sub_pdftrio.add_argument('json_file', - help="pdftrio file to import from (or '-' for stdin)", - type=argparse.FileType('r')) + help="pdftrio file to import from (or '-' for stdin)", + type=argparse.FileType('r')) - sub_ingest_file_result = subparsers.add_parser('ingest-file-result', - help="backfill a ingest_file_result JSON dump into postgresql") + sub_ingest_file_result = subparsers.add_parser( + 'ingest-file-result', help="backfill a ingest_file_result JSON dump into postgresql") sub_ingest_file_result.set_defaults(func=run_ingest_file_result) - sub_ingest_file_result.add_argument('json_file', + sub_ingest_file_result.add_argument( + 'json_file', help="ingest_file_result file to import from (or '-' for stdin)", type=argparse.FileType('r')) - sub_ingest_request = subparsers.add_parser('ingest-request', - help="backfill a ingest_request JSON dump into postgresql") + sub_ingest_request = subparsers.add_parser( + 'ingest-request', help="backfill a ingest_request JSON dump into postgresql") sub_ingest_request.set_defaults(func=run_ingest_request) sub_ingest_request.add_argument('json_file', - help="ingest_request to import from (or '-' for stdin)", - type=argparse.FileType('r')) + help="ingest_request to import from (or '-' for stdin)", + type=argparse.FileType('r')) args = parser.parse_args() if not args.__dict__.get("func"): @@ -207,5 +205,6 @@ def main(): args.func(args) + if __name__ == '__main__': main() diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py index bf2d92d..46735eb 100644 --- a/python/sandcrawler/__init__.py +++ b/python/sandcrawler/__init__.py @@ -1,14 +1,16 @@ - from .db import SandcrawlerPostgresClient, SandcrawlerPostgrestClient from .grobid import GrobidBlobWorker, GrobidClient, GrobidWorker -from .ia import (CdxApiClient, CdxApiError, CdxPartial, CdxRow, PetaboxError, ResourceResult, SavePageNowClient, - SavePageNowError, WarcResource, WaybackClient, WaybackContentError, WaybackError) +from .ia import (CdxApiClient, CdxApiError, CdxPartial, CdxRow, PetaboxError, ResourceResult, + SavePageNowClient, SavePageNowError, WarcResource, WaybackClient, + WaybackContentError, WaybackError) from .ingest_file import IngestFileWorker from .ingest_fileset import IngestFilesetWorker -from .misc import b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path, parse_cdx_datetime, parse_cdx_line +from .misc import (b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path, + parse_cdx_datetime, parse_cdx_line) from .pdfextract import PdfExtractBlobWorker, PdfExtractWorker from .pdftrio import PdfTrioBlobWorker, PdfTrioClient, PdfTrioWorker -from .persist import (PersistCdxWorker, PersistGrobidDiskWorker, PersistGrobidWorker, PersistIngestFileResultWorker, - PersistIngestRequestWorker, PersistPdfTextWorker, PersistPdfTrioWorker, PersistThumbnailWorker) -from .workers import (BlackholeSink, CdxLinePusher, JsonLinePusher, KafkaCompressSink, KafkaJsonPusher, KafkaSink, - MultiprocessWrapper, ZipfilePusher) +from .persist import (PersistCdxWorker, PersistGrobidDiskWorker, PersistGrobidWorker, + PersistIngestFileResultWorker, PersistIngestRequestWorker, + PersistPdfTextWorker, PersistPdfTrioWorker, PersistThumbnailWorker) +from .workers import (BlackholeSink, CdxLinePusher, JsonLinePusher, KafkaCompressSink, + KafkaJsonPusher, KafkaSink, MultiprocessWrapper, ZipfilePusher) diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index 4dcdb0e..360add9 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -1,4 +1,3 @@ - import datetime import json from typing import Optional @@ -9,17 +8,16 @@ import requests class SandcrawlerPostgrestClient: - def __init__(self, api_url="http://wbgrp-svc506.us.archive.org:3030", **kwargs): self.api_url = api_url def get_cdx(self, url): - resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url)) + resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.' + url)) resp.raise_for_status() return resp.json() or None def get_grobid(self, sha1): - resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1)) + resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() resp = resp.json() if resp: @@ -28,7 +26,7 @@ class SandcrawlerPostgrestClient: return None def get_pdftrio(self, sha1): - resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.'+sha1)) + resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() resp = resp.json() if resp: @@ -37,7 +35,7 @@ class SandcrawlerPostgrestClient: return None def get_pdf_meta(self, sha1): - resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.'+sha1)) + resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() resp = resp.json() if resp: @@ -58,7 +56,7 @@ class SandcrawlerPostgrestClient: return None def get_file_meta(self, sha1): - resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1)) + resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() resp = resp.json() if resp: @@ -91,7 +89,7 @@ class SandcrawlerPostgrestClient: return None def get_crossref(self, doi): - resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.'+doi)) + resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.' + doi)) resp.raise_for_status() resp = resp.json() if resp: @@ -99,8 +97,8 @@ class SandcrawlerPostgrestClient: else: return None -class SandcrawlerPostgresClient: +class SandcrawlerPostgresClient: def __init__(self, db_url, **kwargs): self.conn = psycopg2.connect(db_url) @@ -135,14 +133,8 @@ class SandcrawlerPostgresClient: batch = [d for d in batch if d.get('warc_path')] if not batch: return (0, 0) - batch = [(d['url'], - d['datetime'], - d['sha1hex'], - d['mimetype'], - d['warc_path'], - int(d['warc_csize']), - int(d['warc_offset'])) - for d in batch] + batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'], + int(d['warc_csize']), int(d['warc_offset'])) for d in batch] # filter out duplicate rows by key (url, datetime) batch_dict = dict() for b in batch: @@ -170,12 +162,8 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [(d['sha1hex'], - d['sha256hex'], - d['md5hex'], - int(d['size_bytes']), - d['mimetype']) - for d in batch] + batch = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']), + d['mimetype']) for d in batch] # filter out duplicate rows by key (sha1hex) batch_dict = dict() for b in batch: @@ -215,15 +203,15 @@ class SandcrawlerPostgresClient: r[k] = r['metadata'].get(k) r['metadata'].pop(k, None) r['metadata'] = json.dumps(r['metadata'], sort_keys=True) - batch = [(d['key'], - d.get('grobid_version') or None, - d['status_code'], - d['status'], - d.get('fatcat_release') or None, - d.get('updated') or datetime.datetime.now(), - d.get('metadata') or None , - ) - for d in batch] + batch = [( + d['key'], + d.get('grobid_version') or None, + d['status_code'], + d['status'], + d.get('fatcat_release') or None, + d.get('updated') or datetime.datetime.now(), + d.get('metadata') or None, + ) for d in batch] # filter out duplicate rows by key (sha1hex) batch_dict = dict() for b in batch: @@ -331,20 +319,18 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [ - ( - d['key'], - d.get('updated') or datetime.datetime.now(), - d['status_code'], - d['status'], - d.get('versions', {}).get('pdftrio_version') or None, - d.get('versions', {}).get('models_date') or None, - d.get('ensemble_score'), - d.get('bert_score'), - d.get('linear_score'), - d.get('image_score'), - ) - for d in batch] + batch = [( + d['key'], + d.get('updated') or datetime.datetime.now(), + d['status_code'], + d['status'], + d.get('versions', {}).get('pdftrio_version') or None, + d.get('versions', {}).get('models_date') or None, + d.get('ensemble_score'), + d.get('bert_score'), + d.get('linear_score'), + d.get('image_score'), + ) for d in batch] # filter out duplicate rows by key (sha1hex) batch_dict = dict() for b in batch: @@ -373,15 +359,15 @@ class SandcrawlerPostgresClient: extra[k] = r[k] if extra: r['extra'] = json.dumps(extra, sort_keys=True) - batch = [(d['link_source'], - d['link_source_id'], - d['ingest_type'], - d['base_url'], - d.get('ingest_request_source'), - d.get('release_stage') or None, - d.get('extra') or None, - ) - for d in batch] + batch = [( + d['link_source'], + d['link_source_id'], + d['ingest_type'], + d['base_url'], + d.get('ingest_request_source'), + d.get('release_stage') or None, + d.get('extra') or None, + ) for d in batch] # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url) batch_dict = dict() for b in batch: @@ -412,16 +398,16 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [(d['ingest_type'], - d['base_url'], - bool(d['hit']), - d['status'], - d.get('terminal_url'), - d.get('terminal_dt'), - d.get('terminal_status_code'), - d.get('terminal_sha1hex'), - ) - for d in batch] + batch = [( + d['ingest_type'], + d['base_url'], + bool(d['hit']), + d['status'], + d.get('terminal_url'), + d.get('terminal_dt'), + d.get('terminal_status_code'), + d.get('terminal_sha1hex'), + ) for d in batch] # filter out duplicate rows by key (ingest_type, base_url) batch_dict = dict() for b in batch: @@ -459,23 +445,23 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [(d['ingest_type'], - d['base_url'], - bool(d['hit']), - d['status'], - d.get('platform_name'), - d.get('platform_domain'), - d.get('platform_id'), - d.get('ingest_strategy'), - d.get('total_size'), - d.get('file_count'), - d.get('archiveorg_item_name'), - d.get('archiveorg_item_bundle_path'), - d.get('web_bundle_url'), - d.get('web_bundle_dt'), - d.get('manifest'), - ) - for d in batch] + batch = [( + d['ingest_type'], + d['base_url'], + bool(d['hit']), + d['status'], + d.get('platform_name'), + d.get('platform_domain'), + d.get('platform_id'), + d.get('ingest_strategy'), + d.get('total_size'), + d.get('file_count'), + d.get('archiveorg_item_name'), + d.get('archiveorg_item_bundle_path'), + d.get('web_bundle_url'), + d.get('web_bundle_dt'), + d.get('manifest'), + ) for d in batch] # filter out duplicate rows by key (ingest_type, base_url) batch_dict = dict() for b in batch: diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py index 92fed37..f3441c9 100644 --- a/python/sandcrawler/fileset_platforms.py +++ b/python/sandcrawler/fileset_platforms.py @@ -1,4 +1,3 @@ - import gzip import json import sys @@ -16,17 +15,18 @@ from sandcrawler.ia import ResourceResult class FilesetPlatformHelper(): - def __init__(self): self.platform_name = 'unknown' - def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> bool: """ Does this request look like it matches this platform? """ raise NotImplementedError() - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -40,19 +40,18 @@ class FilesetPlatformHelper(): # XXX: while developing ArchiveorgFileset path #return IngestStrategy.ArchiveorgFileset if len(item.manifest) == 1: - if total_size < 64*1024*1024: + if total_size < 64 * 1024 * 1024: return IngestStrategy.WebFile else: return IngestStrategy.ArchiveorgFile else: - if largest_size < 64*1024*1024 and total_size < 128*1024*1024*1024: + if largest_size < 64 * 1024 * 1024 and total_size < 128 * 1024 * 1024 * 1024: return IngestStrategy.WebFileset else: return IngestStrategy.ArchiveorgFileset class DataverseHelper(FilesetPlatformHelper): - def __init__(self): super().__init__() self.platform_name = 'dataverse' @@ -122,8 +121,8 @@ class DataverseHelper(FilesetPlatformHelper): "file_id": file_id, } - - def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> bool: if resource and resource.terminal_url: url = resource.terminal_url else: @@ -146,7 +145,8 @@ class DataverseHelper(FilesetPlatformHelper): return True - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) @@ -179,22 +179,27 @@ class DataverseHelper(FilesetPlatformHelper): if parsed_id['file_id']: # XXX: maybe we could support this? - raise PlatformScopeError(f"only entire dataverse datasets can be archived with this tool") + raise PlatformScopeError( + f"only entire dataverse datasets can be archived with this tool") # 1b. if we didn't get a version number from URL, fetch it from API if not dataset_version: - resp = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}") + resp = self.session.get( + f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}" + ) resp.raise_for_status() obj = resp.json() obj_latest = obj['data']['latestVersion'] dataset_version = f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}" # 2. API fetch - resp = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}") + resp = self.session.get( + f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}" + ) resp.raise_for_status() obj = resp.json() - obj_latest= obj['data']['latestVersion'] + obj_latest = obj['data']['latestVersion'] assert dataset_version == f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}" assert platform_id == obj_latest['datasetPersistentId'] @@ -212,15 +217,16 @@ class DataverseHelper(FilesetPlatformHelper): extra['version'] = row['version'] if 'description' in df: extra['description'] = df['description'] - manifest.append(FilesetManifestFile( - path=df.get('originalFileName') or df['filename'], - size=df.get('originalFileSize') or df['filesize'], - md5=df['md5'], - # NOTE: don't get: sha1, sha256 - mimetype=df['contentType'], - platform_url=platform_url, - extra=extra or None, - )) + manifest.append( + FilesetManifestFile( + path=df.get('originalFileName') or df['filename'], + size=df.get('originalFileSize') or df['filesize'], + md5=df['md5'], + # NOTE: don't get: sha1, sha256 + mimetype=df['contentType'], + platform_url=platform_url, + extra=extra or None, + )) platform_sub_id = platform_id.split('/')[-1] archiveorg_item_name = f"{platform_domain}-{platform_sub_id}-v{dataset_version}" @@ -228,7 +234,8 @@ class DataverseHelper(FilesetPlatformHelper): # XXX: collection=platform_domain, collection="datasets", date=obj_latest['releaseTime'].split('T')[0], - source=f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}", + source= + f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}", ) if platform_id.startswith('doi:10.'): archiveorg_item_meta['doi'] = platform_id.replace('doi:', '') @@ -238,7 +245,8 @@ class DataverseHelper(FilesetPlatformHelper): elif block['typeName'] == 'depositor': archiveorg_item_meta['creator'] = block['value'] elif block['typeName'] == 'dsDescription': - archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue']['value'] + archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue'][ + 'value'] archiveorg_item_meta['description'] = archiveorg_item_meta.get('description', '') if obj_latest.get('termsOfUse'): @@ -252,11 +260,13 @@ class DataverseHelper(FilesetPlatformHelper): platform_id=platform_id, archiveorg_item_name=archiveorg_item_name, archiveorg_item_meta=archiveorg_item_meta, - web_bundle_url=f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original", + web_bundle_url= + f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original", # TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files) extra=dict(version=dataset_version), ) + def test_parse_dataverse_persistentid(): valid = { @@ -322,8 +332,8 @@ def test_parse_dataverse_persistentid(): except ValueError: pass -class FigshareHelper(FilesetPlatformHelper): +class FigshareHelper(FilesetPlatformHelper): def __init__(self): super().__init__() self.platform_name = 'figshare' @@ -346,7 +356,9 @@ class FigshareHelper(FilesetPlatformHelper): raise ValueError(f"not a figshare URL: {path}") comp = comp[2:] - if comp[0] in ['dataset',]: + if comp[0] in [ + 'dataset', + ]: comp = comp[1:] if len(comp) == 3 and comp[1].isdigit() and comp[2].isdigit(): @@ -356,7 +368,8 @@ class FigshareHelper(FilesetPlatformHelper): else: raise ValueError(f"couldn't find figshare identiier: {path}") - def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> bool: if resource and resource.terminal_url: url = resource.terminal_url @@ -381,7 +394,8 @@ class FigshareHelper(FilesetPlatformHelper): return False - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -397,13 +411,15 @@ class FigshareHelper(FilesetPlatformHelper): (platform_id, dataset_version) = self.parse_figshare_url_path(components.path) assert platform_id.isdigit(), f"expected numeric: {platform_id}" - assert dataset_version and dataset_version.isdigit(), f"expected numeric: {dataset_version}" + assert dataset_version and dataset_version.isdigit( + ), f"expected numeric: {dataset_version}" # 1b. if we didn't get a version number from URL, fetch it from API # TODO: implement this code path # 2. API fetch - resp = self.session.get(f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}") + resp = self.session.get( + f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}") resp.raise_for_status() obj = resp.json() @@ -412,18 +428,21 @@ class FigshareHelper(FilesetPlatformHelper): if not obj['is_public']: raise PlatformRestrictedError(f'record not public: {platform_id} {dataset_version}') if obj['is_embargoed']: - raise PlatformRestrictedError(f'record is embargoed: {obj.get("embargo_title")} ({platform_id} {dataset_version})') + raise PlatformRestrictedError( + f'record is embargoed: {obj.get("embargo_title")} ({platform_id} {dataset_version})' + ) manifest = [] for row in obj['files']: - manifest.append(FilesetManifestFile( - path=row['name'], - size=row['size'], - md5=row['computed_md5'], - # NOTE: don't get: sha1, sha256, mimetype - platform_url=row['download_url'], - #extra=dict(), - )) + manifest.append( + FilesetManifestFile( + path=row['name'], + size=row['size'], + md5=row['computed_md5'], + # NOTE: don't get: sha1, sha256, mimetype + platform_url=row['download_url'], + #extra=dict(), + )) assert not row.get('is_link_only') authors = [] @@ -451,18 +470,23 @@ class FigshareHelper(FilesetPlatformHelper): platform_id=platform_id, archiveorg_item_name=archiveorg_item_name, archiveorg_item_meta=archiveorg_item_meta, - web_bundle_url=f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}", + web_bundle_url= + f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}", # TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files) extra=dict(version=dataset_version), ) + def test_parse_figshare_url_path(): valid = { - "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1": ("8987858", "1"), - "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858": ("8987858", None), + "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1": + ("8987858", "1"), + "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858": + ("8987858", None), "/articles/CIBERSORT_p-value_0_05/8217188/1": ("8217188", "1"), - "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4": ("12127176", "4"), + "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4": + ("12127176", "4"), } invalid = [ @@ -479,14 +503,15 @@ def test_parse_figshare_url_path(): except ValueError: pass -class ZenodoHelper(FilesetPlatformHelper): +class ZenodoHelper(FilesetPlatformHelper): def __init__(self): super().__init__() self.platform_name = 'zenodo' self.session = requests.Session() - def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> bool: if resource and resource.terminal_url: url = resource.terminal_url @@ -499,7 +524,8 @@ class ZenodoHelper(FilesetPlatformHelper): return True return False - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -535,12 +561,15 @@ class ZenodoHelper(FilesetPlatformHelper): assert obj['id'] == int(platform_id) work_id = obj['conceptrecid'] if work_id == obj['id']: - raise PlatformScopeError("got a work-level zenodo record, not a versioned record: {work_id}") + raise PlatformScopeError( + "got a work-level zenodo record, not a versioned record: {work_id}") zenodo_type = obj['metadata']['resource_type']['type'] if obj['metadata']['access_right'] != 'open': - raise PlatformRestrictedError("not publicly available ({obj['metadata']['access_right']}): {platform_domain} {platform_id}") + raise PlatformRestrictedError( + "not publicly available ({obj['metadata']['access_right']}): {platform_domain} {platform_id}" + ) manifest = [] for row in obj['files']: @@ -600,11 +629,9 @@ class ArchiveOrgHelper(FilesetPlatformHelper): 'RAR': 'application/vnd.rar', 'TAR': 'application/x-tar', '7z': 'application/x-7z-compressed', - 'HTML': 'text/html', 'Text': 'text/plain', 'PDF': 'application/pdf', - 'CSV': 'text/csv', 'XML': 'application/xml', 'JSON': 'application/json', @@ -613,17 +640,13 @@ class ArchiveOrgHelper(FilesetPlatformHelper): #'application/vnd.openxmlformats-officedocument.wordprocessingml.document', # .docx #'application/vnd.ms-excel', # .xls #'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx - - 'MP3': 'audio/mpeg', # .mp3 - - 'MP4': 'video/mp4', # .mp4 - 'MPEG': 'video/mpeg', # .mpeg - + 'MP3': 'audio/mpeg', # .mp3 + 'MP4': 'video/mp4', # .mp4 + 'MPEG': 'video/mpeg', # .mpeg 'JPEG': 'image/jpeg', 'GIF': 'image/gif', 'PNG': 'image/png', 'TIFF': 'image/tiff', - 'Unknown': None, } @@ -640,24 +663,27 @@ class ArchiveOrgHelper(FilesetPlatformHelper): if f.source != 'original': return False for suffix in [ - '_meta.sqlite', - '_archive.torrent', - '_itemimage.jpg', - '_meta.xml', - '_thumb.png', - '_files.xml', + '_meta.sqlite', + '_archive.torrent', + '_itemimage.jpg', + '_meta.xml', + '_thumb.png', + '_files.xml', ]: if f.name == item_name + suffix or f.name == item_name.lower() + suffix: return False if f.name.startswith('_'): return False if item_name.startswith('academictorrents_'): - for suffix in ['_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib']: + for suffix in [ + '_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib' + ]: if f.name == item_name + suffix: return False return True - def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> bool: if resource and resource.terminal_url: url = resource.terminal_url @@ -672,20 +698,23 @@ class ArchiveOrgHelper(FilesetPlatformHelper): return True return False - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ base_url_split = request['base_url'].split('/') #print(base_url_split, file=sys.stderr) - assert len(base_url_split) in [5,6] + assert len(base_url_split) in [5, 6] assert base_url_split[0] in ['http:', 'https:'] assert base_url_split[2] == 'archive.org' assert base_url_split[3] in ['details', 'download'] item_name = base_url_split[4] if len(base_url_split) == 6 and base_url_split[5]: - raise PlatformScopeError("got an archive.org file path, not download/details page; individual files not handled yet") + raise PlatformScopeError( + "got an archive.org file path, not download/details page; individual files not handled yet" + ) #print(f" archiveorg processing item={item_name}", file=sys.stderr) item = self.session.get_item(item_name) diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py index c9f182c..6c25276 100644 --- a/python/sandcrawler/fileset_strategies.py +++ b/python/sandcrawler/fileset_strategies.py @@ -1,4 +1,3 @@ - import gzip import json import os @@ -10,15 +9,15 @@ from typing import Any, Dict, List, Optional, Tuple import internetarchive -from sandcrawler.fileset_types import (ArchiveStrategyResult, FilesetManifestFile, FilesetPlatformItem, IngestStrategy, - PlatformScopeError) +from sandcrawler.fileset_types import (ArchiveStrategyResult, FilesetManifestFile, + FilesetPlatformItem, IngestStrategy, PlatformScopeError) from sandcrawler.html_metadata import BiblioMetadata -from sandcrawler.ia import ResourceResult, SavePageNowClient, WaybackClient, fix_transfer_encoding +from sandcrawler.ia import (ResourceResult, SavePageNowClient, WaybackClient, + fix_transfer_encoding) from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path, sanitize_fs_path class FilesetIngestStrategy(): - def __init__(self): #self.ingest_strategy = 'unknown' self.success_status = "success" @@ -31,7 +30,6 @@ class FilesetIngestStrategy(): class ArchiveorgFilesetStrategy(FilesetIngestStrategy): - def __init__(self, **kwargs): super().__init__() self.ingest_strategy = IngestStrategy.ArchiveorgFileset @@ -61,7 +59,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): found = False for existing in item_files: if existing.name == wanted.path: - if ((existing.sha1 and existing.sha1 == wanted.sha1) or (existing.md5 and existing.md5 == wanted.md5)) and existing.name == wanted.path and existing.size == wanted.size: + if ((existing.sha1 and existing.sha1 == wanted.sha1) or + (existing.md5 and existing.md5 == wanted.md5) + ) and existing.name == wanted.path and existing.size == wanted.size: found = True wanted.status = 'exists' break @@ -69,7 +69,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): wanted.status = 'mismatch-existing' break if not found: - print(f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}", file=sys.stderr) + print( + f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}", + file=sys.stderr) return None return ArchiveStrategyResult( ingest_strategy=self.ingest_strategy, @@ -108,10 +110,11 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): if not os.path.exists(local_path): print(f" downloading {m.path}", file=sys.stderr) - with self.ia_session.get(m.platform_url, stream=True, allow_redirects=True) as r: + with self.ia_session.get(m.platform_url, stream=True, + allow_redirects=True) as r: r.raise_for_status() with open(local_path + '.partial', 'wb') as f: - for chunk in r.iter_content(chunk_size=256*1024): + for chunk in r.iter_content(chunk_size=256 * 1024): f.write(chunk) os.rename(local_path + '.partial', local_path) m.status = 'downloaded-local' @@ -120,7 +123,8 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): print(f" verifying {m.path}", file=sys.stderr) file_meta = gen_file_metadata_path(local_path, allow_empty=True) - assert file_meta['size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}" + assert file_meta[ + 'size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}" if m.sha1: assert file_meta['sha1hex'] == m.sha1 @@ -142,7 +146,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): if file_meta['mimetype'] != m.mimetype and file_meta['mimetype'] != 'text/plain': # these 'tab-separated-values' from dataverse are just noise, don't log them if m.mimetype != 'text/tab-separated-values': - print(f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}", file=sys.stderr) + print( + f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}", + file=sys.stderr) m.mimetype = file_meta['mimetype'] else: m.mimetype = file_meta['mimetype'] @@ -158,7 +164,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): 'remote_name': m.path, }) - print(f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...", file=sys.stderr) + print( + f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...", + file=sys.stderr) internetarchive.upload( item.archiveorg_item_name, files=item_files, @@ -183,25 +191,26 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): return result + class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy): """ ArchiveorgFilesetStrategy currently works fine with individual files. Just need to over-ride the ingest_strategy name. """ - def __init__(self): super().__init__() self.ingest_strategy = IngestStrategy.ArchiveorgFileset self.success_status = "success-file" -class WebFilesetStrategy(FilesetIngestStrategy): +class WebFilesetStrategy(FilesetIngestStrategy): def __init__(self, **kwargs): super().__init__() self.ingest_strategy = IngestStrategy.WebFileset self.wayback_client = WaybackClient() self.try_spn2 = True - self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) + self.spn_client = SavePageNowClient( + spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) self.max_spn_manifest = 20 def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult: @@ -218,23 +227,26 @@ class WebFilesetStrategy(FilesetIngestStrategy): for m in item.manifest: fetch_url = m.platform_url if not fetch_url: - raise NotImplementedError("require 'platform_url' for each file when doing Web fetching") + raise NotImplementedError( + "require 'platform_url' for each file when doing Web fetching") via = "wayback" resource = self.wayback_client.lookup_resource(fetch_url, m.mimetype) - if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture')): + if self.try_spn2 and (resource == None or + (resource and resource.status == 'no-capture')): if len(item.manifest) > self.max_spn_manifest: m.status = 'too-much-spn' continue via = "spn2" - resource = self.spn_client.crawl_resource(fetch_url, self.wayback_client, force_simple_get=True) + resource = self.spn_client.crawl_resource(fetch_url, + self.wayback_client, + force_simple_get=True) - print("[FETCH {:>6}] {} {}".format( - via, - (resource and resource.status), - (resource and resource.terminal_url) or fetch_url), - file=sys.stderr) + print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status), + (resource and resource.terminal_url) + or fetch_url), + file=sys.stderr) m.terminal_url = resource.terminal_url m.terminal_dt = resource.terminal_dt @@ -251,9 +263,11 @@ class WebFilesetStrategy(FilesetIngestStrategy): file_meta, html_resource = fix_transfer_encoding(file_meta, resource) if self.ingest_strategy == "web-file": - file_file_meta = file_meta + file_file_meta = file_meta - if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex']) or (m.sha1 and m.sha1 != file_meta['sha1hex']): + if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex'] + ) or (m.sha1 + and m.sha1 != file_meta['sha1hex']): m.status = 'mismatch' continue @@ -280,8 +294,8 @@ class WebFilesetStrategy(FilesetIngestStrategy): result.file_resource = file_resource return result -class WebFileStrategy(WebFilesetStrategy): +class WebFileStrategy(WebFilesetStrategy): def __init__(self, **kwargs): super().__init__(**kwargs) self.ingest_strategy = IngestStrategy.WebFile diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py index 8ea136e..606af07 100644 --- a/python/sandcrawler/fileset_types.py +++ b/python/sandcrawler/fileset_types.py @@ -1,4 +1,3 @@ - from enum import Enum from typing import Any, Dict, List, Optional, Tuple @@ -13,6 +12,7 @@ class IngestStrategy(str, Enum): ArchiveorgFileset = "archiveorg-fileset" ArchiveorgFilesetBundled = "archiveorg-fileset-bundled" + class FilesetManifestFile(BaseModel): path: str size: Optional[int] @@ -27,6 +27,7 @@ class FilesetManifestFile(BaseModel): terminal_url: Optional[str] terminal_dt: Optional[str] + class FilesetPlatformItem(BaseModel): platform_name: str platform_status: str @@ -39,6 +40,7 @@ class FilesetPlatformItem(BaseModel): web_base_url: Optional[str] web_bundle_url: Optional[str] + class ArchiveStrategyResult(BaseModel): ingest_strategy: str status: str @@ -49,6 +51,7 @@ class ArchiveStrategyResult(BaseModel): bundle_resource: Optional[Any] bundle_archiveorg_path: Optional[str] + class PlatformScopeError(Exception): """ For incidents where platform helper discovers that the fileset/dataset is @@ -61,6 +64,7 @@ class PlatformScopeError(Exception): """ pass + class PlatformRestrictedError(Exception): """ When datasets are not publicly available on a platform (yet) diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py index 5242b3a..16bbb01 100644 --- a/python/sandcrawler/grobid.py +++ b/python/sandcrawler/grobid.py @@ -1,4 +1,3 @@ - import requests from grobid2json import teixml2json @@ -8,7 +7,6 @@ from .workers import SandcrawlerFetchWorker, SandcrawlerWorker class GrobidClient(object): - def __init__(self, host_url="http://grobid.qa.fatcat.wiki", **kwargs): self.host_url = host_url self.consolidate_mode = int(kwargs.get('consolidate_mode', 0)) @@ -34,7 +32,7 @@ class GrobidClient(object): files={ 'input': blob, 'consolidateHeader': self.consolidate_mode, - 'consolidateCitations': 0, # too expensive for now + 'consolidateCitations': 0, # too expensive for now 'includeRawCitations': 1, }, timeout=180.0, @@ -46,9 +44,7 @@ class GrobidClient(object): 'error_msg': 'GROBID request (HTTP POST) timeout', } - info = dict( - status_code=grobid_response.status_code, - ) + info = dict(status_code=grobid_response.status_code, ) if grobid_response.status_code == 200: info['status'] = 'success' info['tei_xml'] = grobid_response.text @@ -56,7 +52,8 @@ class GrobidClient(object): # XML is larger than Kafka message size, and much larger than # an article in general; bail out info['status'] = 'error' - info['error_msg'] = "response XML too large: {} bytes".format(len(info['tei_xml'])) + info['error_msg'] = "response XML too large: {} bytes".format( + len(info['tei_xml'])) info.pop('tei_xml') else: # response.text is .content decoded as utf-8 @@ -70,7 +67,13 @@ class GrobidClient(object): tei_json = teixml2json(result['tei_xml'], encumbered=False) meta = dict() biblio = dict() - for k in ('title', 'authors', 'journal', 'date', 'doi', ): + for k in ( + 'title', + 'authors', + 'journal', + 'date', + 'doi', + ): if tei_json.get(k): biblio[k] = tei_json[k] meta['biblio'] = biblio @@ -79,8 +82,8 @@ class GrobidClient(object): meta[k] = tei_json[k] return meta -class GrobidWorker(SandcrawlerFetchWorker): +class GrobidWorker(SandcrawlerFetchWorker): def __init__(self, grobid_client, wayback_client=None, sink=None, **kwargs): super().__init__(wayback_client=wayback_client) self.grobid_client = grobid_client @@ -104,18 +107,19 @@ class GrobidWorker(SandcrawlerFetchWorker): return fetch_result blob = fetch_result['blob'] - result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode) + result = self.grobid_client.process_fulltext(blob, + consolidate_mode=self.consolidate_mode) result['file_meta'] = gen_file_metadata(blob) result['source'] = record result['key'] = result['file_meta']['sha1hex'] return result + class GrobidBlobWorker(SandcrawlerWorker): """ This is sort of like GrobidWorker, except it receives blobs directly, instead of fetching blobs from some remote store. """ - def __init__(self, grobid_client, sink=None, **kwargs): super().__init__() self.grobid_client = grobid_client @@ -125,8 +129,8 @@ class GrobidBlobWorker(SandcrawlerWorker): def process(self, blob, key=None): if not blob: return None - result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode) + result = self.grobid_client.process_fulltext(blob, + consolidate_mode=self.consolidate_mode) result['file_meta'] = gen_file_metadata(blob) result['key'] = result['file_meta']['sha1hex'] return result - diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py index 6bdebdd..a44fc67 100644 --- a/python/sandcrawler/html.py +++ b/python/sandcrawler/html.py @@ -1,4 +1,3 @@ - import json import re import sys @@ -6,7 +5,8 @@ import urllib.parse from bs4 import BeautifulSoup -RESEARCHSQUARE_REGEX = re.compile(r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"') +RESEARCHSQUARE_REGEX = re.compile( + r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"') IEEEXPLORE_REGEX = re.compile(r'"pdfPath":"(/.*?\.pdf)"') OVID_JOURNAL_URL_REGEX = re.compile(r'journalURL = "(http.*)";') SCIENCEDIRECT_BOUNCE_URL_REGEX = re.compile(r"window.location = '(http.*)';") @@ -33,16 +33,16 @@ def extract_fulltext_url(html_url, html_body): ### General Tricks ### # highwire-style meta tag - meta = soup.find('meta', attrs={"name":"citation_pdf_url"}) + meta = soup.find('meta', attrs={"name": "citation_pdf_url"}) if not meta: - meta = soup.find('meta', attrs={"name":"bepress_citation_pdf_url"}) + meta = soup.find('meta', attrs={"name": "bepress_citation_pdf_url"}) if not meta: - meta = soup.find('meta', attrs={"name":"wkhealth_pdf_url"}) + meta = soup.find('meta', attrs={"name": "wkhealth_pdf_url"}) if not meta: # researchgate does this; maybe others also? - meta = soup.find('meta', attrs={"property":"citation_pdf_url"}) + meta = soup.find('meta', attrs={"property": "citation_pdf_url"}) if not meta: - meta = soup.find('meta', attrs={"name":"eprints.document_url"}) + meta = soup.find('meta', attrs={"name": "eprints.document_url"}) # if tag is only partially populated if meta and not meta.get('content'): meta = None @@ -52,10 +52,10 @@ def extract_fulltext_url(html_url, html_body): if '://doi.org/' in url: print(f"\tdoi.org in citation_pdf_url (loop?): {url}", file=sys.stderr) elif url.startswith('/'): - if host_prefix+url == html_url: + if host_prefix + url == html_url: print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr) else: - return dict(pdf_url=host_prefix+url, technique='citation_pdf_url') + return dict(pdf_url=host_prefix + url, technique='citation_pdf_url') elif url.startswith('http'): if url == html_url: print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr) @@ -64,7 +64,7 @@ def extract_fulltext_url(html_url, html_body): else: print("\tmalformed citation_pdf_url? {}".format(url), file=sys.stderr) - meta = soup.find('meta', attrs={"name":"generator"}) + meta = soup.find('meta', attrs={"name": "generator"}) meta_generator = None if meta and meta.get('content'): meta_generator = meta['content'].strip() @@ -105,7 +105,8 @@ def extract_fulltext_url(html_url, html_body): json_meta = json.loads(json_text) pdf_meta = json_meta['article']['pdfDownload']['urlMetadata'] # https://www.sciencedirect.com/science/article/pii/S0169204621000670/pdfft?md5=c4a83d06b334b627ded74cf9423bfa56&pid=1-s2.0-S0169204621000670-main.pdf - url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams']['md5'] + "&pid=" + pdf_meta['queryParams']['pid'] + url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams'][ + 'md5'] + "&pid=" + pdf_meta['queryParams']['pid'] except (KeyError, TypeError, json.JSONDecodeError): pass if url: @@ -130,7 +131,9 @@ def extract_fulltext_url(html_url, html_body): if m: url = m.group(1) assert len(url) < 4096 - return dict(release_stage="published", pdf_url=host_prefix+url, technique="ieeexplore") + return dict(release_stage="published", + pdf_url=host_prefix + url, + technique="ieeexplore") # https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=8730313 if '://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber' in html_url: # HTML iframe like: @@ -172,11 +175,12 @@ def extract_fulltext_url(html_url, html_body): '://thesiscommons.org/', ] for domain in OSF_DOMAINS: - if domain in html_url and (len(html_url.split('/')) in [4,5] or '/preprints/' in html_url) and '/download' not in html_url: + if domain in html_url and (len(html_url.split('/')) in [4, 5] or '/preprints/' + in html_url) and '/download' not in html_url: if not html_url.endswith("/"): - next_url = html_url+"/download" + next_url = html_url + "/download" else: - next_url = html_url+"download" + next_url = html_url + "download" return dict(next_url=next_url, technique='osf-by-url') # wiley @@ -199,14 +203,14 @@ def extract_fulltext_url(html_url, html_body): url = html_url.replace("/doi/10.", "/doi/pdf/10.") return dict(pdf_url=url, technique='archivist-url') # <a href="/doi/pdf/10.17723/aarc.62.2.j475270470145630" target="_blank"> - hrefs = soup.find_all('a', attrs={"target":"_blank"}) + hrefs = soup.find_all('a', attrs={"target": "_blank"}) for href in hrefs: url = href['href'].strip() if "/doi/pdf/" in url: if url.startswith('http'): return dict(pdf_url=url, technique='publisher-href') elif url.startswith('/'): - return dict(pdf_url=host_prefix+url, technique='publisher-href') + return dict(pdf_url=host_prefix + url, technique='publisher-href') # protocols.io # https://www.protocols.io/view/flow-cytometry-protocol-mgdc3s6 @@ -248,7 +252,8 @@ def extract_fulltext_url(html_url, html_body): if "://ehp.niehs.nih.gov/doi/" in html_url: # <a href="/doi/pdf/10.1289/EHP4709" target="_blank"> if b'/doi/pdf/10.' in html_body: - url = html_url.replace('/doi/full/10.', '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.') + url = html_url.replace('/doi/full/10.', + '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.') return dict(pdf_url=url, technique='ehp.niehs.nigh.gov-url') # cogentoa.com @@ -275,7 +280,7 @@ def extract_fulltext_url(html_url, html_body): # http://en.gzbd.cnki.net/gzbt/detail/detail.aspx?FileName=HBGF202002003&DbName=GZBJ7920&DbCode=GZBJ if '://en.gzbd.cnki.net/KCMS/detail/detail.aspx' in html_url: # <a onclick="WriteKrsDownLog()" target="_blank" id="pdfDown" name="pdfDown" href="/gzbt/download.aspx?filename=4Q1ZYpFdKFUZ6FDR1QkRrolayRXV2ZzattyQ3QFa2JXTyZXUSV3QRFkbndzaGV2KyJXWZVEbFdVYnZndD9EOxg1Tj5Eeys2SMFzLZ5kcuFkM3dEbsR2ZjxEaShVdJhFdp90KhlVVzcjVVlXUVNHWBtWS5Rlb5cnc&tablename=GZBJLAST2020&dflag=pdfdown
 "><i></i>PDF Download</a> - href = soup.find('a', attrs={"id":"pdfDown"}) + href = soup.find('a', attrs={"id": "pdfDown"}) if href: url = href['href'].strip().replace('
', '') if not url.startswith('http'): @@ -300,7 +305,7 @@ def extract_fulltext_url(html_url, html_body): # OJS 3 (some) if meta_generator and meta_generator.startswith("Open Journal Systems"): - href = soup.find('a', attrs={"class":"obj_galley_link file"}) + href = soup.find('a', attrs={"class": "obj_galley_link file"}) if href and href.text and "pdf" in href.text.lower(): url = href['href'].strip() if url.startswith('/'): @@ -329,13 +334,15 @@ def extract_fulltext_url(html_url, html_body): return dict() + def test_regex(): lines = """ blah var journalURL = "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"; asdf""" m = OVID_JOURNAL_URL_REGEX.search(lines) - assert m.group(1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689" + assert m.group( + 1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689" lines = """ window.onload = function () { diff --git a/python/sandcrawler/html_metadata.py b/python/sandcrawler/html_metadata.py index c6725dc..6d27a3a 100644 --- a/python/sandcrawler/html_metadata.py +++ b/python/sandcrawler/html_metadata.py @@ -1,4 +1,3 @@ - import datetime import sys import urllib.parse @@ -31,9 +30,7 @@ HEAD_META_PATTERNS: Any = { "meta[name='dcterms.title']", "meta[name='dc.title']", ], - "subtitle": [ - "meta[name='prism.subtitle']", - ], + "subtitle": ["meta[name='prism.subtitle']", ], "doi": [ "meta[name='citation_doi']", "meta[name='DOI']", @@ -43,9 +40,7 @@ HEAD_META_PATTERNS: Any = { "meta[name='dc.identifier.doi']", "meta[name='dc.identifier'][scheme='doi']", ], - "pmid": [ - "meta[name='citation_pmid']", - ], + "pmid": ["meta[name='citation_pmid']", ], "abstract": [ "meta[name='citation_abstract']", "meta[name='bepress_citation_abstract']", @@ -66,9 +61,7 @@ HEAD_META_PATTERNS: Any = { "meta[name='dc.source']", "meta[property='og:site_name']", ], - "container_abbrev": [ - "meta[name='citation_journal_abbrev']", - ], + "container_abbrev": ["meta[name='citation_journal_abbrev']", ], "raw_date": [ "meta[name='citation_publication_date']", "meta[name='bepress_citation_publication_date']", @@ -169,9 +162,7 @@ HEAD_META_LIST_PATTERNS: Any = { "meta[name='dc.contributor']", ], # TODO: citation_author_institution - "raw_references": [ - "meta[name='citation_reference']", - ], + "raw_references": ["meta[name='citation_reference']", ], "raw_identifiers": [ "meta[name='eprints.id_number']", "meta[name='dcterms.identifier']", @@ -260,7 +251,7 @@ HTML_FULLTEXT_PATTERNS: List[dict] = [ COMPONENT_FULLTEXT_PATTERNS: List[dict] = [ { - "in_doc_url": "pensoft.net/article/", # also /element/ + "in_doc_url": "pensoft.net/article/", # also /element/ "in_fulltext_url": "/download/fig/", "selector": ".Main-Content .figure a.P-Article-Preview-Picture-Download-Small", "attr": "href", @@ -652,12 +643,11 @@ class BiblioMetadata(pydantic.BaseModel): component_url: Optional[str] class Config: - json_encoders = { - datetime.date: lambda dt: dt.isoformat() - } + json_encoders = {datetime.date: lambda dt: dt.isoformat()} -def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, patterns: List[dict]) -> Optional[Tuple[str, str]]: +def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, + patterns: List[dict]) -> Optional[Tuple[str, str]]: """ Tries to quickly extract fulltext URLs using a set of patterns. This function is intendend to be generic across various extraction techniques. @@ -701,6 +691,7 @@ def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, patterns: List[dict return self_doc_url return None + def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadata]: meta: Any = dict() @@ -772,6 +763,7 @@ def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadat return BiblioMetadata(**meta) + def load_adblock_rules() -> braveblock.Adblocker: """ TODO: consider blocking very generic assets: @@ -838,7 +830,8 @@ def _extract_generic(doc: HTMLParser, selector: str, attrs: List[str], type_name return resources -def html_extract_resources(doc_url: str, doc: HTMLParser, adblock: braveblock.Adblocker) -> list: +def html_extract_resources(doc_url: str, doc: HTMLParser, + adblock: braveblock.Adblocker) -> list: """ This function tries to find all the important resources in a page. The presumption is that the HTML document is article fulltext, and we want the @@ -869,10 +862,12 @@ def html_extract_resources(doc_url: str, doc: HTMLParser, adblock: braveblock.Ad r['url'] = urllib.parse.urljoin(doc_url, r['url']) # filter using adblocker - resources = [r for r in resources if adblock.check_network_urls(r['url'], source_url=doc_url, request_type=r['type']) == False] + resources = [ + r for r in resources if adblock.check_network_urls( + r['url'], source_url=doc_url, request_type=r['type']) == False + ] # remove duplicates resources = [dict(t) for t in {tuple(d.items()) for d in resources}] return resources - diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py index ca1182f..a8ce193 100644 --- a/python/sandcrawler/ia.py +++ b/python/sandcrawler/ia.py @@ -1,4 +1,3 @@ - # XXX: some broken MRO thing going on in here due to python3 object wrangling # in `wayback` library. Means we can't run pylint. # pylint: skip-file @@ -38,6 +37,7 @@ class SandcrawlerBackoffError(Exception): """ pass + ResourceResult = namedtuple("ResourceResult", [ "start_url", "hit", @@ -80,6 +80,7 @@ CdxPartial = namedtuple('CdxPartial', [ 'sha1hex', ]) + def cdx_partial_from_row(full): return CdxPartial( surt=full.surt, @@ -91,6 +92,7 @@ def cdx_partial_from_row(full): sha1hex=full.sha1hex, ) + def cdx_to_dict(cdx): d = { "surt": cdx.surt, @@ -107,6 +109,7 @@ def cdx_to_dict(cdx): d['warc_path'] = cdx.warc_path return d + def fuzzy_match_url(left, right): """ Matches URLs agnostic of http/https (and maybe other normalizations in the @@ -123,6 +126,7 @@ def fuzzy_match_url(left, right): return True return False + def test_fuzzy_match_url(): assert fuzzy_match_url("http://thing.com", "http://thing.com") == True assert fuzzy_match_url("http://thing.com", "https://thing.com") == True @@ -137,18 +141,19 @@ def test_fuzzy_match_url(): assert fuzzy_match_url("http://www.thing.com", "http://www2.thing.com") == False assert fuzzy_match_url("http://www.thing.com", "https://www2.thing.com") == False + class CdxApiError(Exception): pass -class CdxApiClient: +class CdxApiClient: def __init__(self, host_url="https://web.archive.org/cdx/search/cdx", **kwargs): self.host_url = host_url self.http_session = requests_retry_session(retries=3, backoff_factor=3) - cdx_auth_token = kwargs.get('cdx_auth_token', - os.environ.get('CDX_AUTH_TOKEN')) + cdx_auth_token = kwargs.get('cdx_auth_token', os.environ.get('CDX_AUTH_TOKEN')) if not cdx_auth_token: - raise Exception("CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)") + raise Exception( + "CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)") self.http_session.headers.update({ 'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient', 'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token), @@ -208,7 +213,8 @@ class CdxApiClient: found, because we expect to be looking up a specific full record. """ if len(datetime) != 14: - raise ValueError("CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime)) + raise ValueError( + "CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime)) params = { 'url': url, 'from': datetime, @@ -226,18 +232,28 @@ class CdxApiClient: if retry_sleep > 3: next_sleep = retry_sleep - 3 retry_sleep = 3 - print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr) + print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), + file=sys.stderr) time.sleep(retry_sleep) - return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=next_sleep) + return self.fetch(url, + datetime, + filter_status_code=filter_status_code, + retry_sleep=next_sleep) raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime)) row = resp[0] # allow fuzzy http/https match if not (fuzzy_match_url(row.url, url) and row.datetime == datetime): if retry_sleep and retry_sleep > 0: - print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr) + print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), + file=sys.stderr) time.sleep(retry_sleep) - return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=None) - raise KeyError("Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(url, datetime, row)) + return self.fetch(url, + datetime, + filter_status_code=filter_status_code, + retry_sleep=None) + raise KeyError( + "Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format( + url, datetime, row)) if filter_status_code: assert row.status_code == filter_status_code return row @@ -311,17 +327,20 @@ class CdxApiClient: class WaybackError(Exception): pass + class WaybackContentError(Exception): pass + class PetaboxError(Exception): pass + class NoCaptureError(Exception): pass -class WaybackClient: +class WaybackClient: def __init__(self, cdx_client=None, **kwargs): if cdx_client: self.cdx_client = cdx_client @@ -367,32 +386,42 @@ class WaybackClient: if not self.petabox_webdata_secret: raise Exception("WaybackClient needs petabox secret to do direct WARC fetches") if not "/" in warc_path: - raise ValueError("what looks like a liveweb/SPN temporary warc path: {}".format(warc_path)) + raise ValueError( + "what looks like a liveweb/SPN temporary warc path: {}".format(warc_path)) warc_uri = self.warc_uri_prefix + warc_path if not self.rstore: - self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory3( - webdata_secret=self.petabox_webdata_secret, - )) + self.rstore = ResourceStore( + loaderfactory=CDXLoaderFactory3(webdata_secret=self.petabox_webdata_secret, )) try: #print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr) gwb_record = self.rstore.load_resource(warc_uri, offset, csize) except wayback.exception.ResourceUnavailable: print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr) - raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)") + raise PetaboxError( + "failed to load file contents from wayback/petabox (ResourceUnavailable)") except wayback.exception.InvalidResource: print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr) - raise WaybackContentError("failed to load file contents from wayback/petabox (InvalidResource)") + raise WaybackContentError( + "failed to load file contents from wayback/petabox (InvalidResource)") except urllib3.exceptions.ReadTimeoutError as rte: - raise PetaboxError("failed to load file contents from wayback/petabox (ReadTimeoutError: {})".format(rte)) + raise PetaboxError( + "failed to load file contents from wayback/petabox (ReadTimeoutError: {})". + format(rte)) except ValueError as ve: - raise PetaboxError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) + raise PetaboxError( + "failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) except EOFError as eofe: - raise PetaboxError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) + raise PetaboxError( + "failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) except TypeError as te: - raise PetaboxError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) + raise PetaboxError( + "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)" + .format(te)) except Exception as e: if "while decompressing data: invalid block type" in str(e): - raise PetaboxError("decompression error fetching WARC record; usually due to bad alexa ARC files") + raise PetaboxError( + "decompression error fetching WARC record; usually due to bad alexa ARC files" + ) else: raise e # Note: could consider a generic "except Exception" here, as we get so @@ -405,7 +434,8 @@ class WaybackClient: raise WaybackContentError("too many HTTP headers (in wayback fetch)") location = gwb_record.get_location() or None - if status_code is None and gwb_record.target_uri.startswith(b"ftp://") and not gwb_record.is_revisit(): + if status_code is None and gwb_record.target_uri.startswith( + b"ftp://") and not gwb_record.is_revisit(): # TODO: some additional verification here? status_code = 226 @@ -416,8 +446,9 @@ class WaybackClient: raise WaybackContentError("found revisit record, but won't resolve (loop?)") revisit_uri, revisit_dt = gwb_record.refers_to if not (revisit_uri and revisit_dt): - raise WaybackContentError("revisit record missing URI and/or DT: warc:{} offset:{}".format( - warc_path, offset)) + raise WaybackContentError( + "revisit record missing URI and/or DT: warc:{} offset:{}".format( + warc_path, offset)) # convert revisit_dt # len("2018-07-24T11:56:49"), or with "Z" assert len(revisit_dt) in (19, 20) @@ -425,7 +456,9 @@ class WaybackClient: revisit_uri = revisit_uri.decode('utf-8') if type(revisit_dt) is bytes: revisit_dt = revisit_dt.decode('utf-8') - revisit_dt = revisit_dt.replace('-', '').replace(':', '').replace('T', '').replace('Z', '') + revisit_dt = revisit_dt.replace('-', '').replace(':', + '').replace('T', + '').replace('Z', '') assert len(revisit_dt) == 14 try: revisit_cdx = self.cdx_client.fetch(revisit_uri, revisit_dt) @@ -443,10 +476,10 @@ class WaybackClient: body = gwb_record.open_raw_content().read() except IncompleteRead as ire: raise WaybackError( - "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) + "failed to read actual file contents from wayback/petabox (IncompleteRead: {})" + .format(ire)) elif status_code is None: - raise WaybackContentError( - "got a None status_code in (W)ARC record") + raise WaybackContentError("got a None status_code in (W)ARC record") return WarcResource( status_code=status_code, location=location, @@ -454,7 +487,12 @@ class WaybackClient: revisit_cdx=revisit_cdx, ) - def fetch_petabox_body(self, csize, offset, warc_path, resolve_revisit=True, expected_status_code=None): + def fetch_petabox_body(self, + csize, + offset, + warc_path, + resolve_revisit=True, + expected_status_code=None): """ Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize. @@ -474,12 +512,10 @@ class WaybackClient: raise KeyError("archived HTTP response (WARC) was not {}: {}".format( expected_status_code, resource.status_code, - ) - ) + )) elif resource.status_code not in (200, 226): raise KeyError("archived HTTP response (WARC) was not 200: {}".format( - resource.status_code) - ) + resource.status_code)) return resource.body @@ -514,7 +550,9 @@ class WaybackClient: except requests.exceptions.ChunkedEncodingError: raise WaybackError("ChunkedEncodingError (wayback replay fetch)") except UnicodeDecodeError: - raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url)) + raise WaybackContentError( + "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format( + url)) try: resp.raise_for_status() @@ -526,21 +564,20 @@ class WaybackClient: if not "X-Archive-Src" in resp.headers: raise WaybackError("replay fetch didn't return X-Archive-Src in headers") if not datetime in resp.url: - raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url)) + raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format( + datetime, resp.url)) if cdx_sha1hex: # verify that body matches CDX hash # TODO: don't need *all* these hashes, just sha1 file_meta = gen_file_metadata(resp.content) if cdx_sha1hex != file_meta['sha1hex']: - print(" REPLAY MISMATCH: cdx:{} replay:{}".format( - cdx_sha1hex, - file_meta['sha1hex']), - file=sys.stderr) - raise WaybackContentError("replay fetch body didn't match CDX hash cdx:{} body:{}".format( - cdx_sha1hex, - file_meta['sha1hex']), - ) + print(" REPLAY MISMATCH: cdx:{} replay:{}".format(cdx_sha1hex, + file_meta['sha1hex']), + file=sys.stderr) + raise WaybackContentError( + "replay fetch body didn't match CDX hash cdx:{} body:{}".format( + cdx_sha1hex, file_meta['sha1hex']), ) return resp.content def fetch_replay_redirect(self, url, datetime): @@ -568,7 +605,9 @@ class WaybackClient: except requests.exceptions.TooManyRedirects: raise WaybackContentError("redirect loop (wayback replay fetch)") except UnicodeDecodeError: - raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url)) + raise WaybackContentError( + "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format( + url)) try: resp.raise_for_status() except Exception as e: @@ -580,7 +619,8 @@ class WaybackClient: if not "X-Archive-Src" in resp.headers: raise WaybackError("redirect replay fetch didn't return X-Archive-Src in headers") if not datetime in resp.url: - raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url)) + raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format( + datetime, resp.url)) redirect_url = resp.headers.get("Location") # eg, https://web.archive.org/web/20200111003923id_/https://dx.doi.org/10.17504/protocols.io.y2gfybw @@ -622,7 +662,9 @@ class WaybackClient: urls_seen = [start_url] for i in range(self.max_redirects): print(" URL: {}".format(next_url), file=sys.stderr) - cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype, closest=closest) + cdx_row = self.cdx_client.lookup_best(next_url, + best_mimetype=best_mimetype, + closest=closest) #print(cdx_row, file=sys.stderr) if not cdx_row: return ResourceResult( @@ -776,9 +818,11 @@ class WaybackClient: class SavePageNowError(Exception): pass + class SavePageNowBackoffError(SandcrawlerBackoffError): pass + SavePageNowResult = namedtuple('SavePageNowResult', [ 'success', 'status', @@ -789,13 +833,11 @@ SavePageNowResult = namedtuple('SavePageNowResult', [ 'resources', ]) -class SavePageNowClient: +class SavePageNowClient: def __init__(self, v2endpoint="https://web.archive.org/save", **kwargs): - self.ia_access_key = kwargs.get('ia_access_key', - os.environ.get('IA_ACCESS_KEY')) - self.ia_secret_key = kwargs.get('ia_secret_key', - os.environ.get('IA_SECRET_KEY')) + self.ia_access_key = kwargs.get('ia_access_key', os.environ.get('IA_ACCESS_KEY')) + self.ia_secret_key = kwargs.get('ia_secret_key', os.environ.get('IA_SECRET_KEY')) self.v2endpoint = v2endpoint self.v2_session = requests_retry_session(retries=5, backoff_factor=3) self.v2_session.headers.update({ @@ -886,12 +928,15 @@ class SavePageNowClient: }, ) if resp.status_code == 429: - raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url)) + raise SavePageNowBackoffError("status_code: {}, url: {}".format( + resp.status_code, request_url)) elif resp.status_code != 200: - raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url)) + raise SavePageNowError("SPN2 status_code: {}, url: {}".format( + resp.status_code, request_url)) resp_json = resp.json() - if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']: + if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json[ + 'message']: raise SavePageNowBackoffError(resp_json['message']) elif not resp_json or 'job_id' not in resp_json or not resp_json['job_id']: raise SavePageNowError( @@ -915,7 +960,8 @@ class SavePageNowClient: final_json = resp.json() break else: - raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(status, request_url)) + raise SavePageNowError("Unknown SPN2 status:{} url:{}".format( + status, request_url)) if not final_json: raise SavePageNowError("SPN2 timed out (polling count exceeded)") @@ -923,8 +969,10 @@ class SavePageNowClient: # if there was a recent crawl of same URL, fetch the status of that # crawl to get correct datetime if final_json.get('original_job_id'): - print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", file=sys.stderr) - resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, final_json['original_job_id'])) + print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", + file=sys.stderr) + resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, + final_json['original_job_id'])) try: resp.raise_for_status() except: @@ -935,7 +983,8 @@ class SavePageNowClient: if final_json['status'] == "success": if final_json.get('original_url').startswith('/'): - print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", file=sys.stderr) + print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", + file=sys.stderr) return SavePageNowResult( True, "success", @@ -969,15 +1018,17 @@ class SavePageNowClient: # HACK: capture CNKI domains with outlinks (for COVID-19 crawling) if 'gzbd.cnki.net/' in start_url: - spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get, capture_outlinks=1) + spn_result = self.save_url_now_v2(start_url, + force_simple_get=force_simple_get, + capture_outlinks=1) else: spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get) if not spn_result.success: status = spn_result.status if status in ("error:invalid-url", "error:not-found", - "error:invalid-host-resolution", "error:gateway-timeout", - "error:too-many-redirects", "error:read-timeout"): + "error:invalid-host-resolution", "error:gateway-timeout", + "error:too-many-redirects", "error:read-timeout"): status = status.replace("error:", "") elif status in ("error:no-access", "error:forbidden"): status = "forbidden" @@ -988,7 +1039,8 @@ class SavePageNowClient: elif status.startswith("error:"): status = "spn2-" + status # despite other errors, call these a failure (so we don't retry) - if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1")): + if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent') + or spn_result.terminal_url.endswith("cookieSet=1")): status = "blocked-cookie" return ResourceResult( start_url=start_url, @@ -1018,7 +1070,8 @@ class SavePageNowClient: ) # don't try to CDX fetch for this common cookie block terminal - if spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"): + if spn_result.terminal_url.endswith( + '/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"): return ResourceResult( start_url=start_url, hit=False, @@ -1143,9 +1196,12 @@ class SavePageNowClient: ) -def fix_transfer_encoding(file_meta: dict, resource: ResourceResult) -> Tuple[dict, ResourceResult]: - if resource.body and file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip': - print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr) +def fix_transfer_encoding(file_meta: dict, + resource: ResourceResult) -> Tuple[dict, ResourceResult]: + if resource.body and file_meta[ + 'mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip': + print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype), + file=sys.stderr) inner_body = gzip.decompress(resource.body) if not inner_body: raise Exception("null body inside transfer encoding") diff --git a/python/sandcrawler/ingest_file.py b/python/sandcrawler/ingest_file.py index 137a793..b480cc2 100644 --- a/python/sandcrawler/ingest_file.py +++ b/python/sandcrawler/ingest_file.py @@ -1,4 +1,3 @@ - import base64 import gzip import json @@ -15,18 +14,22 @@ from selectolax.parser import HTMLParser from sandcrawler.db import SandcrawlerPostgrestClient from sandcrawler.grobid import GrobidClient from sandcrawler.html import extract_fulltext_url -from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules -from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, ResourceResult, SavePageNowClient, - SavePageNowError, WaybackClient, WaybackContentError, WaybackError, cdx_to_dict, +from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio, + html_extract_resources, load_adblock_rules) +from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, + ResourceResult, SavePageNowClient, SavePageNowError, WaybackClient, + WaybackContentError, WaybackError, cdx_to_dict, fix_transfer_encoding) -from sandcrawler.ingest_html import (WebResource, fetch_html_resources, html_extract_body_teixml, html_guess_platform, +from sandcrawler.ingest_html import (WebResource, fetch_html_resources, + html_extract_body_teixml, html_guess_platform, html_guess_scope, quick_fetch_html_resources) from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime from sandcrawler.pdfextract import PdfExtractResult, process_pdf from sandcrawler.workers import SandcrawlerWorker from sandcrawler.xml import xml_reserialize -MAX_BODY_SIZE_BYTES = 128*1024*1024 +MAX_BODY_SIZE_BYTES = 128 * 1024 * 1024 + class IngestFileWorker(SandcrawlerWorker): """ @@ -54,7 +57,6 @@ class IngestFileWorker(SandcrawlerWorker): process_file_hit(ResourceResult) -> response process_grobid(ResourceResult) """ - def __init__(self, sink=None, **kwargs): super().__init__() @@ -64,7 +66,8 @@ class IngestFileWorker(SandcrawlerWorker): self.wayback_client = WaybackClient() self.spn_client = kwargs.get('spn_client') if not self.spn_client: - self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) + self.spn_client = SavePageNowClient( + spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) self.grobid_client = kwargs.get('grobid_client') if not self.grobid_client: self.grobid_client = GrobidClient() @@ -123,13 +126,13 @@ class IngestFileWorker(SandcrawlerWorker): "fao.org/glis/", # Historical non-paper content: - "dhz.uni-passau.de/", # newspapers - "digital.ucd.ie/", # ireland national historical + "dhz.uni-passau.de/", # newspapers + "digital.ucd.ie/", # ireland national historical # DOI prefixes - "doi.org/10.2307/", # JSTOR; slow and many redirects - "doi.org/10.18730/", # fao.org: database entry - "doi.org/10.15468/", # gbif.org: database entry + "doi.org/10.2307/", # JSTOR; slow and many redirects + "doi.org/10.18730/", # fao.org: database entry + "doi.org/10.15468/", # gbif.org: database entry # deprecated domain (doesn't redirect correctly) "://edoc.mpg.de/", @@ -173,10 +176,10 @@ class IngestFileWorker(SandcrawlerWorker): "video/mpeg", "text/plain", "text/csv", - "text/x-r-source", # dataverse - "text/tab-separated-values", # dataverse - "text/x-rst", # dataverse - "application/x-rlang-transport", # dataverse + "text/x-r-source", # dataverse + "text/tab-separated-values", # dataverse + "text/x-rst", # dataverse + "application/x-rlang-transport", # dataverse "application/json", "application/xml", "application/pdf", @@ -194,7 +197,6 @@ class IngestFileWorker(SandcrawlerWorker): "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ] - def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: """ Check in sandcrawler-db (postgres) to see if we have already ingested @@ -214,7 +216,10 @@ class IngestFileWorker(SandcrawlerWorker): else: return None - def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]: + def find_resource(self, + url, + best_mimetype=None, + force_recrawl=False) -> Optional[ResourceResult]: """ Looks in wayback for a resource starting at the URL, following any redirects. If a hit isn't found, try crawling with SPN. @@ -222,7 +227,8 @@ class IngestFileWorker(SandcrawlerWorker): via = "none" resource = None - if url.startswith("http://web.archive.org/web/") or url.startswith("https://web.archive.org/web/"): + if url.startswith("http://web.archive.org/web/") or url.startswith( + "https://web.archive.org/web/"): raise NotImplementedError("handling direct wayback links not supported yet") if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"): @@ -243,14 +249,13 @@ class IngestFileWorker(SandcrawlerWorker): if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000': old_failure = True - if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure): + if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') + or soft404 or old_failure): via = "spn2" resource = self.spn_client.crawl_resource(url, self.wayback_client) - print("[FETCH {:>6}] {} {}".format( - via, - (resource and resource.status), - (resource and resource.terminal_url) or url), - file=sys.stderr) + print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status), + (resource and resource.terminal_url) or url), + file=sys.stderr) return resource def process_existing(self, request: dict, result_row: dict) -> dict: @@ -262,7 +267,8 @@ class IngestFileWorker(SandcrawlerWorker): assert result_row['hit'] existing_file_meta = self.pgrest_client.get_file_meta(result_row['terminal_sha1hex']) existing_grobid = self.pgrest_client.get_grobid(result_row['terminal_sha1hex']) - existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], result_row['terminal_dt']) + existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], + result_row['terminal_dt']) if not (existing_file_meta and existing_grobid and existing_cdx): raise NotImplementedError("partially-exsiting records not implemented yet") result = { @@ -281,11 +287,13 @@ class IngestFileWorker(SandcrawlerWorker): } return result - def process_file_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict: + def process_file_hit(self, ingest_type: str, resource: ResourceResult, + file_meta: dict) -> dict: """ Run all the necessary processing for a new/fresh ingest hit. """ - if ingest_type in ["dataset-file", "component"] and file_meta['mimetype'] == "application/pdf": + if ingest_type in ["dataset-file", "component" + ] and file_meta['mimetype'] == "application/pdf": ingest_type = "pdf" if ingest_type == "pdf": return { @@ -396,24 +404,26 @@ class IngestFileWorker(SandcrawlerWorker): try: html_doc = HTMLParser(resource.body) except ValueError as ve: - return dict( - status="html-selectolax-error", - ) + return dict(status="html-selectolax-error", ) html_biblio = html_extract_biblio(resource.terminal_url, html_doc) assert html_biblio html_body = html_extract_body_teixml(resource.body) html_platform = html_guess_platform(resource.terminal_url, html_doc, html_biblio) - html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('word_count')) + html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, + html_body.get('word_count')) html_biblio_dict = json.loads(html_biblio.json(exclude_none=True)) - if html_scope in ('blocked-captcha','blocked-cookie','blocked-forbidden'): + if html_scope in ('blocked-captcha', 'blocked-cookie', 'blocked-forbidden'): return dict( status=html_scope, html_biblio=html_biblio_dict, scope=html_scope, platform=html_platform, ) - elif html_scope not in ('article-fulltext','unknown',): + elif html_scope not in ( + 'article-fulltext', + 'unknown', + ): html_body.pop("tei_xml", None) return dict( status="wrong-scope", @@ -423,7 +433,8 @@ class IngestFileWorker(SandcrawlerWorker): html_body=html_body, ) - raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules) + raw_resources = html_extract_resources(resource.terminal_url, html_doc, + self.adblock_rules) if len(raw_resources) > self.max_html_resources: html_body.pop("tei_xml", None) return dict( @@ -452,7 +463,9 @@ class IngestFileWorker(SandcrawlerWorker): try: if self.html_quick_mode: print(" WARN: running quick CDX-only fetches", file=sys.stderr) - full_resources = quick_fetch_html_resources(raw_resources, self.wayback_client.cdx_client, when) + full_resources = quick_fetch_html_resources(raw_resources, + self.wayback_client.cdx_client, + when) else: full_resources = fetch_html_resources(raw_resources, self.wayback_client, when) except PetaboxError as e: @@ -572,7 +585,9 @@ class IngestFileWorker(SandcrawlerWorker): return result try: - resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl) + resource = self.find_resource(next_url, + best_mimetype, + force_recrawl=force_recrawl) except SavePageNowError as e: result['status'] = 'spn2-error' result['error_message'] = str(e)[:1600] @@ -650,10 +665,9 @@ class IngestFileWorker(SandcrawlerWorker): # here we split based on ingest type to try and extract a next hop html_ish_resource = bool( "html" in file_meta['mimetype'] - or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" + or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" or "application/xml" in file_meta['mimetype'] - or "text/xml" in file_meta['mimetype'] - ) + or "text/xml" in file_meta['mimetype']) html_biblio = None html_doc = None if html_ish_resource and resource.body: @@ -662,7 +676,8 @@ class IngestFileWorker(SandcrawlerWorker): html_biblio = html_extract_biblio(resource.terminal_url, html_doc) if html_biblio: if not 'html_biblio' in result or html_biblio.title: - result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True)) + result['html_biblio'] = json.loads( + html_biblio.json(exclude_none=True)) #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) except ValueError: pass @@ -686,18 +701,19 @@ class IngestFileWorker(SandcrawlerWorker): assert next_url next_url = clean_url(next_url) print("[PARSE {:>6}] {} {}".format( - ingest_type, - fulltext_url.get('technique'), - next_url, - ), - file=sys.stderr) + ingest_type, + fulltext_url.get('technique'), + next_url, + ), + file=sys.stderr) if next_url in hops: result['status'] = 'link-loop' result['error_message'] = "repeated: {}".format(next_url) return result hops.append(next_url) continue - elif ingest_type in ("xml", "html", "component") and html_ish_resource and html_biblio: + elif ingest_type in ("xml", "html", + "component") and html_ish_resource and html_biblio: # NOTE: src_fulltext_url is not a thing next_url_found = None if ingest_type == "xml" and html_biblio.xml_fulltext_url: @@ -711,11 +727,11 @@ class IngestFileWorker(SandcrawlerWorker): next_url = next_url_found technique = "html_biblio" print("[PARSE {:>6}] {} {}".format( - ingest_type, - technique, - next_url, - ), - file=sys.stderr) + ingest_type, + technique, + next_url, + ), + file=sys.stderr) if next_url in hops: if ingest_type == "html": # for HTML ingest, we don't count this as a link-loop @@ -756,7 +772,8 @@ class IngestFileWorker(SandcrawlerWorker): result['status'] = "wrong-mimetype" # formerly: "other-mimetype" return result elif ingest_type == "xml": - if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"): + if file_meta['mimetype'] not in ("application/xml", "text/xml", + "application/jats+xml"): result['status'] = "wrong-mimetype" return result elif ingest_type == "html": @@ -786,18 +803,18 @@ class IngestFileWorker(SandcrawlerWorker): result['hit'] = True if ingest_type == "pdf": print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format( - ingest_type, - result.get('file_meta', {}).get('sha1hex'), - result.get('grobid', {}).get('status_code'), - result.get('pdf_meta', {}).get('status'), - ), - file=sys.stderr) + ingest_type, + result.get('file_meta', {}).get('sha1hex'), + result.get('grobid', {}).get('status_code'), + result.get('pdf_meta', {}).get('status'), + ), + file=sys.stderr) else: print("[SUCCESS {:>5}] sha1:{}".format( - ingest_type, - result.get('file_meta', {}).get('sha1hex'), - ), - file=sys.stderr) + ingest_type, + result.get('file_meta', {}).get('sha1hex'), + ), + file=sys.stderr) return result diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py index 11386df..5cbb908 100644 --- a/python/sandcrawler/ingest_fileset.py +++ b/python/sandcrawler/ingest_fileset.py @@ -1,4 +1,3 @@ - import gzip import json import sys @@ -14,17 +13,21 @@ from sandcrawler.fileset_platforms import DATASET_PLATFORM_HELPER_TABLE, Fileset from sandcrawler.fileset_strategies import FILESET_STRATEGY_HELPER_TABLE, FilesetIngestStrategy from sandcrawler.fileset_types import PlatformRestrictedError, PlatformScopeError from sandcrawler.html import extract_fulltext_url -from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules -from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, ResourceResult, SavePageNowClient, - SavePageNowError, WaybackClient, WaybackContentError, WaybackError, cdx_to_dict, +from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio, + html_extract_resources, load_adblock_rules) +from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, + ResourceResult, SavePageNowClient, SavePageNowError, WaybackClient, + WaybackContentError, WaybackError, cdx_to_dict, fix_transfer_encoding) from sandcrawler.ingest_file import IngestFileWorker -from sandcrawler.ingest_html import (WebResource, fetch_html_resources, html_extract_body_teixml, html_guess_platform, +from sandcrawler.ingest_html import (WebResource, fetch_html_resources, + html_extract_body_teixml, html_guess_platform, html_guess_scope, quick_fetch_html_resources) from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime from sandcrawler.workers import SandcrawlerWorker -MAX_BODY_SIZE_BYTES = 128*1024*1024 +MAX_BODY_SIZE_BYTES = 128 * 1024 * 1024 + class IngestFilesetWorker(IngestFileWorker): """ @@ -39,14 +42,13 @@ class IngestFilesetWorker(IngestFileWorker): checking to see if content has been archived already) 4. summarize status """ - def __init__(self, sink=None, **kwargs): super().__init__(sink=None, **kwargs) self.sink = sink self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE - self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024) + self.max_total_size = kwargs.get('max_total_size', 64 * 1024 * 1024 * 1024) self.max_file_count = kwargs.get('max_file_count', 200) self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink') self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False) @@ -72,11 +74,12 @@ class IngestFilesetWorker(IngestFileWorker): raise NotImplementedError("process_existing() not tested or safe yet") def want(self, request: dict) -> bool: - if not request.get('ingest_type') in ('dataset',): + if not request.get('ingest_type') in ('dataset', ): return False return True - def fetch_resource_iteratively(self, ingest_type: str, base_url: str, force_recrawl: bool) -> dict: + def fetch_resource_iteratively(self, ingest_type: str, base_url: str, + force_recrawl: bool) -> dict: """ This is copypasta from process_file(), should probably refactor. """ @@ -174,10 +177,9 @@ class IngestFilesetWorker(IngestFileWorker): # here we split based on ingest type to try and extract a next hop html_ish_resource = bool( "html" in file_meta['mimetype'] - or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" + or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" or "application/xml" in file_meta['mimetype'] - or "text/xml" in file_meta['mimetype'] - ) + or "text/xml" in file_meta['mimetype']) html_biblio = None html_doc = None if html_ish_resource and resource.body: @@ -186,7 +188,8 @@ class IngestFilesetWorker(IngestFileWorker): html_biblio = html_extract_biblio(resource.terminal_url, html_doc) if html_biblio: if not 'html_biblio' in result or html_biblio.title: - result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True)) + result['html_biblio'] = json.loads( + html_biblio.json(exclude_none=True)) #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) except ValueError: pass @@ -214,7 +217,8 @@ class IngestFilesetWorker(IngestFileWorker): result['status'] = "wrong-mimetype" # formerly: "other-mimetype" return result elif ingest_type == "xml": - if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"): + if file_meta['mimetype'] not in ("application/xml", "text/xml", + "application/jats+xml"): result['status'] = "wrong-mimetype" return result elif ingest_type == "html": @@ -229,11 +233,10 @@ class IngestFilesetWorker(IngestFileWorker): result['_resource'] = resource return result - def process(self, request: dict, key: Any = None) -> dict: ingest_type = request.get('ingest_type') - if ingest_type not in ("dataset",): + if ingest_type not in ("dataset", ): raise NotImplementedError(f"can't handle ingest_type={ingest_type}") # parse/clean URL @@ -250,7 +253,9 @@ class IngestFilesetWorker(IngestFileWorker): #if existing: # return self.process_existing(request, existing) - result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl) + result = self.fetch_resource_iteratively(ingest_type, + base_url, + force_recrawl=force_recrawl) result['request'] = request if result.get('status') != None: result['request'] = request @@ -323,14 +328,16 @@ class IngestFilesetWorker(IngestFileWorker): return result if result['file_count'] > self.max_file_count: # hard max, to prevent downstream breakage - if result['file_count'] > 10*1000: + if result['file_count'] > 10 * 1000: result['manifest'] = result['manifest'][:self.max_file_count] result['status'] = 'too-many-files' return result ingest_strategy = platform_helper.chose_strategy(dataset_meta) result['ingest_strategy'] = ingest_strategy - print(f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}", file=sys.stderr) + print( + f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}", + file=sys.stderr) strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy) if not strategy_helper: @@ -349,7 +356,8 @@ class IngestFilesetWorker(IngestFileWorker): if archive_result.bundle_file_meta: result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta if archive_result.archiveorg_bundle_path: - result['fileset_bundle']['archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path + result['fileset_bundle'][ + 'archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path if archive_result.bundle_resource: result['fileset_bundle']['terminal'] = dict( terminal_url=archive_result.bundle_resource.terminal_url, @@ -357,14 +365,16 @@ class IngestFilesetWorker(IngestFileWorker): terminal_status_code=archive_result.bundle_resource.terminal_status_code, ) if archive_result.bundle_resource.cdx: - result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_resource.cdx) + result['fileset_bundle']['cdx'] = cdx_to_dict( + archive_result.bundle_resource.cdx) if archive_result.bundle_resource.revisit_cdx: - result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_resource.revisit_cdx) + result['fileset_bundle']['revisit_cdx'] = cdx_to_dict( + archive_result.bundle_resource.revisit_cdx) if ingest_strategy.endswith('-file'): result['fileset_file'] = dict() if archive_result.file_file_meta: - result['fileset_file']['file_meta'] = file_meta=archive_result.file_file_meta, + result['fileset_file']['file_meta'] = file_meta = archive_result.file_file_meta, if archive_result.file_resource: result['fileset_file']['terminal'] = dict( terminal_url=archive_result.file_resource.terminal_url, @@ -372,16 +382,20 @@ class IngestFilesetWorker(IngestFileWorker): terminal_status_code=archive_result.file_resource.terminal_status_code, ) if archive_result.file_resource.cdx: - result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_resource.cdx) + result['fileset_file']['cdx'] = cdx_to_dict( + archive_result.file_resource.cdx) if archive_result.file_resource.revisit_cdx: - result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx) + result['fileset_file']['revisit_cdx'] = cdx_to_dict( + archive_result.file_resource.revisit_cdx) if result['status'].startswith('success'): # check that these are still valid assert result['file_count'] == len(archive_result.manifest) - assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size]) + assert result['total_size'] == sum( + [m.size for m in archive_result.manifest if m.size]) - if result['status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta: + if result[ + 'status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta: file_result = dict( hit=True, status='success', @@ -397,10 +411,13 @@ class IngestFilesetWorker(IngestFileWorker): if archive_result.file_resource.cdx: file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx) if archive_result.file_resource.revisit_cdx: - file_result['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx) + file_result['revisit_cdx'] = cdx_to_dict( + archive_result.file_resource.revisit_cdx) file_result['request']['ingest_type'] = request['ingest_type'] + "-file" # call the super() (ingest_file) version of process_hit() - info = self.process_file_hit(file_result['request']['ingest_type'], archive_result.file_resource, archive_result.file_file_meta) + info = self.process_file_hit(file_result['request']['ingest_type'], + archive_result.file_resource, + archive_result.file_file_meta) file_result.update(info) if self.ingest_file_result_sink: self.ingest_file_result_sink.push_record(result.copy()) @@ -410,17 +427,19 @@ class IngestFilesetWorker(IngestFileWorker): if result['status'].startswith('success'): result['hit'] = True print("[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format( - ingest_type, - result['file_count'], - result['total_size'], - ingest_strategy, - ), file=sys.stderr) + ingest_type, + result['file_count'], + result['total_size'], + ingest_strategy, + ), + file=sys.stderr) else: print("[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format( - ingest_type, - result['status'], - result['file_count'], - result['total_size'], - ingest_strategy, - ), file=sys.stderr) + ingest_type, + result['status'], + result['file_count'], + result['total_size'], + ingest_strategy, + ), + file=sys.stderr) return result diff --git a/python/sandcrawler/ingest_html.py b/python/sandcrawler/ingest_html.py index 9c72dd5..bf25d5d 100644 --- a/python/sandcrawler/ingest_html.py +++ b/python/sandcrawler/ingest_html.py @@ -1,4 +1,3 @@ - import argparse import datetime import io @@ -11,16 +10,20 @@ import pydantic import trafilatura from selectolax.parser import HTMLParser -from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules -from sandcrawler.ia import (CdxApiClient, NoCaptureError, ResourceResult, WaybackClient, WaybackContentError, - cdx_to_dict, fix_transfer_encoding) -from sandcrawler.misc import clean_url, datetime_to_cdx, gen_file_metadata, parse_cdx_datetime, url_fuzzy_equal +from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio, + html_extract_resources, load_adblock_rules) +from sandcrawler.ia import (CdxApiClient, NoCaptureError, ResourceResult, WaybackClient, + WaybackContentError, cdx_to_dict, fix_transfer_encoding) +from sandcrawler.misc import (clean_url, datetime_to_cdx, gen_file_metadata, parse_cdx_datetime, + url_fuzzy_equal) TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}" + def html_extract_body_teixml(doc: bytes) -> dict: try: - tei_xml = trafilatura.extract(doc, + tei_xml = trafilatura.extract( + doc, output_format='xmltei', include_comments=False, include_formatting=True, @@ -33,13 +36,19 @@ def html_extract_body_teixml(doc: bytes) -> dict: if tei_xml: body_txt = teixml_body_text(tei_xml) word_count = len(body_txt.split()) - return dict(status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count) - elif doc.startswith(b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 2012"http://www.w3.org/TR/html4/loose.dtd">'): + return dict(status="success", + agent=TRAFILATURA_AGENT, + tei_xml=tei_xml, + word_count=word_count) + elif doc.startswith( + b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 2012"http://www.w3.org/TR/html4/loose.dtd">' + ): # hack for firstmonday.org return html_extract_body_teixml(doc[106:]) else: return dict(status="empty-xml", agent=TRAFILATURA_AGENT) + def teixml_body_text(doc_xml: str) -> str: ns = {"tei": "http://www.tei-c.org/ns/1.0"} tree = ET.fromstring(doc_xml) @@ -49,6 +58,7 @@ def teixml_body_text(doc_xml: str) -> str: else: return "" + class WebResource(pydantic.BaseModel): surt: str timestamp: datetime.datetime @@ -61,16 +71,15 @@ class WebResource(pydantic.BaseModel): resource_type: Optional[str] class Config: - json_encoders = { - datetime.datetime: lambda dt: dt.isoformat() - } + json_encoders = {datetime.datetime: lambda dt: dt.isoformat()} + class IngestWebResult(pydantic.BaseModel): status: str hit: bool error_message: Optional[str] cdx: Optional[dict] - terminal: Optional[Any] # TODO + terminal: Optional[Any] # TODO request: Optional[Any] # TODO file_meta: Optional[dict] html_biblio: Optional[BiblioMetadata] @@ -84,6 +93,7 @@ class IngestWebResult(pydantic.BaseModel): datetime.datetime: lambda dt: dt.isoformat(), } + class HtmlMetaRow(pydantic.BaseModel): sha1hex: str status: str @@ -106,7 +116,7 @@ class HtmlMetaRow(pydantic.BaseModel): """ return ( self.sha1hex, - datetime.datetime.now(), # updated + datetime.datetime.now(), # updated self.status, self.scope, self.has_teixml, @@ -117,7 +127,8 @@ class HtmlMetaRow(pydantic.BaseModel): ) -def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime]) -> List[WebResource]: +def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, + when: Optional[datetime.datetime]) -> List[WebResource]: """ This is the lazy version that just does a CDX lookup for each resource. @@ -132,27 +143,30 @@ def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, if not cdx_row: raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") if cdx_row.url != resource['url'] and not url_fuzzy_equal(cdx_row.url, resource['url']): - print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr) + print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", + file=sys.stderr) if not cdx_row.status_code: # TODO: fall back to a full fetch? print(f" WARN: skipping revisit record", file=sys.stderr) continue - full.append(WebResource( - surt=cdx_row.surt, - timestamp=cdx_row.datetime, - url=cdx_row.url, - sha1hex=cdx_row.sha1hex, - mimetype=cdx_row.mimetype, - status_code=cdx_row.status_code, - size=None, - sha256hex=None, - resource_type=resource['type'], - )) + full.append( + WebResource( + surt=cdx_row.surt, + timestamp=cdx_row.datetime, + url=cdx_row.url, + sha1hex=cdx_row.sha1hex, + mimetype=cdx_row.mimetype, + status_code=cdx_row.status_code, + size=None, + sha256hex=None, + resource_type=resource['type'], + )) return full -def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]) -> List[WebResource]: +def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, + when: Optional[datetime.datetime]) -> List[WebResource]: """ This is the full version which fetches each resource from wayback/petabox and calculates additional hashes. @@ -168,23 +182,28 @@ def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, w raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True) if file_meta['sha1hex'] != wayback_resp.cdx.sha1hex: - raise WaybackContentError(f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}") - full.append(WebResource( - surt=wayback_resp.cdx.surt, - timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime), - url=wayback_resp.cdx.url, - sha1hex=file_meta['sha1hex'], - mimetype=file_meta['mimetype'], - status_code=wayback_resp.cdx.status_code or wayback_resp.revisit_cdx.status_code, - size=file_meta['size_bytes'], - sha256hex=file_meta['sha256hex'], - resource_type=resource['type'], - )) + raise WaybackContentError( + f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}" + ) + full.append( + WebResource( + surt=wayback_resp.cdx.surt, + timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime), + url=wayback_resp.cdx.url, + sha1hex=file_meta['sha1hex'], + mimetype=file_meta['mimetype'], + status_code=wayback_resp.cdx.status_code + or wayback_resp.revisit_cdx.status_code, + size=file_meta['size_bytes'], + sha256hex=file_meta['sha256hex'], + resource_type=resource['type'], + )) return full -def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]) -> Optional[str]: +def html_guess_platform(url: str, doc: HTMLParser, + biblio: Optional[BiblioMetadata]) -> Optional[str]: generator: Optional[str] = None generator_elem = doc.css_first("meta[name='generator']") @@ -229,7 +248,9 @@ def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetada return None -def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]) -> str: + +def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], + word_count: Optional[int]) -> str: """ This function tries to guess if an HTML document represents one of: @@ -328,7 +349,9 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata] return "unknown" -def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = False) -> IngestWebResult: +def run_single(url: str, + timestamp: Optional[str] = None, + quick_mode: bool = False) -> IngestWebResult: adblock = load_adblock_rules() wayback_client = WaybackClient() @@ -375,7 +398,8 @@ def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = Fal full_resources: List[WebResource] = [] if quick_mode: - full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, when) + full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, + when) else: full_resources = fetch_html_resources(raw_resources, wayback_client, when) @@ -399,14 +423,11 @@ def main() -> None: python -m sandcrawler.ingest_html """ - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) subparsers = parser.add_subparsers() - sub = subparsers.add_parser( - "single", help="tries to ingest a single URL, dumps result to stdout" - ) + sub = subparsers.add_parser("single", + help="tries to ingest a single URL, dumps result to stdout") sub.set_defaults(func="run_single") sub.add_argument( "url", @@ -437,5 +458,6 @@ def main() -> None: #func() raise NotImplementedError() + if __name__ == "__main__": main() diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py index b617178..188621f 100644 --- a/python/sandcrawler/minio.py +++ b/python/sandcrawler/minio.py @@ -1,4 +1,3 @@ - import hashlib import io import os @@ -7,7 +6,6 @@ import minio class SandcrawlerMinioClient(object): - def __init__(self, host_url, access_key, secret_key, default_bucket=None): """ host is minio connection string (host:port) diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py index cf8c4bd..ddbd95a 100644 --- a/python/sandcrawler/misc.py +++ b/python/sandcrawler/misc.py @@ -1,4 +1,3 @@ - import base64 import datetime import hashlib @@ -19,22 +18,28 @@ def clean_url(s: str) -> str: parsed.colon_before_port = b'' return str(urlcanon.whatwg(parsed)) + def url_fuzzy_equal(left: str, right: str) -> bool: """ TODO: use proper surt library and canonicalization for this check """ - fuzzy_left = '://'.join(clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:]) - fuzzy_right = '://'.join(clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:]) + fuzzy_left = '://'.join( + clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:]) + fuzzy_right = '://'.join( + clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:]) if fuzzy_left == fuzzy_right: return True elif fuzzy_left == fuzzy_right + "/" or fuzzy_right == fuzzy_left + "/": return True return False + def test_url_fuzzy_equal() -> None: assert True == url_fuzzy_equal( "http://www.annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree", - "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree") + "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree" + ) + def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict: """ @@ -45,10 +50,10 @@ def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict: assert blob is not None if not allow_empty: assert blob - if len(blob) < 1024*1024: + if len(blob) < 1024 * 1024: mimetype = magic.Magic(mime=True).from_buffer(blob) else: - mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024*1024)]) + mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024 * 1024)]) if mimetype in ("application/xml", "text/xml"): # crude checks for XHTML or JATS XML, using only first 1 kB of file if b"<htm" in blob[:1024] and b'xmlns="http://www.w3.org/1999/xhtml"' in blob[:1024]: @@ -70,6 +75,7 @@ def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict: mimetype=mimetype, ) + def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict: """ Variant of gen_file_metadata() which works with files on local disk @@ -92,7 +98,7 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict: size_bytes = 0 with open(path, 'rb') as f: while True: - chunk = f.read(1024*1024) + chunk = f.read(1024 * 1024) if not chunk: break size_bytes += len(chunk) @@ -108,6 +114,7 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict: mimetype=mimetype, ) + def b32_hex(s: str) -> str: """ Converts a base32-encoded SHA-1 checksum into hex-encoded @@ -123,6 +130,7 @@ def b32_hex(s: str) -> str: raise ValueError("not a base-32 encoded SHA-1 hash: {}".format(s)) return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + NORMAL_MIME = ( 'application/pdf', 'application/postscript', @@ -131,6 +139,7 @@ NORMAL_MIME = ( 'application/octet-stream', ) + def normalize_mime(raw: str) -> Optional[str]: raw = raw.lower().strip() for norm in NORMAL_MIME: @@ -142,9 +151,7 @@ def normalize_mime(raw: str) -> Optional[str]: return 'text/xml' if raw.startswith('application/x-pdf'): return 'application/pdf' - if raw in ( - '.pdf', - ): + if raw in ('.pdf', ): return 'application/pdf' if raw in ( 'application/download', @@ -154,7 +161,7 @@ def normalize_mime(raw: str) -> Optional[str]: 'application/octetstream', 'application/force-download', 'application/unknown', - ): + ): return 'application/octet-stream' return None @@ -193,8 +200,8 @@ def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]: offset = cdx[9] warc = cdx[10] - if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit() - and len(sha1b32) == 32 and dt.isdigit()): + if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit() and len(sha1b32) == 32 + and dt.isdigit()): return None if '-' in (surt, dt, url, http_status, sha1b32, c_size, offset, warc): @@ -221,6 +228,7 @@ def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]: warc_path=warc, ) + def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]: if not dt_str: return None @@ -229,23 +237,39 @@ def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]: except Exception: return None + def test_parse_cdx_datetime() -> None: assert parse_cdx_datetime("") == None assert parse_cdx_datetime("asdf") == None assert parse_cdx_datetime("19930203123045") != None - assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3) + assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020, + month=10, + day=28, + hour=23, + minute=51, + second=3) + def datetime_to_cdx(dt: datetime.datetime) -> str: return '%04d%02d%02d%02d%02d%02d' % ( - dt.year, dt.month, dt.day, - dt.hour, dt.minute, dt.second, + dt.year, + dt.month, + dt.day, + dt.hour, + dt.minute, + dt.second, ) + def test_datetime_to_cdx() -> None: - assert "20201028235103" == datetime_to_cdx(datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)) + assert "20201028235103" == datetime_to_cdx( + datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)) + -def requests_retry_session(retries=10, backoff_factor=3, - status_forcelist=(500, 502, 504), session=None) -> requests.Session: +def requests_retry_session(retries=10, + backoff_factor=3, + status_forcelist=(500, 502, 504), + session=None) -> requests.Session: """ From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests """ @@ -262,6 +286,7 @@ def requests_retry_session(retries=10, backoff_factor=3, session.mount('https://', adapter) return session + def sanitize_fs_path(path: str) -> str: """ From: https://stackoverflow.com/questions/13939120/sanitizing-a-file-path-in-python/66950540#66950540 @@ -271,6 +296,7 @@ def sanitize_fs_path(path: str) -> str: # - making the path relative return os.path.relpath(os.path.normpath(os.path.join("/", path)), "/") + def test_sanitize_fs_path() -> None: assert sanitize_fs_path("/thing.png") == "thing.png" assert sanitize_fs_path("../../thing.png") == "thing.png" diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py index 2fb34b8..190672d 100644 --- a/python/sandcrawler/pdfextract.py +++ b/python/sandcrawler/pdfextract.py @@ -1,4 +1,3 @@ - import datetime import json import sys @@ -153,19 +152,20 @@ BAD_PDF_SHA1HEX = [ "fd9bd560662e070b222d63052830837829c490f0", ] + @dataclass class PdfExtractResult: sha1hex: str status: str error_msg: Optional[str] = None - file_meta: Optional[Dict[str,Any]] = None + file_meta: Optional[Dict[str, Any]] = None text: Optional[str] = None page0_thumbnail: Optional[bytes] = None has_page0_thumbnail: bool = False meta_xml: Optional[str] = None - pdf_info: Optional[Dict[str,Any]] = None - pdf_extra: Optional[Dict[str,Any]] = None - source: Optional[Dict[str,Any]] = None + pdf_info: Optional[Dict[str, Any]] = None + pdf_extra: Optional[Dict[str, Any]] = None + source: Optional[Dict[str, Any]] = None def to_pdftext_dict(self) -> dict: """ @@ -221,7 +221,8 @@ class PdfExtractResult: ) else: pdf_extra = dict() - for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id', 'pdf_version'): + for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id', + 'pdf_version'): if record.get(k): pdf_extra[k] = record[k] return PdfExtractResult( @@ -255,7 +256,7 @@ class PdfExtractResult: metadata_json = json.dumps(metadata, sort_keys=True) return ( self.sha1hex, - datetime.datetime.now(), # updated + datetime.datetime.now(), # updated self.status, self.has_page0_thumbnail, pdf_extra.get('page_count'), @@ -269,7 +270,7 @@ class PdfExtractResult: ) -def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtractResult: +def process_pdf(blob: bytes, thumb_size=(180, 300), thumb_type="JPEG") -> PdfExtractResult: """ A known issue is that output text is in "physical layout" mode, which means columns will be side-by-side. We would prefer a single stream of tokens! @@ -330,7 +331,8 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr renderer = poppler.PageRenderer() try: full_img = renderer.render_page(page0) - img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw', "BGRA", 0, 1) + img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw', + "BGRA", 0, 1) img.thumbnail(thumb_size, Image.BICUBIC) buf = BytesIO() img.save(buf, thumb_type) @@ -356,14 +358,14 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr ) # Kafka message size limit; cap at about 1 MByte - if len(full_text)> 1000000: + if len(full_text) > 1000000: return PdfExtractResult( sha1hex=sha1hex, status='text-too-large', error_msg="full_text chars: {}".format(len(full_text)), file_meta=file_meta, ) - if len(pdf.metadata)> 1000000: + if len(pdf.metadata) > 1000000: return PdfExtractResult( sha1hex=sha1hex, status='text-too-large', @@ -414,8 +416,8 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr ), ) -class PdfExtractWorker(SandcrawlerFetchWorker): +class PdfExtractWorker(SandcrawlerFetchWorker): def __init__(self, wayback_client=None, sink=None, **kwargs): super().__init__(wayback_client=wayback_client) self.wayback_client = wayback_client @@ -445,12 +447,12 @@ class PdfExtractWorker(SandcrawlerFetchWorker): self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) return result.to_pdftext_dict() + class PdfExtractBlobWorker(SandcrawlerWorker): """ This is sort of like PdfExtractWorker, except it receives blobs directly, instead of fetching blobs from some remote store. """ - def __init__(self, sink=None, **kwargs): super().__init__() self.sink = sink @@ -466,4 +468,3 @@ class PdfExtractBlobWorker(SandcrawlerWorker): self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) return result.to_pdftext_dict() - diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py index 7d03357..e3d4a54 100644 --- a/python/sandcrawler/pdftrio.py +++ b/python/sandcrawler/pdftrio.py @@ -1,4 +1,3 @@ - import time import requests @@ -8,7 +7,6 @@ from .workers import SandcrawlerFetchWorker, SandcrawlerWorker class PdfTrioClient(object): - def __init__(self, host_url="http://pdftrio.qa.fatcat.wiki", **kwargs): self.host_url = host_url self.http_session = requests_retry_session(retries=3, backoff_factor=3) @@ -51,9 +49,7 @@ class PdfTrioClient(object): 'error_msg': 'pdftrio request connection timout', } - info = dict( - status_code=pdftrio_response.status_code, - ) + info = dict(status_code=pdftrio_response.status_code, ) if pdftrio_response.status_code == 200: resp_json = pdftrio_response.json() assert 'ensemble_score' in resp_json @@ -72,7 +68,6 @@ class PdfTrioWorker(SandcrawlerFetchWorker): """ This class is basically copied directly from GrobidWorker """ - def __init__(self, pdftrio_client, wayback_client=None, sink=None, **kwargs): super().__init__(wayback_client=wayback_client) self.pdftrio_client = pdftrio_client @@ -103,12 +98,12 @@ class PdfTrioWorker(SandcrawlerFetchWorker): result['timing']['fetch_sec'] = fetch_sec return result + class PdfTrioBlobWorker(SandcrawlerWorker): """ This is sort of like PdfTrioWorker, except it receives blobs directly, instead of fetching blobs from some remote store. """ - def __init__(self, pdftrio_client, sink=None, mode="auto", **kwargs): super().__init__() self.pdftrio_client = pdftrio_client @@ -128,4 +123,3 @@ class PdfTrioBlobWorker(SandcrawlerWorker): total_sec=time.time() - start_process, ) return result - diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 66a36bc..44c03f2 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -1,4 +1,3 @@ - """ cdx - read raw CDX, filter @@ -32,7 +31,6 @@ from sandcrawler.workers import SandcrawlerWorker class PersistCdxWorker(SandcrawlerWorker): - def __init__(self, db_url, **kwargs): super().__init__() self.db = SandcrawlerPostgresClient(db_url) @@ -56,8 +54,8 @@ class PersistCdxWorker(SandcrawlerWorker): self.db.commit() return [] -class PersistIngestFileResultWorker(SandcrawlerWorker): +class PersistIngestFileResultWorker(SandcrawlerWorker): def __init__(self, db_url, **kwargs): super().__init__() self.db = SandcrawlerPostgresClient(db_url) @@ -78,8 +76,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): # backwards compat hacks; transform request to look like current schema if raw.get('ingest_type') == 'file': raw['ingest_type'] = 'pdf' - if (not raw.get('link_source') - and raw.get('base_url') + if (not raw.get('link_source') and raw.get('base_url') and raw.get('ext_ids', {}).get('doi') and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])): # set link_source(_id) for old ingest requests @@ -119,7 +116,6 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if not request['request']: request['request'] = None return request - def file_result_to_row(self, raw: dict) -> Optional[dict]: """ @@ -137,7 +133,8 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): ingest_type = raw['request'].get('ingest_type') if ingest_type == 'file': ingest_type = 'pdf' - if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', 'dataset-file'): + if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', + 'dataset-file'): self.counts['skip-ingest-type'] += 1 return None if raw['status'] in ("existing", ): @@ -153,7 +150,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if terminal: result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url') result['terminal_dt'] = terminal.get('terminal_dt') - result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code') + result['terminal_status_code'] = terminal.get( + 'terminal_status_code') or terminal.get('status_code') or terminal.get( + 'http_code') if result['terminal_status_code']: result['terminal_status_code'] = int(result['terminal_status_code']) result['terminal_sha1hex'] = terminal.get('terminal_sha1hex') @@ -215,9 +214,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): 'manifest': raw.get('manifest'), } if result.get('fileset_bundle'): - result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get('archiveorg_item_bundle_path') - result['web_bundle_url'] = result['fileset_bundle'].get('terminal', {}).get('terminal_url') - result['web_bundle_dt'] = result['fileset_bundle'].get('terminal', {}).get('terminal_dt') + result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get( + 'archiveorg_item_bundle_path') + result['web_bundle_url'] = result['fileset_bundle'].get('terminal', + {}).get('terminal_url') + result['web_bundle_dt'] = result['fileset_bundle'].get('terminal', + {}).get('terminal_dt') return result def push_batch(self, batch): @@ -243,7 +245,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): # these schemas match, so can just pass through cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')] - revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')] + revisit_cdx_batch = [ + r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx') + ] cdx_batch.extend(revisit_cdx_batch) # filter to full CDX lines, with full warc_paths (not liveweb) cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])] @@ -258,24 +262,31 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): self.counts['insert-file_meta'] += resp[0] self.counts['update-file_meta'] += resp[1] - html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')] + html_meta_batch = [ + self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body') + ] if html_meta_batch: resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="update") self.counts['insert-html_meta'] += resp[0] self.counts['update-html_meta'] += resp[1] - fileset_platform_batch = [self.result_to_platform_row(raw) for raw in batch if raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')] + fileset_platform_batch = [ + self.result_to_platform_row(raw) for raw in batch if + raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name') + ] fileset_platform_batch = [p for p in fileset_platform_batch if p] if fileset_platform_batch: - resp = self.db.insert_ingest_fileset_platform(self.cur, fileset_platform_batch, on_conflict="update") + resp = self.db.insert_ingest_fileset_platform(self.cur, + fileset_platform_batch, + on_conflict="update") self.counts['insert-fileset_platform'] += resp[0] self.counts['update-fileset_platform'] += resp[1] self.db.commit() return [] -class PersistIngestFilesetWorker(SandcrawlerWorker): +class PersistIngestFilesetWorker(SandcrawlerWorker): def __init__(self, db_url, **kwargs): super().__init__() self.db = SandcrawlerPostgresClient(db_url) @@ -287,8 +298,8 @@ class PersistIngestFilesetWorker(SandcrawlerWorker): """ raise NotImplementedError -class PersistIngestRequestWorker(PersistIngestFileResultWorker): +class PersistIngestRequestWorker(PersistIngestFileResultWorker): def __init__(self, db_url, **kwargs): super().__init__(db_url=db_url) @@ -315,8 +326,8 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker): self.db.commit() return [] -class PersistGrobidWorker(SandcrawlerWorker): +class PersistGrobidWorker(SandcrawlerWorker): def __init__(self, db_url, **kwargs): super().__init__() self.grobid = GrobidClient() @@ -406,7 +417,6 @@ class PersistGrobidDiskWorker(SandcrawlerWorker): This could be refactored into a "Sink" type with an even thinner wrapper. """ - def __init__(self, output_dir): super().__init__() self.output_dir = output_dir @@ -424,7 +434,7 @@ class PersistGrobidDiskWorker(SandcrawlerWorker): if record.get('status_code') != 200 or not record.get('tei_xml'): return False - assert(len(record['key'])) == 40 + assert (len(record['key'])) == 40 p = "{}/{}".format(self.output_dir, self._blob_path(record['key'])) os.makedirs(os.path.dirname(p), exist_ok=True) with open(p, 'w') as f: @@ -434,7 +444,6 @@ class PersistGrobidDiskWorker(SandcrawlerWorker): class PersistPdfTrioWorker(SandcrawlerWorker): - def __init__(self, db_url, **kwargs): super().__init__() self.db = SandcrawlerPostgresClient(db_url) @@ -458,7 +467,10 @@ class PersistPdfTrioWorker(SandcrawlerWorker): self.counts['insert-pdftrio'] += resp[0] self.counts['update-pdftrio'] += resp[1] - file_meta_batch = [r['file_meta'] for r in batch if r['pdf_trio']['status'] == "success" and r.get('file_meta')] + file_meta_batch = [ + r['file_meta'] for r in batch + if r['pdf_trio']['status'] == "success" and r.get('file_meta') + ] resp = self.db.insert_file_meta(self.cur, file_meta_batch) self.counts['insert-file-meta'] += resp[0] self.counts['update-file-meta'] += resp[1] @@ -473,7 +485,6 @@ class PersistPdfTextWorker(SandcrawlerWorker): Should keep batch sizes small. """ - def __init__(self, db_url, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( @@ -545,7 +556,6 @@ class PersistThumbnailWorker(SandcrawlerWorker): This worker *must* be used with raw kakfa mode; thumbnails are *not* wrapped in JSON like most sandcrawler kafka messages. """ - def __init__(self, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( @@ -583,7 +593,6 @@ class GenericPersistDocWorker(SandcrawlerWorker): Objects are assumed to be JSON-wrapped strings. """ - def __init__(self, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( @@ -624,7 +633,6 @@ class PersistXmlDocWorker(GenericPersistDocWorker): Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to sandcrawler database (SQL). """ - def __init__(self, **kwargs): super().__init__(**kwargs) self.s3_extension = kwargs.get('s3_extension', ".jats.xml") @@ -637,7 +645,6 @@ class PersistHtmlTeiXmlWorker(GenericPersistDocWorker): Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to sandcrawler database (SQL). """ - def __init__(self, **kwargs): super().__init__(**kwargs) self.s3_extension = kwargs.get('s3_extension', ".tei.xml") diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index d8a4016..7135f4c 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -1,4 +1,3 @@ - import json import multiprocessing.pool import signal @@ -21,7 +20,6 @@ class SandcrawlerWorker(object): Usually these get "pushed" into by a RecordPusher. Output goes to another worker (pipeline-style), or defaults to stdout. """ - def __init__(self): self.counts = Counter() self.sink = None @@ -62,9 +60,9 @@ class SandcrawlerWorker(object): multithreading or if signal-based timeouts are used elsewhere in the same process. """ - def timeout_handler(signum, frame): raise TimeoutError("timeout processing record") + signal.signal(signal.SIGALRM, timeout_handler) resp = None signal.alarm(int(timeout)) @@ -72,7 +70,7 @@ class SandcrawlerWorker(object): resp = self.push_record(task, key=key) except TimeoutError: self.counts['timeout'] += 1 - resp = self.timeout_response(task) # pylint: disable=assignment-from-none + resp = self.timeout_response(task) # pylint: disable=assignment-from-none # TODO: what if it is this push_record() itself that is timing out? if resp and self.sink: self.sink.push_record(resp) @@ -113,7 +111,6 @@ class SandcrawlerFetchWorker(SandcrawlerWorker): Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg, PDFs) from wayback, archive.org, or other sources. """ - def __init__(self, wayback_client, **kwargs): super().__init__(**kwargs) self.wayback_client = wayback_client @@ -178,7 +175,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker): ) blob = resp.content else: - raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") + raise ValueError( + "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") if not blob: return dict( key=default_key, @@ -192,8 +190,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker): blob=blob, ) -class MultiprocessWrapper(SandcrawlerWorker): +class MultiprocessWrapper(SandcrawlerWorker): def __init__(self, worker, sink, jobs=None): self.counts = Counter() self.worker = worker @@ -226,21 +224,21 @@ class MultiprocessWrapper(SandcrawlerWorker): print("Multiprocessing: {}".format(self.counts), file=sys.stderr) return worker_counts + class BlackholeSink(SandcrawlerWorker): """ Dummy SandcrawlerWorker. That doesn't do or process anything. Useful for tests. """ - def push_record(self, task, key=None): return def push_batch(self, tasks): return -class KafkaSink(SandcrawlerWorker): +class KafkaSink(SandcrawlerWorker): def __init__(self, kafka_hosts, produce_topic, **kwargs): self.sink = None self.counts = Counter() @@ -249,13 +247,12 @@ class KafkaSink(SandcrawlerWorker): config = self.producer_config({ 'bootstrap.servers': kafka_hosts, - 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes + 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes 'api.version.request': True, 'api.version.fallback.ms': 0, }) self.producer = Producer(config) - @staticmethod def _fail_fast(err, msg): if err is not None: @@ -270,7 +267,7 @@ class KafkaSink(SandcrawlerWorker): 'delivery.report.only.error': True, 'default.topic.config': { 'message.timeout.ms': 30000, - 'request.required.acks': -1, # all brokers must confirm + 'request.required.acks': -1, # all brokers must confirm } }) return config @@ -285,11 +282,7 @@ class KafkaSink(SandcrawlerWorker): msg = msg.encode('utf-8') assert type(msg) == bytes - self.producer.produce( - self.produce_topic, - msg, - key=key, - on_delivery=self._fail_fast) + self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast) self.counts['produced'] += 1 # check for errors etc @@ -308,7 +301,6 @@ class KafkaCompressSink(KafkaSink): """ Variant of KafkaSink for large documents. Used for, eg, GROBID output. """ - def producer_config(self, kafka_config): config = kafka_config.copy() config.update({ @@ -319,7 +311,7 @@ class KafkaCompressSink(KafkaSink): 'delivery.report.only.error': True, 'default.topic.config': { 'message.timeout.ms': 30000, - 'request.required.acks': -1, # all brokers must confirm + 'request.required.acks': -1, # all brokers must confirm } }) return config @@ -330,7 +322,6 @@ class RecordPusher: Base class for different record sources to be pushed into workers. Pretty trivial interface, just wraps an importer and pushes records in to it. """ - def __init__(self, worker, **kwargs): self.counts = Counter() self.worker = worker @@ -348,7 +339,6 @@ class RecordPusher: class JsonLinePusher(RecordPusher): - def __init__(self, worker, json_file, **kwargs): self.counts = Counter() self.worker = worker @@ -387,7 +377,6 @@ class JsonLinePusher(RecordPusher): class CdxLinePusher(RecordPusher): - def __init__(self, worker, cdx_file, **kwargs): self.counts = Counter() self.worker = worker @@ -409,7 +398,8 @@ class CdxLinePusher(RecordPusher): if not record: self.counts['skip-parse'] += 1 continue - if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses: + if self.filter_http_statuses and record[ + 'http_status'] not in self.filter_http_statuses: self.counts['skip-http_status'] += 1 continue if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes: @@ -434,7 +424,6 @@ class CdxLinePusher(RecordPusher): class ZipfilePusher(RecordPusher): - def __init__(self, worker, zipfile_path, **kwargs): self.counts = Counter() self.worker = worker @@ -472,8 +461,8 @@ class ZipfilePusher(RecordPusher): print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr) return self.counts -class KafkaJsonPusher(RecordPusher): +class KafkaJsonPusher(RecordPusher): def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs): self.counts = Counter() self.worker = worker @@ -499,12 +488,11 @@ class KafkaJsonPusher(RecordPusher): # case where there there is one update and thousands of creates; # update would be lingering in worker, and if worker crashed # never created. Not great. - batch = self.consumer.consume( - num_messages=self.batch_size, - timeout=self.poll_interval) + batch = self.consumer.consume(num_messages=self.batch_size, + timeout=self.poll_interval) print("... got {} kafka messages ({}sec poll interval)".format( - len(batch), self.poll_interval), - file=sys.stderr) + len(batch), self.poll_interval), + file=sys.stderr) if not batch: # TODO: could have some larger timeout here and # self.worker.finish() if it's been more than, eg, a couple @@ -541,7 +529,9 @@ class KafkaJsonPusher(RecordPusher): while not done: try: # use timeouts; don't want kafka itself to timeout - self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec) + self.worker.push_record_timeout(record, + key=msg.key(), + timeout=self.process_timeout_sec) break except SandcrawlerBackoffError as be: print("Backing off for 200 seconds: {}".format(be)) @@ -611,14 +601,14 @@ def make_kafka_consumer(hosts, consume_topic, group): for p in partitions: if p.error: raise KafkaException(p.error) - print("Kafka partitions rebalanced: {} / {}".format( - consumer, partitions), - file=sys.stderr) + print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions), + file=sys.stderr) consumer = Consumer(conf) # NOTE: it's actually important that topic_name *not* be bytes (UTF-8 # encoded) - consumer.subscribe([topic_name], + consumer.subscribe( + [topic_name], on_assign=on_rebalance, on_revoke=on_rebalance, ) diff --git a/python/sandcrawler/xml.py b/python/sandcrawler/xml.py index 7a0086d..83d53d4 100644 --- a/python/sandcrawler/xml.py +++ b/python/sandcrawler/xml.py @@ -1,4 +1,3 @@ - import xml.etree.ElementTree as ET diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index e185fad..3c76c17 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ These are generally for continuously running workers that consume from Kafka. Outputs might either be pushed back into Kafka, or directly into sandcrawler-db @@ -31,12 +30,8 @@ def run_grobid_extract(args): kafka_hosts=args.kafka_hosts, produce_topic=produce_topic, ) - grobid_client = GrobidClient( - host_url=args.grobid_host, - ) - wayback_client = WaybackClient( - host_url=args.grobid_host, - ) + grobid_client = GrobidClient(host_url=args.grobid_host, ) + wayback_client = WaybackClient(host_url=args.grobid_host, ) worker = GrobidWorker( grobid_client=grobid_client, wayback_client=wayback_client, @@ -51,6 +46,7 @@ def run_grobid_extract(args): ) pusher.run() + def run_pdf_extract(args): consume_topic = "sandcrawler-{}.unextracted".format(args.env) pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env) @@ -63,9 +59,7 @@ def run_pdf_extract(args): kafka_hosts=args.kafka_hosts, produce_topic=thumbnail_topic, ) - wayback_client = WaybackClient( - host_url=args.grobid_host, - ) + wayback_client = WaybackClient(host_url=args.grobid_host, ) worker = PdfExtractWorker( wayback_client=wayback_client, sink=pdftext_sink, @@ -81,6 +75,7 @@ def run_pdf_extract(args): ) pusher.run() + def run_persist_grobid(args): consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env) worker = PersistGrobidWorker( @@ -105,6 +100,7 @@ def run_persist_grobid(args): ) pusher.run() + def run_persist_pdftext(args): consume_topic = "sandcrawler-{}.pdf-text".format(args.env) worker = PersistPdfTextWorker( @@ -129,6 +125,7 @@ def run_persist_pdftext(args): ) pusher.run() + def run_persist_thumbnail(args): consume_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env) worker = PersistThumbnailWorker( @@ -150,6 +147,7 @@ def run_persist_thumbnail(args): ) pusher.run() + def run_persist_xml_doc(args: argparse.Namespace) -> None: consume_topic = f"sandcrawler-{args.env}.xml-doc" worker = PersistXmlDocWorker( @@ -168,6 +166,7 @@ def run_persist_xml_doc(args: argparse.Namespace) -> None: ) pusher.run() + def run_persist_html_teixml(args: argparse.Namespace) -> None: consume_topic = f"sandcrawler-{args.env}.html-teixml" worker = PersistHtmlTeiXmlWorker( @@ -186,11 +185,10 @@ def run_persist_html_teixml(args: argparse.Namespace) -> None: ) pusher.run() + def run_persist_pdftrio(args): consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env) - worker = PersistPdfTrioWorker( - db_url=args.db_url, - ) + worker = PersistPdfTrioWorker(db_url=args.db_url, ) pusher = KafkaJsonPusher( worker=worker, kafka_hosts=args.kafka_hosts, @@ -201,6 +199,7 @@ def run_persist_pdftrio(args): ) pusher.run() + def run_ingest_file(args): spn_cdx_retry_sec = 9.0 if args.bulk: @@ -228,9 +227,7 @@ def run_ingest_file(args): kafka_hosts=args.kafka_hosts, produce_topic=grobid_topic, ) - grobid_client = GrobidClient( - host_url=args.grobid_host, - ) + grobid_client = GrobidClient(host_url=args.grobid_host, ) pdftext_sink = KafkaCompressSink( kafka_hosts=args.kafka_hosts, produce_topic=pdftext_topic, @@ -268,11 +265,10 @@ def run_ingest_file(args): ) pusher.run() + def run_persist_ingest_file(args): consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env) - worker = PersistIngestFileResultWorker( - db_url=args.db_url, - ) + worker = PersistIngestFileResultWorker(db_url=args.db_url, ) pusher = KafkaJsonPusher( worker=worker, kafka_hosts=args.kafka_hosts, @@ -283,90 +279,116 @@ def run_persist_ingest_file(args): ) pusher.run() + def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--kafka-hosts', - default="localhost:9092", - help="list of Kafka brokers (host/port) to use") + default="localhost:9092", + help="list of Kafka brokers (host/port) to use") parser.add_argument('--env', - default="dev", - help="Kafka topic namespace to use (eg, prod, qa, dev)") + default="dev", + help="Kafka topic namespace to use (eg, prod, qa, dev)") parser.add_argument('--grobid-host', - default="http://grobid.qa.fatcat.wiki", - help="GROBID API host/port") + default="http://grobid.qa.fatcat.wiki", + help="GROBID API host/port") parser.add_argument('--db-url', - help="postgresql database connection string", - default="postgres:///sandcrawler") - parser.add_argument('--s3-url', - help="S3 (seaweedfs) backend URL", - default="localhost:9000") + help="postgresql database connection string", + default="postgres:///sandcrawler") + parser.add_argument('--s3-url', help="S3 (seaweedfs) backend URL", default="localhost:9000") parser.add_argument('--s3-access-key', - help="S3 (seaweedfs) credential", - default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_ACCESS_KEY')) + help="S3 (seaweedfs) credential", + default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') + or os.environ.get('MINIO_ACCESS_KEY')) parser.add_argument('--s3-secret-key', - help="S3 (seaweedfs) credential", - default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY') or os.environ.get('MINIO_SECRET_KEY')) + help="S3 (seaweedfs) credential", + default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY') + or os.environ.get('MINIO_SECRET_KEY')) parser.add_argument('--s3-bucket', - help="S3 (seaweedfs) bucket to persist into", - default="sandcrawler-dev") + help="S3 (seaweedfs) bucket to persist into", + default="sandcrawler-dev") subparsers = parser.add_subparsers() - sub_grobid_extract = subparsers.add_parser('grobid-extract', - help="daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka") + sub_grobid_extract = subparsers.add_parser( + 'grobid-extract', + help= + "daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka" + ) sub_grobid_extract.set_defaults(func=run_grobid_extract) - sub_pdf_extract = subparsers.add_parser('pdf-extract', - help="daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka") + sub_pdf_extract = subparsers.add_parser( + 'pdf-extract', + help= + "daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka" + ) sub_pdf_extract.set_defaults(func=run_pdf_extract) - sub_persist_grobid = subparsers.add_parser('persist-grobid', - help="daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres") + sub_persist_grobid = subparsers.add_parser( + 'persist-grobid', + help= + "daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres" + ) sub_persist_grobid.add_argument('--s3-only', - action='store_true', - help="only upload TEI-XML to S3 (don't write to database)") - sub_persist_grobid.add_argument('--db-only', + action='store_true', + help="only upload TEI-XML to S3 (don't write to database)") + sub_persist_grobid.add_argument( + '--db-only', action='store_true', help="only write status to database (don't upload TEI-XML to S3)") sub_persist_grobid.set_defaults(func=run_persist_grobid) - sub_persist_pdftext = subparsers.add_parser('persist-pdftext', - help="daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres") + sub_persist_pdftext = subparsers.add_parser( + 'persist-pdftext', + help= + "daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres" + ) sub_persist_pdftext.add_argument('--s3-only', - action='store_true', - help="only upload TEI-XML to S3 (don't write to database)") - sub_persist_pdftext.add_argument('--db-only', + action='store_true', + help="only upload TEI-XML to S3 (don't write to database)") + sub_persist_pdftext.add_argument( + '--db-only', action='store_true', help="only write status to database (don't upload TEI-XML to S3)") sub_persist_pdftext.set_defaults(func=run_persist_pdftext) - sub_persist_thumbnail = subparsers.add_parser('persist-thumbnail', - help="daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres") + sub_persist_thumbnail = subparsers.add_parser( + 'persist-thumbnail', + help= + "daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres" + ) sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail) - sub_persist_xml_doc = subparsers.add_parser('persist-xml-doc', - help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket") + sub_persist_xml_doc = subparsers.add_parser( + 'persist-xml-doc', + help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket" + ) sub_persist_xml_doc.set_defaults(func=run_persist_xml_doc) - sub_persist_html_teixml = subparsers.add_parser('persist-html-teixml', - help="daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket") + sub_persist_html_teixml = subparsers.add_parser( + 'persist-html-teixml', + help= + "daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket" + ) sub_persist_html_teixml.set_defaults(func=run_persist_html_teixml) - sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio', + sub_persist_pdftrio = subparsers.add_parser( + 'persist-pdftrio', help="daemon that consumes pdftrio output from Kafka and pushes to postgres") sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio) - sub_ingest_file = subparsers.add_parser('ingest-file', + sub_ingest_file = subparsers.add_parser( + 'ingest-file', help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka") sub_ingest_file.add_argument('--bulk', - action='store_true', - help="consume from bulk kafka topic (eg, for ingest backfill)") - sub_ingest_file.add_argument('--priority', + action='store_true', + help="consume from bulk kafka topic (eg, for ingest backfill)") + sub_ingest_file.add_argument( + '--priority', action='store_true', help="consume from priority kafka topic (eg, for SPN requests)") sub_ingest_file.set_defaults(func=run_ingest_file) - sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file', + sub_persist_ingest_file = subparsers.add_parser( + 'persist-ingest-file', help="daemon that consumes ingest-file output from Kafka and pushes to postgres") sub_persist_ingest_file.set_defaults(func=run_persist_ingest_file) @@ -377,5 +399,6 @@ def main(): args.func(args) + if __name__ == '__main__': main() diff --git a/python/scripts/arabesque2ingestrequest.py b/python/scripts/arabesque2ingestrequest.py index 69fe320..9cc9055 100755 --- a/python/scripts/arabesque2ingestrequest.py +++ b/python/scripts/arabesque2ingestrequest.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ This script is intended to be used for backfill ingest of old crawls. It can also be used as a fast path for getting freshly crawled content into fatcat if @@ -36,37 +35,35 @@ def run(args): }, } if args.release_stage: - assert args.release_stage in ('published', 'submitted', 'accepted', 'draft', 'update') + assert args.release_stage in ('published', 'submitted', 'accepted', 'draft', + 'update') request['release_stage'] = args.release_stage print("{}".format(json.dumps(request, sort_keys=True))) + def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--link-source', - required=True, - help="link_source to include in request") - parser.add_argument('--extid-type', - required=True, - help="extid to encode identifier as") + required=True, + help="link_source to include in request") + parser.add_argument('--extid-type', required=True, help="extid to encode identifier as") parser.add_argument('--ingest-type', - default="pdf", - help="ingest type (pdf, html, xml, etc)") + default="pdf", + help="ingest type (pdf, html, xml, etc)") parser.add_argument('--ingest-request-source', - default="arabesque", - help="to include in request") - parser.add_argument('--release-stage', - default=None, - help="to include in request") + default="arabesque", + help="to include in request") + parser.add_argument('--release-stage', default=None, help="to include in request") parser.add_argument('json_file', - help="arabesque output file to use", - type=argparse.FileType('r')) + help="arabesque output file to use", + type=argparse.FileType('r')) subparsers = parser.add_subparsers() args = parser.parse_args() run(args) + if __name__ == '__main__': main() diff --git a/python/scripts/archiveorg_fileset.py b/python/scripts/archiveorg_fileset.py index 86ca062..83c04e3 100755 --- a/python/scripts/archiveorg_fileset.py +++ b/python/scripts/archiveorg_fileset.py @@ -23,11 +23,9 @@ FORMAT_TO_MIMETYPE = { 'RAR': 'application/vnd.rar', 'TAR': 'application/x-tar', '7z': 'application/x-7z-compressed', - 'HTML': 'text/html', 'Text': 'text/plain', 'PDF': 'application/pdf', - 'CSV': 'text/csv', 'XML': 'application/xml', 'JSON': 'application/json', @@ -36,20 +34,17 @@ FORMAT_TO_MIMETYPE = { #'application/vnd.openxmlformats-officedocument.wordprocessingml.document', # .docx #'application/vnd.ms-excel', # .xls #'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx - - 'MP3': 'audio/mpeg', # .mp3 - - 'MP4': 'video/mp4', # .mp4 - 'MPEG': 'video/mpeg', # .mpeg - + 'MP3': 'audio/mpeg', # .mp3 + 'MP4': 'video/mp4', # .mp4 + 'MPEG': 'video/mpeg', # .mpeg 'JPEG': 'image/jpeg', 'GIF': 'image/gif', 'PNG': 'image/png', 'TIFF': 'image/tiff', - 'Unknown': None, } + def want_file(f: dict, item_name: str) -> bool: """ Filters IA API files @@ -57,12 +52,12 @@ def want_file(f: dict, item_name: str) -> bool: if f.source != 'original': return False for suffix in [ - '_meta.sqlite', - '_archive.torrent', - '_itemimage.jpg', - '_meta.xml', - '_thumb.png', - '_files.xml', + '_meta.sqlite', + '_archive.torrent', + '_itemimage.jpg', + '_meta.xml', + '_thumb.png', + '_files.xml', ]: if f.name == item_name + suffix or f.name == item_name.lower() + suffix: return False @@ -74,6 +69,7 @@ def want_file(f: dict, item_name: str) -> bool: return False return True + def parse_file(f: dict) -> dict: """ Takes an IA API file and turns it in to a fatcat fileset manifest file @@ -93,6 +89,7 @@ def parse_file(f: dict) -> dict: mf['extra'] = dict(mimetype=mimetype) return mf + def item_to_fileset(item_name: str, release_id: str, session: internetarchive.ArchiveSession): print(f"processing item={item_name} release_id={release_id}", file=sys.stderr) if release_id.startswith('release_'): @@ -104,18 +101,17 @@ def item_to_fileset(item_name: str, release_id: str, session: internetarchive.Ar manifest = [parse_file(f) for f in item_files if want_file(f, item_name)] fileset = { 'manifest': manifest, - 'urls': [ - { - 'rel': 'archive', - 'url': f'https://archive.org/download/{item_name}/', - }, - ], + 'urls': [{ + 'rel': 'archive', + 'url': f'https://archive.org/download/{item_name}/', + }, ], 'release_ids': [release_id], #extra={}, } print(json.dumps(fileset)) return fileset + def main(): session = internetarchive.get_session() if len(sys.argv) == 3: @@ -133,5 +129,6 @@ def main(): release_id = fields[1] item_to_fileset(item_name, release_id=release_id, session=session) + if __name__ == '__main__': main() diff --git a/python/scripts/cdx_collection.py b/python/scripts/cdx_collection.py index 5e33def..aa78aec 100755 --- a/python/scripts/cdx_collection.py +++ b/python/scripts/cdx_collection.py @@ -35,9 +35,7 @@ def run(): print("Looking up collection: {}".format(collection)) # First fetch list - item_list = list( - ia.search_items( - query="collection:{} mediatype:web".format(collection))) + item_list = list(ia.search_items(query="collection:{} mediatype:web".format(collection))) if len(item_list) == 0: print("No items found, bailing") @@ -50,11 +48,12 @@ def run(): item = item['identifier'] # TODO: error handling try: - ret = ia.download(item, files=[item + '.cdx.gz'], - verbose=True, - destdir=tempdir, - no_directory=True, - retries=1000) + ret = ia.download(item, + files=[item + '.cdx.gz'], + verbose=True, + destdir=tempdir, + no_directory=True, + retries=1000) status = ret and status except requests.exceptions.ReadTimeout as rt: print(str(rt), file=sys.stderr) @@ -69,14 +68,13 @@ def run(): # Combine files print("Merging and re-compressing all CDX files...") #subprocess.run('zcat {0}/*.cdx.gz | pigz > {0}/combined.gz'.format(tempdir), - subprocess.run('zcat {0}/*.cdx.gz | gzip > {0}/combined.gz'.format(tempdir), - shell=True) + subprocess.run('zcat {0}/*.cdx.gz | gzip > {0}/combined.gz'.format(tempdir), shell=True) # Move and cleanup - shutil.move('{}/combined.gz'.format(tempdir), - '{}.cdx.gz'.format(collection)) + shutil.move('{}/combined.gz'.format(tempdir), '{}.cdx.gz'.format(collection)) print("Done!") -if __name__=='__main__': + +if __name__ == '__main__': run() diff --git a/python/scripts/covid2ingestrequest.py b/python/scripts/covid2ingestrequest.py index 1b7c85c..4714b60 100755 --- a/python/scripts/covid2ingestrequest.py +++ b/python/scripts/covid2ingestrequest.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ Transform an unpaywall dump (JSON) into ingest requests. """ @@ -21,7 +20,6 @@ def transform_cnki(obj): requests = [] assert obj['cnki_id'] - requests = [] requests.append({ 'base_url': canon(obj['info_url']), @@ -41,6 +39,7 @@ def transform_cnki(obj): return requests + def transform_wanfang(obj): assert obj['wanfang_id'] @@ -68,17 +67,18 @@ def run(args): for r in requests: print("{}".format(json.dumps(r, sort_keys=True))) + def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('json_file', - help="COVID-19 metadata file to use", - type=argparse.FileType('r')) + help="COVID-19 metadata file to use", + type=argparse.FileType('r')) subparsers = parser.add_subparsers() args = parser.parse_args() run(args) + if __name__ == '__main__': main() diff --git a/python/scripts/deliver_dumpgrobid_to_s3.py b/python/scripts/deliver_dumpgrobid_to_s3.py index 62a85e6..3b53235 100755 --- a/python/scripts/deliver_dumpgrobid_to_s3.py +++ b/python/scripts/deliver_dumpgrobid_to_s3.py @@ -49,7 +49,6 @@ def b32_hex(s): class DeliverDumpGrobidS3(): - def __init__(self, s3_bucket, **kwargs): self.rstore = None self.count = Counter() @@ -80,11 +79,7 @@ class DeliverDumpGrobidS3(): tei_xml = tei_xml.encode('utf-8') # upload to AWS S3 obj = self.bucket.put_object( - Key="{}{}/{}{}".format( - self.s3_prefix, - sha1_hex[0:4], - sha1_hex, - self.s3_suffix), + Key="{}{}/{}{}".format(self.s3_prefix, sha1_hex[0:4], sha1_hex, self.s3_suffix), Body=tei_xml, StorageClass=self.s3_storage_class, ) @@ -92,6 +87,7 @@ class DeliverDumpGrobidS3(): self.count['success-s3'] += 1 sys.stderr.write("{}\n".format(self.count)) + @sentry_client.capture_exceptions def main(): @@ -121,5 +117,6 @@ def main(): worker = DeliverDumpGrobidS3(**args.__dict__) worker.run(args.dump_file) -if __name__ == '__main__': # pragma: no cover + +if __name__ == '__main__': # pragma: no cover main() diff --git a/python/scripts/deliver_gwb_to_disk.py b/python/scripts/deliver_gwb_to_disk.py index ab1906a..ca19b97 100755 --- a/python/scripts/deliver_gwb_to_disk.py +++ b/python/scripts/deliver_gwb_to_disk.py @@ -26,7 +26,6 @@ sentry_client = raven.Client() class DeliverGwbDisk: - def __init__(self, disk_dir, **kwargs): self.warc_uri_prefix = kwargs.get('warc_uri_prefix') self.rstore = None @@ -34,7 +33,8 @@ class DeliverGwbDisk: # /serve/ instead of /download/ doesn't record view count self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/') # gwb library will fall back to reading from /opt/.petabox/webdata.secret - self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET')) + self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', + os.environ.get('PETABOX_WEBDATA_SECRET')) self.disk_dir = disk_dir self.disk_prefix = kwargs.get('disk_prefix', 'pdf/') self.disk_suffix = kwargs.get('disk_suffix', '.pdf') @@ -42,48 +42,56 @@ class DeliverGwbDisk: def fetch_warc_content(self, warc_path, offset, c_size): warc_uri = self.warc_uri_prefix + warc_path if not self.rstore: - self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory( - webdata_secret=self.petabox_webdata_secret, - download_base_url=self.petabox_base_url)) + self.rstore = ResourceStore( + loaderfactory=CDXLoaderFactory(webdata_secret=self.petabox_webdata_secret, + download_base_url=self.petabox_base_url)) try: gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (ResourceUnavailable)") + return None, dict( + status="error", + reason="failed to load file contents from wayback/petabox (ResourceUnavailable)" + ) except ValueError as ve: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) + return None, dict( + status="error", + reason="failed to load file contents from wayback/petabox (ValueError: {})". + format(ve)) except EOFError as eofe: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) + return None, dict( + status="error", + reason="failed to load file contents from wayback/petabox (EOFError: {})". + format(eofe)) except TypeError as te: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) + return None, dict( + status="error", + reason= + "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)" + .format(te)) # Note: could consider a generic "except Exception" here, as we get so # many petabox errors. Do want jobs to fail loud and clear when the # whole cluster is down though. if gwb_record.get_status()[0] != 200: return None, dict(status="error", - reason="archived HTTP response (WARC) was not 200", - warc_status=gwb_record.get_status()[0]) + reason="archived HTTP response (WARC) was not 200", + warc_status=gwb_record.get_status()[0]) try: raw_content = gwb_record.open_raw_content().read() except IncompleteRead as ire: - return None, dict(status="error", - reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) + return None, dict( + status="error", + reason= + "failed to read actual file contents from wayback/petabox (IncompleteRead: {})". + format(ire)) return raw_content, None def run(self, manifest_file): sys.stderr.write("Ensuring all 65536 base directories exist...\n") for i in range(256): for j in range(256): - fpath = "{}/{}{:02x}/{:02x}".format( - self.disk_dir, - self.disk_prefix, - i, - j) + fpath = "{}/{}{:02x}/{:02x}".format(self.disk_dir, self.disk_prefix, i, j) os.makedirs(fpath, exist_ok=True) sys.stderr.write("Starting...\n") for line in manifest_file: @@ -102,9 +110,11 @@ class DeliverGwbDisk: self.count['skip-warc'] += 1 continue # fetch from GWB/petabox via HTTP range-request - blob, status = self.fetch_warc_content(file_cdx['warc'], file_cdx['offset'], file_cdx['c_size']) + blob, status = self.fetch_warc_content(file_cdx['warc'], file_cdx['offset'], + file_cdx['c_size']) if blob is None and status: - print("{}\terror petabox\t{}\t{}".format(sha1_hex, file_cdx['warc'], status['reason'])) + print("{}\terror petabox\t{}\t{}".format(sha1_hex, file_cdx['warc'], + status['reason'])) self.count['err-petabox-fetch'] += 1 continue elif not blob: @@ -120,19 +130,15 @@ class DeliverGwbDisk: self.count['petabox-ok'] += 1 # save to disk - fpath = "{}/{}{}/{}/{}{}".format( - self.disk_dir, - self.disk_prefix, - sha1_hex[0:2], - sha1_hex[2:4], - sha1_hex, - self.disk_suffix) + fpath = "{}/{}{}/{}/{}{}".format(self.disk_dir, self.disk_prefix, sha1_hex[0:2], + sha1_hex[2:4], sha1_hex, self.disk_suffix) with open(fpath, 'wb') as f: f.write(blob) print("{}\tsuccess\t{}\t{}".format(sha1_hex, fpath, len(blob))) self.count['success-disk'] += 1 sys.stderr.write("{}\n".format(self.count)) + @sentry_client.capture_exceptions def main(): @@ -162,5 +168,6 @@ def main(): worker = DeliverGwbDisk(**args.__dict__) worker.run(args.manifest_file) -if __name__ == '__main__': # pragma: no cover + +if __name__ == '__main__': # pragma: no cover main() diff --git a/python/scripts/deliver_gwb_to_s3.py b/python/scripts/deliver_gwb_to_s3.py index f103205..f9b3b19 100755 --- a/python/scripts/deliver_gwb_to_s3.py +++ b/python/scripts/deliver_gwb_to_s3.py @@ -53,7 +53,6 @@ sentry_client = raven.Client() class DeliverGwbS3: - def __init__(self, s3_bucket, **kwargs): self.warc_uri_prefix = kwargs.get('warc_uri_prefix') self.rstore = None @@ -61,7 +60,8 @@ class DeliverGwbS3: # /serve/ instead of /download/ doesn't record view count self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/') # gwb library will fall back to reading from /opt/.petabox/webdata.secret - self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET')) + self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', + os.environ.get('PETABOX_WEBDATA_SECRET')) self.s3_bucket = s3_bucket self.s3_prefix = kwargs.get('s3_prefix', 'pdf/') self.s3_suffix = kwargs.get('s3_suffix', '.pdf') @@ -71,37 +71,49 @@ class DeliverGwbS3: def fetch_warc_content(self, warc_path, offset, c_size): warc_uri = self.warc_uri_prefix + warc_path if not self.rstore: - self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory( - webdata_secret=self.petabox_webdata_secret, - download_base_url=self.petabox_base_url)) + self.rstore = ResourceStore( + loaderfactory=CDXLoaderFactory(webdata_secret=self.petabox_webdata_secret, + download_base_url=self.petabox_base_url)) try: gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (ResourceUnavailable)") + return None, dict( + status="error", + reason="failed to load file contents from wayback/petabox (ResourceUnavailable)" + ) except ValueError as ve: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) + return None, dict( + status="error", + reason="failed to load file contents from wayback/petabox (ValueError: {})". + format(ve)) except EOFError as eofe: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) + return None, dict( + status="error", + reason="failed to load file contents from wayback/petabox (EOFError: {})". + format(eofe)) except TypeError as te: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) + return None, dict( + status="error", + reason= + "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)" + .format(te)) # Note: could consider a generic "except Exception" here, as we get so # many petabox errors. Do want jobs to fail loud and clear when the # whole cluster is down though. if gwb_record.get_status()[0] != 200: return None, dict(status="error", - reason="archived HTTP response (WARC) was not 200", - warc_status=gwb_record.get_status()[0]) + reason="archived HTTP response (WARC) was not 200", + warc_status=gwb_record.get_status()[0]) try: raw_content = gwb_record.open_raw_content().read() except IncompleteRead as ire: - return None, dict(status="error", - reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) + return None, dict( + status="error", + reason= + "failed to read actual file contents from wayback/petabox (IncompleteRead: {})". + format(ire)) return raw_content, None def run(self, manifest_file): @@ -122,9 +134,11 @@ class DeliverGwbS3: self.count['skip-warc'] += 1 continue # fetch from GWB/petabox via HTTP range-request - blob, status = self.fetch_warc_content(file_cdx['warc'], file_cdx['offset'], file_cdx['c_size']) + blob, status = self.fetch_warc_content(file_cdx['warc'], file_cdx['offset'], + file_cdx['c_size']) if blob is None and status: - print("{}\terror petabox\t{}\t{}".format(sha1_hex, file_cdx['warc'], status['reason'])) + print("{}\terror petabox\t{}\t{}".format(sha1_hex, file_cdx['warc'], + status['reason'])) self.count['err-petabox-fetch'] += 1 continue elif not blob: @@ -140,17 +154,14 @@ class DeliverGwbS3: self.count['petabox-ok'] += 1 # upload to AWS S3 - obj = self.bucket.put_object( - Key="{}{}/{}{}".format( - self.s3_prefix, - sha1_hex[0:4], - sha1_hex, - self.s3_suffix), - Body=blob) + obj = self.bucket.put_object(Key="{}{}/{}{}".format(self.s3_prefix, sha1_hex[0:4], + sha1_hex, self.s3_suffix), + Body=blob) print("{}\tsuccess\t{}\t{}".format(sha1_hex, obj.key, len(blob))) self.count['success-s3'] += 1 sys.stderr.write("{}\n".format(self.count)) + @sentry_client.capture_exceptions def main(): @@ -180,5 +191,6 @@ def main(): worker = DeliverGwbS3(**args.__dict__) worker.run(args.manifest_file) -if __name__ == '__main__': # pragma: no cover + +if __name__ == '__main__': # pragma: no cover main() diff --git a/python/scripts/doaj2ingestrequest.py b/python/scripts/doaj2ingestrequest.py index 15b30a0..84a2c2c 100755 --- a/python/scripts/doaj2ingestrequest.py +++ b/python/scripts/doaj2ingestrequest.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ Transform an DOAJ article dump (JSON) into ingest requests. @@ -42,22 +41,22 @@ CONTENT_TYPE_MAP = { "abstract": [], "doc": [], "": ["pdf"], - "doi": ["pdf"], "url": ["pdf"], "fulltext": ["pdf"], "anySimpleType": ["pdf"], - "application/pdf": ["pdf"], "html": ["html", "pdf"], "text/html": ["html", "pdf"], "xml": ["xml"], } + def canon(s: str) -> str: parsed = urlcanon.parse_url(s) return str(urlcanon.whatwg(parsed)) + def transform(obj: dict) -> List[dict]: """ Transforms from a single DOAJ object to zero or more ingest requests. @@ -118,6 +117,7 @@ def transform(obj: dict) -> List[dict]: return requests + def run(args) -> None: for l in args.json_file: if not l.strip(): @@ -128,17 +128,18 @@ def run(args) -> None: for r in requests: print("{}".format(json.dumps(r, sort_keys=True))) + def main() -> None: - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('json_file', - help="DOAJ article dump file to use", - type=argparse.FileType('r')) + help="DOAJ article dump file to use", + type=argparse.FileType('r')) subparsers = parser.add_subparsers() args = parser.parse_args() run(args) + if __name__ == '__main__': main() diff --git a/python/scripts/enrich_scored_matches.py b/python/scripts/enrich_scored_matches.py index 3085346..54c3d5f 100755 --- a/python/scripts/enrich_scored_matches.py +++ b/python/scripts/enrich_scored_matches.py @@ -34,13 +34,13 @@ def run(): sha1 = base64.b16encode(base64.b32decode(raw_sha1)).decode('ascii').lower() - obj = dict( - sha1=sha1, - dois=dois, - cdx=[dict(url=cdx['url'], dt=cdx['dt'])], - size=size, - mimetype=mimetype) + obj = dict(sha1=sha1, + dois=dois, + cdx=[dict(url=cdx['url'], dt=cdx['dt'])], + size=size, + mimetype=mimetype) print(json.dumps(obj)) -if __name__=='__main__': + +if __name__ == '__main__': run() diff --git a/python/scripts/filter_grobid_metadata.py b/python/scripts/filter_grobid_metadata.py index d0666ce..a474393 100755 --- a/python/scripts/filter_grobid_metadata.py +++ b/python/scripts/filter_grobid_metadata.py @@ -24,6 +24,7 @@ NAME_DENYLIST = ( 'phdstudent', ) + def tokenize(s, remove_whitespace=True): s.replace(''', "'") @@ -36,9 +37,11 @@ def tokenize(s, remove_whitespace=True): # Encode as dumb ASCII (TODO: this is horrible) return s.encode('ascii', 'replace').decode('utf8').replace('?', '') + assert tokenize("Impact Factor: 2.114") == "impactfactor" assert tokenize("Impact Factor: 2.114") in TITLE_DENYLIST + def filter_title(title): title = title.strip() @@ -83,19 +86,23 @@ def filter_title(title): return title + def filter_author_name(name): name = name['name'] if name.strip().lower().replace(' ', '') in NAME_DENYLIST: return None return ' '.join([t for t in name.split() if tokenize(t)]) + def filter_authors(l): return [dict(name=n) for n in map(filter_author_name, l) if n and len(n) > 1] + def filter_refs(l): # TODO: return l + def filter_journal_name(name): # same denylist, for now if not name: @@ -104,10 +111,12 @@ def filter_journal_name(name): slug_name = tokenize(name) if slug_name in TITLE_DENYLIST or len(slug_name) < 4 or name == "N.º": return None - for prefix in ("/ ", "~ ", "& ", "© ", "Original Research Article ", "Original Article ", "Research Article ", "Available online www.jocpr.com "): + for prefix in ("/ ", "~ ", "& ", "© ", "Original Research Article ", "Original Article ", + "Research Article ", "Available online www.jocpr.com "): if name.startswith(prefix): name = name.replace(prefix, '') - for suffix in (" Available online at www.sciarena.com", " Original Article", " Available online at", " ISSN", " ISSUE"): + for suffix in (" Available online at www.sciarena.com", " Original Article", + " Available online at", " ISSN", " ISSUE"): if name.endswith(suffix): name = name.replace(suffix, '') if "====================" in name: @@ -116,6 +125,7 @@ def filter_journal_name(name): return None return ' '.join(name.split()) + def filter_metadata(obj): if not (obj.get('title') and obj.get('authors')): return None @@ -132,6 +142,7 @@ def filter_metadata(obj): return obj + def run(invert=False): for line in sys.stdin: fields = line.split('\t') @@ -155,5 +166,6 @@ def run(invert=False): elif invert: print(raw.strip()) -if __name__=="__main__": + +if __name__ == "__main__": run(invert="--invert" in sys.argv) diff --git a/python/scripts/filter_groupworks.py b/python/scripts/filter_groupworks.py index 494da71..fda9098 100755 --- a/python/scripts/filter_groupworks.py +++ b/python/scripts/filter_groupworks.py @@ -28,6 +28,7 @@ MAX_SLUG_LINES = 50 REQUIRE_AUTHORS = False + def tokenize(s, remove_whitespace=False): s.replace(''', "'") @@ -40,6 +41,7 @@ def tokenize(s, remove_whitespace=False): # Encode as dumb ASCII (TODO: this is horrible) return s.encode('ascii', 'replace').replace(b'?', b'') + def check_authors(left, right): """ Intended to check GROBID extracted authors (right) against "known good" @@ -63,6 +65,7 @@ def check_authors(left, right): return False return True + def test_check_authors(): assert check_authors([], []) == bool(not REQUIRE_AUTHORS) assert not check_authors([], ['one']) @@ -74,6 +77,7 @@ def test_check_authors(): assert check_authors(['Mr. Magoo'], ['Mr Magoo']) assert check_authors(['one', 'tw', 'thr'], ['one', 'two', 'three']) + # Rows are (score, left, right) def process_group(rows): @@ -119,6 +123,7 @@ def process_group(rows): print(json.dumps([releases[ident] for ident in group_ids])) + def run(): last_slug = None @@ -140,5 +145,6 @@ def run(): if lines: process_group(lines) -if __name__=='__main__': + +if __name__ == '__main__': run() diff --git a/python/scripts/filter_scored_matches.py b/python/scripts/filter_scored_matches.py index abf81bd..3251852 100755 --- a/python/scripts/filter_scored_matches.py +++ b/python/scripts/filter_scored_matches.py @@ -33,6 +33,7 @@ def tokenize(s, remove_whitespace=False): # Encode as dumb ASCII (TODO: this is horrible) return s.encode('ascii', 'replace').replace(b'?', b'') + def check_authors(left, right): """ Intended to check GROBID extracted authors (right) against "known good" @@ -56,6 +57,7 @@ def check_authors(left, right): return False return True + def test_check_authors(): assert not check_authors([], []) assert not check_authors([], ['one']) @@ -67,6 +69,7 @@ def test_check_authors(): assert check_authors(['Mr. Magoo'], ['Mr Magoo']) assert check_authors(['one', 'tw', 'thr'], ['one', 'two', 'three']) + # Rows are (score, grobid, crossref) def process_group(rows): if len(rows) > max_slug_lines: @@ -92,6 +95,7 @@ def process_group(rows): for sha1, doi_list in keepers.items(): print("{}\t{}".format(sha1, json.dumps(doi_list))) + def run(): last_slug = None @@ -112,5 +116,6 @@ def run(): if lines: process_group(lines) -if __name__=='__main__': + +if __name__ == '__main__': run() diff --git a/python/scripts/grobid_affiliations.py b/python/scripts/grobid_affiliations.py index d391f60..b42153c 100755 --- a/python/scripts/grobid_affiliations.py +++ b/python/scripts/grobid_affiliations.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ Takes old (HBase) or new (pg) style JSON wrappers of GROBID XML extraction output, converts the XML to JSON, filters out raw affiliation strings, and @@ -24,10 +23,12 @@ def parse_hbase(line): tei_xml = obj['tei_xml'] return sha1hex, tei_xml + def parse_pg(line): obj = json.loads(line) return obj['sha1hex'], obj['tei_xml'] + def run(mode='hbase'): for line in sys.stdin: if mode == 'hbase': @@ -49,5 +50,6 @@ def run(mode='hbase'): affiliations = [json.loads(a) for a in affiliations] print('\t'.join([sha1hex, json.dumps(affiliations)])) -if __name__=='__main__': + +if __name__ == '__main__': run() diff --git a/python/scripts/import_grobid_metadata.py b/python/scripts/import_grobid_metadata.py index 8aee0be..c9bc134 100755 --- a/python/scripts/import_grobid_metadata.py +++ b/python/scripts/import_grobid_metadata.py @@ -4,7 +4,8 @@ import datetime import json import sys -MAX_ABSTRACT_BYTES=4096 +MAX_ABSTRACT_BYTES = 4096 + def parse_grobid_json(obj): @@ -14,10 +15,7 @@ def parse_grobid_json(obj): extra = dict() if obj.get('abstract') and len(obj.get('abstract')) < MAX_ABSTRACT_BYTES: - abobj = dict( - mimetype="text/plain", - language=None, - content=obj.get('abstract').strip()) + abobj = dict(mimetype="text/plain", language=None, content=obj.get('abstract').strip()) abstracts = [abobj] else: abstracts = None @@ -72,16 +70,16 @@ def parse_grobid_json(obj): else: extra = None - return dict( - title=obj['title'].strip(), - contribs=contribs, - publisher=obj['journal'].get('publisher'), - volume=obj['journal'].get('volume'), - issue=obj['journal'].get('issue'), - abstracts=abstracts, - release_type=release_type, - release_date=release_date, - extra=extra) + return dict(title=obj['title'].strip(), + contribs=contribs, + publisher=obj['journal'].get('publisher'), + volume=obj['journal'].get('volume'), + issue=obj['journal'].get('issue'), + abstracts=abstracts, + release_type=release_type, + release_date=release_date, + extra=extra) + def run(): for line in sys.stdin: @@ -90,5 +88,6 @@ def run(): if out: print(out) -if __name__=="__main__": + +if __name__ == "__main__": run() diff --git a/python/scripts/ingestrequest_row2json.py b/python/scripts/ingestrequest_row2json.py index acba2a8..70731d5 100755 --- a/python/scripts/ingestrequest_row2json.py +++ b/python/scripts/ingestrequest_row2json.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ This script is used to turn ingest request postgres rows (in JSON export format) back in to regular ingest request JSON. @@ -25,6 +24,7 @@ def transform(row): row['fatcat'] = dict(release_ident=extra['release_ident']) return row + def run(args): for l in args.json_file: if not l.strip(): @@ -35,17 +35,18 @@ def run(args): print(l, file=sys.stderr) print(json.dumps(req, sort_keys=True)) + def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('json_file', - help="arabesque output file to use", - type=argparse.FileType('r')) + help="arabesque output file to use", + type=argparse.FileType('r')) subparsers = parser.add_subparsers() args = parser.parse_args() run(args) + if __name__ == '__main__': main() diff --git a/python/scripts/manifest_converter.py b/python/scripts/manifest_converter.py index 8267003..24e22fd 100755 --- a/python/scripts/manifest_converter.py +++ b/python/scripts/manifest_converter.py @@ -20,6 +20,7 @@ import sys # 2. select all file metadata # 3. output object + def or_none(s): if s is None: return None @@ -27,6 +28,7 @@ def or_none(s): return None return s + def process_db(db_path): db = sqlite3.connect(db_path) @@ -52,5 +54,6 @@ def process_db(db_path): dois = db.execute("SELECT doi FROM files_id_doi WHERE sha1=?", [sha1]) print(json.dumps(obj)) -if __name__=="__main__": + +if __name__ == "__main__": process_db(sys.argv[1]) diff --git a/python/scripts/oai2ingestrequest.py b/python/scripts/oai2ingestrequest.py index 315b8d2..1f4a19f 100755 --- a/python/scripts/oai2ingestrequest.py +++ b/python/scripts/oai2ingestrequest.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ Transform an OAI-PMH bulk dump (JSON) into ingest requests. @@ -33,17 +32,19 @@ DOMAIN_BLOCKLIST = [ ] RELEASE_STAGE_MAP = { - 'info:eu-repo/semantics/draftVersion': 'draft', + 'info:eu-repo/semantics/draftVersion': 'draft', 'info:eu-repo/semantics/submittedVersion': 'submitted', - 'info:eu-repo/semantics/acceptedVersion': 'accepted', + 'info:eu-repo/semantics/acceptedVersion': 'accepted', 'info:eu-repo/semantics/publishedVersion': 'published', - 'info:eu-repo/semantics/updatedVersion': 'updated', + 'info:eu-repo/semantics/updatedVersion': 'updated', } + def canon(s): parsed = urlcanon.parse_url(s) return str(urlcanon.whatwg(parsed)) + def transform(obj): """ Transforms from a single OAI-PMH object to zero or more ingest requests. @@ -112,6 +113,7 @@ def transform(obj): return requests + def run(args): for l in args.json_file: if not l.strip(): @@ -122,17 +124,18 @@ def run(args): for r in requests: print("{}".format(json.dumps(r, sort_keys=True))) + def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('json_file', - help="OAI-PMH dump file to use (usually stdin)", - type=argparse.FileType('r')) + help="OAI-PMH dump file to use (usually stdin)", + type=argparse.FileType('r')) subparsers = parser.add_subparsers() args = parser.parse_args() run(args) + if __name__ == '__main__': main() diff --git a/python/scripts/pdf_thumbnail.py b/python/scripts/pdf_thumbnail.py index 71fbe54..3f81b3b 100755 --- a/python/scripts/pdf_thumbnail.py +++ b/python/scripts/pdf_thumbnail.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ Quick CLI script to convert a PDF to thumbnail (.png, jpeg, etc). @@ -23,12 +22,14 @@ def run(inpath, outpath): renderer = poppler.PageRenderer() full_page = renderer.render_page(page) - img = Image.frombuffer("RGBA", (full_page.width, full_page.height), full_page.data, 'raw', "BGRA", 0, 1) - img.thumbnail((180,300), Image.BICUBIC) + img = Image.frombuffer("RGBA", (full_page.width, full_page.height), full_page.data, 'raw', + "BGRA", 0, 1) + img.thumbnail((180, 300), Image.BICUBIC) #img.thumbnail((360,600), Image.BICUBIC) img.save(outpath) #img.save(outpath, quality=95) + if __name__ == '__main__': if len(sys.argv) != 3: print("expect two parameters: INPUT.png OUTPUT.png", file=sys.stderr) diff --git a/python/scripts/unpaywall2ingestrequest.py b/python/scripts/unpaywall2ingestrequest.py index 590b429..b79f316 100755 --- a/python/scripts/unpaywall2ingestrequest.py +++ b/python/scripts/unpaywall2ingestrequest.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - """ Transform an unpaywall dump (JSON) into ingest requests. """ @@ -26,17 +25,19 @@ DOMAIN_BLOCKLIST = [ ] RELEASE_STAGE_MAP = { - 'draftVersion': 'draft', + 'draftVersion': 'draft', 'submittedVersion': 'submitted', - 'acceptedVersion': 'accepted', + 'acceptedVersion': 'accepted', 'publishedVersion': 'published', - 'updatedVersion': 'updated', + 'updatedVersion': 'updated', } + def canon(s): parsed = urlcanon.parse_url(s) return str(urlcanon.whatwg(parsed)) + def transform(obj): """ Transforms from a single unpaywall object to zero or more ingest requests. @@ -86,6 +87,7 @@ def transform(obj): return requests + def run(args): for l in args.json_file: if not l.strip(): @@ -96,17 +98,18 @@ def run(args): for r in requests: print("{}".format(json.dumps(r, sort_keys=True))) + def main(): - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('json_file', - help="unpaywall dump file to use", - type=argparse.FileType('r')) + help="unpaywall dump file to use", + type=argparse.FileType('r')) subparsers = parser.add_subparsers() args = parser.parse_args() run(args) + if __name__ == '__main__': main() diff --git a/python/tests/test_grobid.py b/python/tests/test_grobid.py index 7d950df..55636dc 100644 --- a/python/tests/test_grobid.py +++ b/python/tests/test_grobid.py @@ -1,4 +1,3 @@ - import struct import pytest @@ -12,20 +11,21 @@ FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843) with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'rb') as f: REAL_TEI_XML = f.read() + @pytest.fixture def grobid_client(): - client = GrobidClient( - host_url="http://dummy-grobid", - ) + client = GrobidClient(host_url="http://dummy-grobid", ) return client + @responses.activate def test_grobid_503(grobid_client): status = b'{"status": "done broke due to 503"}' responses.add(responses.POST, - 'http://dummy-grobid/api/processFulltextDocument', status=503, - body=status) + 'http://dummy-grobid/api/processFulltextDocument', + status=503, + body=status) resp = grobid_client.process_fulltext(FAKE_PDF_BYTES) @@ -35,12 +35,15 @@ def test_grobid_503(grobid_client): assert resp['status_code'] == 503 assert resp['status'] == "error" + @responses.activate def test_grobid_success(grobid_client): responses.add(responses.POST, - 'http://dummy-grobid/api/processFulltextDocument', status=200, - body=REAL_TEI_XML, content_type='text/xml') + 'http://dummy-grobid/api/processFulltextDocument', + status=200, + body=REAL_TEI_XML, + content_type='text/xml') resp = grobid_client.process_fulltext(FAKE_PDF_BYTES) @@ -53,6 +56,7 @@ def test_grobid_success(grobid_client): #print(type(REAL_TEI_XML)) assert resp['tei_xml'] == REAL_TEI_XML.decode('ISO-8859-1') + @responses.activate def test_grobid_worker_cdx(grobid_client, wayback_client): @@ -60,8 +64,10 @@ def test_grobid_worker_cdx(grobid_client, wayback_client): worker = GrobidWorker(grobid_client, wayback_client, sink=sink) responses.add(responses.POST, - 'http://dummy-grobid/api/processFulltextDocument', status=200, - body=REAL_TEI_XML, content_type='text/xml') + 'http://dummy-grobid/api/processFulltextDocument', + status=200, + body=REAL_TEI_XML, + content_type='text/xml') with open('tests/files/example.cdx', 'r') as cdx_file: pusher = CdxLinePusher( @@ -76,4 +82,3 @@ def test_grobid_worker_cdx(grobid_client, wayback_client): assert pusher_counts['pushed'] == worker.counts['total'] assert len(responses.calls) == worker.counts['total'] - diff --git a/python/tests/test_grobid2json.py b/python/tests/test_grobid2json.py index b8999b1..7637871 100644 --- a/python/tests/test_grobid2json.py +++ b/python/tests/test_grobid2json.py @@ -1,4 +1,3 @@ - import json import xml @@ -8,14 +7,15 @@ from grobid2json import * def test_small_xml(): - + with open('tests/files/small.xml', 'r') as f: tei_xml = f.read() with open('tests/files/small.json', 'r') as f: - json_form = json.loads(f.read()) + json_form = json.loads(f.read()) assert teixml2json(tei_xml) == json_form + def test_invalid_xml(): with pytest.raises(xml.etree.ElementTree.ParseError): diff --git a/python/tests/test_html.py b/python/tests/test_html.py index d4bffc1..c5f422e 100644 --- a/python/tests/test_html.py +++ b/python/tests/test_html.py @@ -1,4 +1,3 @@ - import json import pytest @@ -13,8 +12,7 @@ def test_extract_fulltext_url(): assert resp == {} resp = extract_fulltext_url( - "http://dummy-site/", - b"""<html> + "http://dummy-site/", b"""<html> <head> <meta name="citation_pdf_url" content="http://www.example.com/content/271/20/11761.full.pdf"> </head> @@ -22,8 +20,7 @@ def test_extract_fulltext_url(): <h1>my big article here</h1> blah </body> - </html>""" - ) + </html>""") assert resp['pdf_url'] == "http://www.example.com/content/271/20/11761.full.pdf" assert resp['technique'] == "citation_pdf_url" @@ -32,4 +29,5 @@ def test_extract_fulltext_url(): "https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0213978", f.read(), ) - assert resp['pdf_url'] == "https://journals.plos.org/plosone/article/file?id=10.1371/journal.pone.0213978&type=printable" + assert resp[ + 'pdf_url'] == "https://journals.plos.org/plosone/article/file?id=10.1371/journal.pone.0213978&type=printable" diff --git a/python/tests/test_html_ingest.py b/python/tests/test_html_ingest.py index 943e5da..3bf94e2 100644 --- a/python/tests/test_html_ingest.py +++ b/python/tests/test_html_ingest.py @@ -1,4 +1,3 @@ - import datetime import pytest diff --git a/python/tests/test_html_metadata.py b/python/tests/test_html_metadata.py index 7f35d55..a4c1e41 100644 --- a/python/tests/test_html_metadata.py +++ b/python/tests/test_html_metadata.py @@ -1,4 +1,3 @@ - import datetime import pytest @@ -44,11 +43,12 @@ def test_html_metadata_plos() -> None: def test_html_metadata_elife() -> None: - + with open('tests/files/elife_article.html', 'r') as f: elife_html = f.read() - meta = html_extract_biblio("https://elifesciences.org/articles/44753", HTMLParser(elife_html)) + meta = html_extract_biblio("https://elifesciences.org/articles/44753", + HTMLParser(elife_html)) assert meta is not None assert meta.title == "Parallel visual circuitry in a basal chordate" assert meta.doi == "10.7554/eLife.44753" @@ -69,7 +69,7 @@ def test_html_metadata_elife() -> None: def test_html_metadata_peerj() -> None: - + with open('tests/files/peerj_oa_article.html', 'r') as f: peerj_html = f.read() @@ -78,15 +78,15 @@ def test_html_metadata_peerj() -> None: assert meta.title == "The state of OA: a large-scale analysis of the prevalence and impact of Open Access articles" assert meta.doi == "10.7717/peerj.4375" assert meta.contrib_names == [ - "Heather Piwowar", - "Jason Priem", - "Vincent Larivière", - "Juan Pablo Alperin", - "Lisa Matthias", - "Bree Norlander", - "Ashley Farley", - "Jevin West", - "Stefanie Haustein", + "Heather Piwowar", + "Jason Priem", + "Vincent Larivière", + "Juan Pablo Alperin", + "Lisa Matthias", + "Bree Norlander", + "Ashley Farley", + "Jevin West", + "Stefanie Haustein", ] assert meta.container_name == "PeerJ" # "2018-02-13" @@ -129,7 +129,7 @@ def test_html_metadata_ojs3() -> None: "Os Keyes", ] assert meta.container_name == "First Monday" - assert meta.container_abbrev == "1" # NOTE: bad source metadata + assert meta.container_abbrev == "1" # NOTE: bad source metadata assert meta.container_issn == "1396-0466" # "2020/09/10" assert meta.release_date == datetime.date(year=2020, month=9, day=10) @@ -150,6 +150,7 @@ def test_html_metadata_dlib() -> None: # "2017-05-15" assert meta.release_date == datetime.date(year=2017, month=5, day=15) + def test_html_metadata_dc_case() -> None: """ This tests that CSS selector <meta name=""> attribute lookups are not case-sensitive. @@ -167,10 +168,12 @@ def test_html_metadata_dc_case() -> None: assert meta is not None assert meta.issue == "123" + @pytest.fixture def adblock() -> Any: return load_adblock_rules() + def test_html_resources(adblock) -> None: with open('tests/files/dlib_05vanhyning.html', 'r') as f: @@ -227,4 +230,3 @@ def test_html_resources(adblock) -> None: HTMLParser(nature_html), adblock, ) - diff --git a/python/tests/test_ingest.py b/python/tests/test_ingest.py index 0965fcb..79f50f4 100644 --- a/python/tests/test_ingest.py +++ b/python/tests/test_ingest.py @@ -1,4 +1,3 @@ - import json import pytest @@ -12,9 +11,7 @@ from sandcrawler import * @pytest.fixture def ingest_worker(wayback_client, spn_client): - grobid_client = GrobidClient( - host_url="http://dummy-grobid", - ) + grobid_client = GrobidClient(host_url="http://dummy-grobid", ) worker = IngestFileWorker( wayback_client=wayback_client, spn_client=spn_client, @@ -22,14 +19,11 @@ def ingest_worker(wayback_client, spn_client): ) return worker + @pytest.fixture def ingest_worker_pdf(wayback_client_pdf, spn_client): - grobid_client = GrobidClient( - host_url="http://dummy-grobid", - ) - pgrest_client = SandcrawlerPostgrestClient( - api_url="http://dummy-postgrest", - ) + grobid_client = GrobidClient(host_url="http://dummy-grobid", ) + pgrest_client = SandcrawlerPostgrestClient(api_url="http://dummy-postgrest", ) worker = IngestFileWorker( wayback_client=wayback_client_pdf, spn_client=spn_client, @@ -50,37 +44,45 @@ def test_ingest_success(ingest_worker_pdf): 'base_url': "http://dummy-host/", } responses.add(responses.POST, - 'http://dummy-spnv2/save', - status=200, - body=json.dumps({"url": TARGET, "job_id": JOB_ID})) + 'http://dummy-spnv2/save', + status=200, + body=json.dumps({ + "url": TARGET, + "job_id": JOB_ID + })) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=200, - body=json.dumps(PENDING_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(PENDING_BODY)) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=200, - body=json.dumps(SUCCESS_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(SUCCESS_BODY)) responses.add(responses.GET, - 'http://dummy-cdx/cdx', - status=200, - body=json.dumps(CDX_SPN_HIT)) + 'http://dummy-cdx/cdx', + status=200, + body=json.dumps(CDX_SPN_HIT)) responses.add(responses.GET, - 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"), - status=200, - headers={"X-Archive-Src": "liveweb-whatever.warc.gz"}, - body=pdf_bytes) + 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", + TARGET + "/redirect"), + status=200, + headers={"X-Archive-Src": "liveweb-whatever.warc.gz"}, + body=pdf_bytes) responses.add(responses.GET, - 'http://dummy-postgrest/grobid?sha1hex=eq.{}'.format("90ffd2359008d82298821d16b21778c5c39aec36"), - status=200, - body=json.dumps([])) + 'http://dummy-postgrest/grobid?sha1hex=eq.{}'.format( + "90ffd2359008d82298821d16b21778c5c39aec36"), + status=200, + body=json.dumps([])) responses.add(responses.GET, - 'http://dummy-postgrest/pdf_meta?sha1hex=eq.{}'.format("90ffd2359008d82298821d16b21778c5c39aec36"), - status=200, - body=json.dumps([])) + 'http://dummy-postgrest/pdf_meta?sha1hex=eq.{}'.format( + "90ffd2359008d82298821d16b21778c5c39aec36"), + status=200, + body=json.dumps([])) responses.add(responses.POST, - 'http://dummy-grobid/api/processFulltextDocument', status=200, - body=REAL_TEI_XML, content_type='text/xml') + 'http://dummy-grobid/api/processFulltextDocument', + status=200, + body=REAL_TEI_XML, + content_type='text/xml') resp = ingest_worker_pdf.process(request) @@ -108,6 +110,7 @@ def test_ingest_success(ingest_worker_pdf): assert resp['pdf_meta']['pdf_extra']['page_count'] == 1 assert resp['pdf_meta'].get('text') is None + @responses.activate def test_ingest_landing(ingest_worker): @@ -116,34 +119,39 @@ def test_ingest_landing(ingest_worker): 'base_url': "http://dummy-host/", } responses.add(responses.POST, - 'http://dummy-spnv2/save', - status=200, - body=json.dumps({"url": TARGET, "job_id": JOB_ID})) + 'http://dummy-spnv2/save', + status=200, + body=json.dumps({ + "url": TARGET, + "job_id": JOB_ID + })) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=200, - body=json.dumps(PENDING_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(PENDING_BODY)) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=200, - body=json.dumps(SUCCESS_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(SUCCESS_BODY)) responses.add(responses.GET, - 'http://dummy-cdx/cdx', - status=200, - body=json.dumps(CDX_SPN_HIT)) + 'http://dummy-cdx/cdx', + status=200, + body=json.dumps(CDX_SPN_HIT)) responses.add(responses.GET, - 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"), - status=200, - headers={"X-Archive-Src": "liveweb-whatever.warc.gz"}, - body=WARC_BODY) + 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", + TARGET + "/redirect"), + status=200, + headers={"X-Archive-Src": "liveweb-whatever.warc.gz"}, + body=WARC_BODY) # this is for second time around; don't want to fetch same landing page # HTML again and result in a loop responses.add(responses.GET, - 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"), - status=200, - headers={"X-Archive-Src": "liveweb-whatever.warc.gz"}, - body="<html></html>") + 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", + TARGET + "/redirect"), + status=200, + headers={"X-Archive-Src": "liveweb-whatever.warc.gz"}, + body="<html></html>") resp = ingest_worker.process(request) @@ -157,6 +165,7 @@ def test_ingest_landing(ingest_worker): assert 'revisit_cdx' not in resp assert 'grobid' not in resp + @responses.activate def test_ingest_blocklist(ingest_worker): @@ -192,6 +201,7 @@ def test_ingest_wall_blocklist(ingest_worker): assert resp['status'] == "skip-wall" assert resp['request'] == request + @responses.activate def test_ingest_cookie_blocklist(ingest_worker): @@ -205,4 +215,3 @@ def test_ingest_cookie_blocklist(ingest_worker): assert resp['hit'] == False assert resp['status'] == "blocked-cookie" assert resp['request'] == request - diff --git a/python/tests/test_live_wayback.py b/python/tests/test_live_wayback.py index b501dc3..0ff4902 100644 --- a/python/tests/test_live_wayback.py +++ b/python/tests/test_live_wayback.py @@ -1,4 +1,3 @@ - """ This file contains tests to run against "live" wayback services. They default to "skip" because you need authentication, and we shouldn't hit these services @@ -11,8 +10,8 @@ import json import pytest -from sandcrawler import (CdxApiClient, CdxApiError, CdxPartial, PetaboxError, SavePageNowClient, SavePageNowError, - WaybackClient, WaybackError, gen_file_metadata) +from sandcrawler import (CdxApiClient, CdxApiError, CdxPartial, PetaboxError, SavePageNowClient, + SavePageNowError, WaybackClient, WaybackError, gen_file_metadata) @pytest.fixture @@ -20,16 +19,19 @@ def cdx_client(): client = CdxApiClient() return client + @pytest.fixture def wayback_client(): client = WaybackClient() return client + @pytest.fixture def spn_client(): client = SavePageNowClient() return client + @pytest.mark.skip(reason="hits prod services, requires auth") def test_cdx_fetch(cdx_client): @@ -50,6 +52,7 @@ def test_cdx_fetch(cdx_client): with pytest.raises(KeyError): resp = cdx_client.fetch(url, "12345678123456") + @pytest.mark.skip(reason="hits prod services, requires auth") def test_cdx_lookup_best(cdx_client): @@ -68,13 +71,18 @@ def test_cdx_lookup_best(cdx_client): assert resp.mimetype == "text/html" assert resp.status_code == 200 + @pytest.mark.skip(reason="hits prod services, requires auth") def test_wayback_fetch(wayback_client): - resp = wayback_client.fetch_petabox(25683, 2676464871, "archiveteam_archivebot_go_20171205210002/arstechnica.co.uk-inf-20171201-061309-bb65j-00021.warc.gz") + resp = wayback_client.fetch_petabox( + 25683, 2676464871, + "archiveteam_archivebot_go_20171205210002/arstechnica.co.uk-inf-20171201-061309-bb65j-00021.warc.gz" + ) assert resp.body + @pytest.mark.skip(reason="hits prod services, requires auth") def test_lookup_resource_success(wayback_client): @@ -86,6 +94,7 @@ def test_lookup_resource_success(wayback_client): assert resp.terminal_url in (url, url.replace("https://", "http://")) assert resp.cdx.url in (url, url.replace("https://", "http://")) + @pytest.mark.skip(reason="hits prod services, requires auth") def test_cdx_fetch_spn2(cdx_client): @@ -107,8 +116,8 @@ def test_cdx_fetch_spn2(cdx_client): # https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 20200110222410 #com,wiley,onlinelibrary)/doi/pdf/10.1002/lrh2.10209 20200110222410 https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 text/html 200 VYW7JXFK6EC2KC537N5B7PHYZC4B6MZL - - 9006 815069841 liveweb-20200110214015-wwwb-spn18.us.archive.org-8002.warc.gz -#com,wiley,onlinelibrary)/doi/pdf/10.1002/lrh2.10209 20200110222410 https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 text/html 302 AFI55BZE23HDTTEERUFKRP6WQVO3LOLS - - 1096 815066572 liveweb-20200110214015-wwwb-spn18.us.archive.org-8002.warc.gz -#com,wiley,onlinelibrary)/doi/pdf/10.1002/lrh2.10209 20200110222422 https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 text/html 302 AFI55BZE23HDTTEERUFKRP6WQVO3LOLS - - 1094 307563475 liveweb-20200110214449-wwwb-spn18.us.archive.org-8003.warc.gz + #com,wiley,onlinelibrary)/doi/pdf/10.1002/lrh2.10209 20200110222410 https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 text/html 302 AFI55BZE23HDTTEERUFKRP6WQVO3LOLS - - 1096 815066572 liveweb-20200110214015-wwwb-spn18.us.archive.org-8002.warc.gz + #com,wiley,onlinelibrary)/doi/pdf/10.1002/lrh2.10209 20200110222422 https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 text/html 302 AFI55BZE23HDTTEERUFKRP6WQVO3LOLS - - 1094 307563475 liveweb-20200110214449-wwwb-spn18.us.archive.org-8003.warc.gz url = "https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209" datetime = "20200110222410" @@ -119,6 +128,7 @@ def test_cdx_fetch_spn2(cdx_client): assert resp.sha1b32 == "VYW7JXFK6EC2KC537N5B7PHYZC4B6MZL" assert resp.status_code == 200 + @pytest.mark.skip(reason="hits prod services, requires auth") def test_lookup_ftp(wayback_client): # ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_pdf/80/23/10.1177_1559827617708562.PMC6236633.pdf @@ -153,6 +163,7 @@ def test_lookup_ftp(wayback_client): file_meta = gen_file_metadata(resp.body) assert file_meta['sha1hex'] == resp.cdx.sha1hex + @pytest.mark.skip(reason="hits prod services, requires auth") def test_crawl_ftp(spn_client, wayback_client): diff --git a/python/tests/test_misc.py b/python/tests/test_misc.py index 0788c38..dcc1202 100644 --- a/python/tests/test_misc.py +++ b/python/tests/test_misc.py @@ -1,11 +1,11 @@ - import pytest -from sandcrawler import b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path, parse_cdx_line +from sandcrawler import (b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path, + parse_cdx_line) def test_gen_file_metadata(): - + # valid (but very small) PDF file with open('tests/files/dummy.pdf', 'rb') as f: file_meta = gen_file_metadata(f.read()) @@ -27,8 +27,9 @@ def test_gen_file_metadata(): assert fm['mimetype'] == 'text/plain' assert fm['size_bytes'] == 8 + def test_gen_file_metadata_path(): - + # valid (but very small) PDF file file_meta = gen_file_metadata_path('tests/files/dummy.pdf') assert file_meta == { @@ -39,11 +40,14 @@ def test_gen_file_metadata_path(): 'size_bytes': 13264, } + def test_b32_hex(): # valid b32 - assert b32_hex('sha1:TZCYZ2ULEHYGESS4L3RNH75I23KKFSMC') == '9e458cea8b21f0624a5c5ee2d3ffa8d6d4a2c982' - assert b32_hex('TZCYZ2ULEHYGESS4L3RNH75I23KKFSMC') == '9e458cea8b21f0624a5c5ee2d3ffa8d6d4a2c982' + assert b32_hex( + 'sha1:TZCYZ2ULEHYGESS4L3RNH75I23KKFSMC') == '9e458cea8b21f0624a5c5ee2d3ffa8d6d4a2c982' + assert b32_hex( + 'TZCYZ2ULEHYGESS4L3RNH75I23KKFSMC') == '9e458cea8b21f0624a5c5ee2d3ffa8d6d4a2c982' # sha1hex pass-through s = 'bda3c1017d52e826bbd1da51efad877272d300f9' @@ -53,6 +57,7 @@ def test_b32_hex(): with pytest.raises(ValueError): assert b32_hex('blah') == 'blah' + def test_parse_cdx_line(): raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz" @@ -73,6 +78,7 @@ def test_parse_cdx_line(): assert parse_cdx_line(raw + "\n") == correct assert parse_cdx_line(raw + " extra_field") == correct + def test_invalid_cdx(): print("missing warc") @@ -80,11 +86,11 @@ def test_invalid_cdx(): assert parse_cdx_line(raw) == None print("bad datetime") - raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 2070828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233i SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz" + raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 2070828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233i SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz" assert parse_cdx_line(raw) == None + def test_clean_url(): assert clean_url("http://BLAH.COM/file.pdf") == "http://blah.com/file.pdf" assert clean_url("https://opensky.ucar.edu:/islandora/object/articles%3A10809/datastream/PDF/view") == \ "https://opensky.ucar.edu/islandora/object/articles%3A10809/datastream/PDF/view" - diff --git a/python/tests/test_pdfextract.py b/python/tests/test_pdfextract.py index 1d334d6..146b138 100644 --- a/python/tests/test_pdfextract.py +++ b/python/tests/test_pdfextract.py @@ -1,4 +1,3 @@ - import struct import poppler @@ -6,11 +5,13 @@ import pytest import responses from test_wayback import cdx_client, wayback_client -from sandcrawler import BlackholeSink, CdxLinePusher, PdfExtractBlobWorker, PdfExtractWorker, WaybackClient +from sandcrawler import (BlackholeSink, CdxLinePusher, PdfExtractBlobWorker, PdfExtractWorker, + WaybackClient) from sandcrawler.pdfextract import process_pdf FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843) + def test_process_fake_pdf(): resp = process_pdf(FAKE_PDF_BYTES) print(resp) @@ -21,7 +22,9 @@ def test_process_fake_pdf(): resp = process_pdf(pdf_bytes) assert resp.status == 'not-pdf' -@pytest.mark.skipif(poppler.version_string() == '0.71.0', reason="unsupported version of poppler") + +@pytest.mark.skipif(poppler.version_string() == '0.71.0', + reason="unsupported version of poppler") def test_process_dummy_pdf(): with open('tests/files/dummy.pdf', 'rb') as f: pdf_bytes = f.read() @@ -39,6 +42,7 @@ def test_process_dummy_pdf(): assert resp.pdf_extra['page0_width'] == 595 assert resp.pdf_extra['page_count'] == 1 + def test_pdfextract_worker_cdx(wayback_client): sink = BlackholeSink() @@ -56,6 +60,7 @@ def test_pdfextract_worker_cdx(wayback_client): assert pusher_counts['pushed'] == 7 assert pusher_counts['pushed'] == worker.counts['total'] + def test_pdfextract_blob_worker(): sink = BlackholeSink() @@ -65,4 +70,3 @@ def test_pdfextract_blob_worker(): pdf_bytes = f.read() worker.process(pdf_bytes) - diff --git a/python/tests/test_pushers.py b/python/tests/test_pushers.py index 62fa515..63f90d3 100644 --- a/python/tests/test_pushers.py +++ b/python/tests/test_pushers.py @@ -1,4 +1,3 @@ - import pytest from sandcrawler.workers import BlackholeSink, CdxLinePusher @@ -18,8 +17,10 @@ def test_cdx_line_pusher(): # HTTP 200 and application/pdf with open('tests/files/example.cdx', 'r') as cdx_file: - pusher = CdxLinePusher(sink, cdx_file, - filter_mimetypes=['application/pdf'], filter_http_statuses=[200, 226]) + pusher = CdxLinePusher(sink, + cdx_file, + filter_mimetypes=['application/pdf'], + filter_http_statuses=[200, 226]) counts = pusher.run() assert counts['total'] == 20 assert counts['skip-parse'] == 1 diff --git a/python/tests/test_savepagenow.py b/python/tests/test_savepagenow.py index f3fbfda..80334d9 100644 --- a/python/tests/test_savepagenow.py +++ b/python/tests/test_savepagenow.py @@ -1,4 +1,3 @@ - import json import pytest @@ -26,9 +25,7 @@ SUCCESS_BODY = { "timestamp": "20180326070330", "duration_sec": 6.203, "resources": [ - TARGET, - TARGET + "/redirect", - "http://brewster.kahle.org/", + TARGET, TARGET + "/redirect", "http://brewster.kahle.org/", "http://brewster.kahle.org/favicon.ico", "http://brewster.kahle.org/files/2011/07/bkheader-follow.jpg", "http://brewster.kahle.org/files/2016/12/amazon-unhappy.jpg", @@ -43,8 +40,7 @@ SUCCESS_BODY = { "http://brewster.kahle.org/wp-content/themes/twentyten/style.css", "http://brewster.kahle.org/wp-includes/js/wp-embed.min.js?ver=4.9.4", "http://brewster.kahle.org/wp-includes/js/wp-emoji-release.min.js?ver=4.9.4", - "http://platform.twitter.com/widgets.js", - "https://archive-it.org/piwik.js", + "http://platform.twitter.com/widgets.js", "https://archive-it.org/piwik.js", "https://platform.twitter.com/jot.html", "https://platform.twitter.com/js/button.556f0ea0e4da4e66cfdc182016dbd6db.js", "https://platform.twitter.com/widgets/follow_button.f47a2e0b4471326b6fa0f163bda46011.en.html", @@ -60,7 +56,7 @@ SUCCESS_BODY = { "https://www.syndikat.org/wp-includes/js/jquery/jquery.js?ver=1.12.4", "https://www.syndikat.org/wp-includes/js/wp-emoji-release.min.js?ver=4.9.4" ], - "outlinks":{ + "outlinks": { "https://archive.org/": "xxxxxx89b-f3ca-48d0-9ea6-1d1225e98695", "https://other.com": "yyyy89b-f3ca-48d0-9ea6-1d1225e98695" } @@ -74,10 +70,18 @@ ERROR_BODY = { "resources": [] } CDX_SPN_HIT = [ - ["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","robotflags","length","offset","filename"], - ["wiki,fatcat)/", "20180326070330", TARGET + "/redirect", "application/pdf", "200", CDX_BEST_SHA1B32, "-", "-", "8445", "108062304", "liveweb-20200108215212-wwwb-spn04.us.archive.org-kols1pud.warc.gz"], + [ + "urlkey", "timestamp", "original", "mimetype", "statuscode", "digest", "redirect", + "robotflags", "length", "offset", "filename" + ], + [ + "wiki,fatcat)/", "20180326070330", TARGET + "/redirect", "application/pdf", "200", + CDX_BEST_SHA1B32, "-", "-", "8445", "108062304", + "liveweb-20200108215212-wwwb-spn04.us.archive.org-kols1pud.warc.gz" + ], ] + @pytest.fixture def spn_client(): client = SavePageNowClient( @@ -88,25 +92,29 @@ def spn_client(): client.poll_seconds = 0.0 return client + @responses.activate def test_savepagenow_success(spn_client): responses.add(responses.POST, - 'http://dummy-spnv2/save', - status=200, - body=json.dumps({"url": TARGET, "job_id": JOB_ID})) + 'http://dummy-spnv2/save', + status=200, + body=json.dumps({ + "url": TARGET, + "job_id": JOB_ID + })) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=200, - body=json.dumps(PENDING_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(PENDING_BODY)) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=200, - body=json.dumps(PENDING_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(PENDING_BODY)) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=200, - body=json.dumps(SUCCESS_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(SUCCESS_BODY)) resp = spn_client.save_url_now_v2(TARGET) @@ -119,21 +127,25 @@ def test_savepagenow_success(spn_client): assert resp.terminal_dt == SUCCESS_BODY['timestamp'] assert resp.resources == SUCCESS_BODY['resources'] + @responses.activate def test_savepagenow_remote_error(spn_client): responses.add(responses.POST, - 'http://dummy-spnv2/save', - status=200, - body=json.dumps({"url": TARGET, "job_id": JOB_ID})) + 'http://dummy-spnv2/save', + status=200, + body=json.dumps({ + "url": TARGET, + "job_id": JOB_ID + })) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=200, - body=json.dumps(PENDING_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(PENDING_BODY)) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=200, - body=json.dumps(ERROR_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(ERROR_BODY)) resp = spn_client.save_url_now_v2(TARGET) @@ -146,47 +158,56 @@ def test_savepagenow_remote_error(spn_client): assert resp.terminal_dt == None assert resp.resources == None + @responses.activate def test_savepagenow_500(spn_client): responses.add(responses.POST, - 'http://dummy-spnv2/save', - status=200, - body=json.dumps({"url": TARGET, "job_id": JOB_ID})) + 'http://dummy-spnv2/save', + status=200, + body=json.dumps({ + "url": TARGET, + "job_id": JOB_ID + })) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=500, - body=json.dumps(ERROR_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=500, + body=json.dumps(ERROR_BODY)) with pytest.raises(SavePageNowError): resp = spn_client.save_url_now_v2(TARGET) assert len(responses.calls) == 2 + @responses.activate def test_crawl_resource(spn_client, wayback_client): responses.add(responses.POST, - 'http://dummy-spnv2/save', - status=200, - body=json.dumps({"url": TARGET, "job_id": JOB_ID})) + 'http://dummy-spnv2/save', + status=200, + body=json.dumps({ + "url": TARGET, + "job_id": JOB_ID + })) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=200, - body=json.dumps(PENDING_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(PENDING_BODY)) responses.add(responses.GET, - 'http://dummy-spnv2/save/status/' + JOB_ID, - status=200, - body=json.dumps(SUCCESS_BODY)) + 'http://dummy-spnv2/save/status/' + JOB_ID, + status=200, + body=json.dumps(SUCCESS_BODY)) responses.add(responses.GET, - 'http://dummy-cdx/cdx', - status=200, - body=json.dumps(CDX_SPN_HIT)) + 'http://dummy-cdx/cdx', + status=200, + body=json.dumps(CDX_SPN_HIT)) responses.add(responses.GET, - 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"), - status=200, - headers={"X-Archive-Src": "liveweb-whatever.warc.gz"}, - body=WARC_BODY) + 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", + TARGET + "/redirect"), + status=200, + headers={"X-Archive-Src": "liveweb-whatever.warc.gz"}, + body=WARC_BODY) print('https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect")) resp = spn_client.crawl_resource(TARGET, wayback_client) @@ -201,4 +222,3 @@ def test_crawl_resource(spn_client, wayback_client): assert type(resp.cdx) == CdxPartial with pytest.raises(AttributeError): print(resp.cdx.warc_path) - diff --git a/python/tests/test_wayback.py b/python/tests/test_wayback.py index 83311b9..6ccf775 100644 --- a/python/tests/test_wayback.py +++ b/python/tests/test_wayback.py @@ -1,4 +1,3 @@ - import json import pytest @@ -10,27 +9,66 @@ CDX_TARGET = "http://fatcat.wiki/" CDX_DT = "20180812220054" # cdx -m exact -p output=json -p from=20180812220054 -p to=20180812220054 http://fatcat.wiki/ CDX_SINGLE_HIT = [ - ["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","robotflags","length","offset","filename"], - ["wiki,fatcat)/", CDX_DT, CDX_TARGET, "text/html", "200", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"], + [ + "urlkey", "timestamp", "original", "mimetype", "statuscode", "digest", "redirect", + "robotflags", "length", "offset", "filename" + ], + [ + "wiki,fatcat)/", CDX_DT, CDX_TARGET, "text/html", "200", + "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", + "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz" + ], ] CDX_BEST_SHA1B32 = "AAAAAAAAASIHDJIEP7ZW53DLRX5NFIJR" # cdx -m exact -p output=json -p from=20180812220054 -p to=20180812220054 http://fatcat.wiki/ CDX_MULTI_HIT = [ - ["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","robotflags","length","offset","filename"], - ["wiki,fatcat)/", CDX_DT, CDX_TARGET, "text/html", "200", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"], - # sooner, but not right mimetype - ["wiki,fatcat)/", "20180912220054", CDX_TARGET, "text/html", "200", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"], - # sooner and mimetype, but wrong status code - ["wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "400", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"], - ["wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "500", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"], - ["wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "150", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"], - # "best" - ["wiki,fatcat)/", CDX_DT, CDX_TARGET, "application/pdf", "200", CDX_BEST_SHA1B32, "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"], - # older - ["wiki,fatcat)/", "20180712220054", CDX_TARGET, "application/pdf", "200", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"], + [ + "urlkey", "timestamp", "original", "mimetype", "statuscode", "digest", "redirect", + "robotflags", "length", "offset", "filename" + ], + [ + "wiki,fatcat)/", CDX_DT, CDX_TARGET, "text/html", "200", + "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", + "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz" + ], + # sooner, but not right mimetype + [ + "wiki,fatcat)/", "20180912220054", CDX_TARGET, "text/html", "200", + "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", + "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz" + ], + # sooner and mimetype, but wrong status code + [ + "wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "400", + "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", + "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz" + ], + [ + "wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "500", + "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", + "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz" + ], + [ + "wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "150", + "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", + "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz" + ], + # "best" + [ + "wiki,fatcat)/", CDX_DT, CDX_TARGET, "application/pdf", "200", CDX_BEST_SHA1B32, "-", + "-", "8445", "108062304", + "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz" + ], + # older + [ + "wiki,fatcat)/", "20180712220054", CDX_TARGET, "application/pdf", "200", + "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", + "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz" + ], ] + @pytest.fixture def cdx_client(): client = CdxApiClient( @@ -39,13 +77,14 @@ def cdx_client(): ) return client + @responses.activate def test_cdx_fetch(cdx_client): responses.add(responses.GET, - 'http://dummy-cdx/cdx', - status=200, - body=json.dumps(CDX_SINGLE_HIT)) + 'http://dummy-cdx/cdx', + status=200, + body=json.dumps(CDX_SINGLE_HIT)) resp = cdx_client.fetch(CDX_TARGET, CDX_DT) @@ -58,6 +97,7 @@ def test_cdx_fetch(cdx_client): assert resp.warc_offset == 108062304 assert resp.warc_path == "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz" + @responses.activate def test_cdx_fetch_errors(cdx_client): @@ -65,9 +105,9 @@ def test_cdx_fetch_errors(cdx_client): resp = cdx_client.fetch(CDX_TARGET, "2019") responses.add(responses.GET, - 'http://dummy-cdx/cdx', - status=200, - body=json.dumps(CDX_SINGLE_HIT)) + 'http://dummy-cdx/cdx', + status=200, + body=json.dumps(CDX_SINGLE_HIT)) with pytest.raises(KeyError): resp = cdx_client.fetch(CDX_TARGET, "20180812220055") @@ -78,13 +118,14 @@ def test_cdx_fetch_errors(cdx_client): resp = cdx_client.fetch(CDX_TARGET, CDX_DT) assert len(responses.calls) == 3 + @responses.activate def test_cdx_lookup_best(cdx_client): responses.add(responses.GET, - 'http://dummy-cdx/cdx', - status=200, - body=json.dumps(CDX_MULTI_HIT)) + 'http://dummy-cdx/cdx', + status=200, + body=json.dumps(CDX_MULTI_HIT)) resp = cdx_client.lookup_best(CDX_TARGET, best_mimetype="application/pdf") @@ -95,6 +136,7 @@ def test_cdx_lookup_best(cdx_client): assert resp.sha1b32 == CDX_BEST_SHA1B32 assert resp.warc_path == CDX_SINGLE_HIT[1][-1] + WARC_TARGET = "http://fatcat.wiki/" WARC_BODY = b""" <html> @@ -108,6 +150,7 @@ WARC_BODY = b""" </html> """ + @pytest.fixture def wayback_client(cdx_client, mocker): client = WaybackClient( @@ -127,6 +170,7 @@ def wayback_client(cdx_client, mocker): return client + @pytest.fixture def wayback_client_pdf(cdx_client, mocker): @@ -150,6 +194,7 @@ def wayback_client_pdf(cdx_client, mocker): return client + @responses.activate def test_wayback_fetch(wayback_client): resp = wayback_client.fetch_petabox(123, 456789, "here/there.warc.gz") @@ -159,13 +204,14 @@ def test_wayback_fetch(wayback_client): resp = wayback_client.fetch_petabox_body(123, 456789, "here/there.warc.gz") assert resp == WARC_BODY + @responses.activate def test_lookup_resource_success(wayback_client): responses.add(responses.GET, - 'http://dummy-cdx/cdx', - status=200, - body=json.dumps(CDX_MULTI_HIT)) + 'http://dummy-cdx/cdx', + status=200, + body=json.dumps(CDX_MULTI_HIT)) resp = wayback_client.lookup_resource(CDX_TARGET) diff --git a/python/tests/test_xml.py b/python/tests/test_xml.py index a996c56..1742f3a 100644 --- a/python/tests/test_xml.py +++ b/python/tests/test_xml.py @@ -1,11 +1,10 @@ - import pytest from sandcrawler.xml import xml_reserialize def test_xml_reserialize() -> None: - + with open('tests/files/scielo_article.jats.xml', 'rb') as f: raw_xml = f.read() |