aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/grobid2json.py12
-rwxr-xr-xpython/grobid_tool.py64
-rwxr-xr-xpython/ia_pdf_match.py12
-rwxr-xr-xpython/ingest_tool.py71
-rwxr-xr-xpython/pdfextract_tool.py74
-rwxr-xr-xpython/pdftrio_tool.py82
-rwxr-xr-xpython/persist_tool.py125
-rw-r--r--python/sandcrawler/__init__.py18
-rw-r--r--python/sandcrawler/db.py150
-rw-r--r--python/sandcrawler/fileset_platforms.py169
-rw-r--r--python/sandcrawler/fileset_strategies.py68
-rw-r--r--python/sandcrawler/fileset_types.py6
-rw-r--r--python/sandcrawler/grobid.py30
-rw-r--r--python/sandcrawler/html.py49
-rw-r--r--python/sandcrawler/html_metadata.py37
-rw-r--r--python/sandcrawler/ia.py192
-rw-r--r--python/sandcrawler/ingest_file.py145
-rw-r--r--python/sandcrawler/ingest_fileset.py101
-rw-r--r--python/sandcrawler/ingest_html.py120
-rw-r--r--python/sandcrawler/minio.py2
-rw-r--r--python/sandcrawler/misc.py64
-rw-r--r--python/sandcrawler/pdfextract.py29
-rw-r--r--python/sandcrawler/pdftrio.py10
-rw-r--r--python/sandcrawler/persist.py61
-rw-r--r--python/sandcrawler/workers.py60
-rw-r--r--python/sandcrawler/xml.py1
-rwxr-xr-xpython/sandcrawler_worker.py151
-rwxr-xr-xpython/scripts/arabesque2ingestrequest.py33
-rwxr-xr-xpython/scripts/archiveorg_fileset.py39
-rwxr-xr-xpython/scripts/cdx_collection.py24
-rwxr-xr-xpython/scripts/covid2ingestrequest.py12
-rwxr-xr-xpython/scripts/deliver_dumpgrobid_to_s3.py11
-rwxr-xr-xpython/scripts/deliver_gwb_to_disk.py71
-rwxr-xr-xpython/scripts/deliver_gwb_to_s3.py66
-rwxr-xr-xpython/scripts/doaj2ingestrequest.py15
-rwxr-xr-xpython/scripts/enrich_scored_matches.py14
-rwxr-xr-xpython/scripts/filter_grobid_metadata.py18
-rwxr-xr-xpython/scripts/filter_groupworks.py8
-rwxr-xr-xpython/scripts/filter_scored_matches.py7
-rwxr-xr-xpython/scripts/grobid_affiliations.py6
-rwxr-xr-xpython/scripts/import_grobid_metadata.py31
-rwxr-xr-xpython/scripts/ingestrequest_row2json.py11
-rwxr-xr-xpython/scripts/manifest_converter.py5
-rwxr-xr-xpython/scripts/oai2ingestrequest.py19
-rwxr-xr-xpython/scripts/pdf_thumbnail.py7
-rwxr-xr-xpython/scripts/unpaywall2ingestrequest.py19
-rw-r--r--python/tests/test_grobid.py27
-rw-r--r--python/tests/test_grobid2json.py6
-rw-r--r--python/tests/test_html.py10
-rw-r--r--python/tests/test_html_ingest.py1
-rw-r--r--python/tests/test_html_metadata.py32
-rw-r--r--python/tests/test_ingest.py119
-rw-r--r--python/tests/test_live_wayback.py23
-rw-r--r--python/tests/test_misc.py22
-rw-r--r--python/tests/test_pdfextract.py12
-rw-r--r--python/tests/test_pushers.py7
-rw-r--r--python/tests/test_savepagenow.py126
-rw-r--r--python/tests/test_wayback.py100
-rw-r--r--python/tests/test_xml.py3
59 files changed, 1582 insertions, 1225 deletions
diff --git a/python/grobid2json.py b/python/grobid2json.py
index b4bfe2b..0d47f36 100755
--- a/python/grobid2json.py
+++ b/python/grobid2json.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
NB: adapted to work as a library for PDF extraction. Will probably be
re-written eventually to be correct, complete, and robust; this is just a
@@ -76,9 +75,7 @@ def all_authors(elem: Optional[ET.Element]) -> List[Dict[str, Any]]:
def journal_info(elem: ET.Element) -> Dict[str, Any]:
journal = dict()
journal["name"] = elem.findtext(".//{%s}monogr/{%s}title" % (ns, ns))
- journal["publisher"] = elem.findtext(
- ".//{%s}publicationStmt/{%s}publisher" % (ns, ns)
- )
+ journal["publisher"] = elem.findtext(".//{%s}publicationStmt/{%s}publisher" % (ns, ns))
if journal["publisher"] == "":
journal["publisher"] = None
journal["issn"] = elem.findtext('.//{%s}idno[@type="ISSN"]' % ns)
@@ -145,9 +142,7 @@ def teixml2json(content: AnyStr, encumbered: bool = True) -> Dict[str, Any]:
info["grobid_version"] = application_tag.attrib["version"].strip()
info["grobid_timestamp"] = application_tag.attrib["when"].strip()
info["title"] = header.findtext(".//{%s}analytic/{%s}title" % (ns, ns))
- info["authors"] = all_authors(
- header.find(".//{%s}sourceDesc/{%s}biblStruct" % (ns, ns))
- )
+ info["authors"] = all_authors(header.find(".//{%s}sourceDesc/{%s}biblStruct" % (ns, ns)))
info["journal"] = journal_info(header)
date = header.find('.//{%s}date[@type="published"]' % ns)
info["date"] = (date is not None) and date.attrib.get("when")
@@ -207,8 +202,7 @@ def main() -> None: # pragma no cover
json.dumps(
teixml2json(content, encumbered=(not args.no_encumbered)),
sort_keys=True,
- )
- )
+ ))
if __name__ == "__main__": # pragma no cover
diff --git a/python/grobid_tool.py b/python/grobid_tool.py
index 0084330..4ba9540 100755
--- a/python/grobid_tool.py
+++ b/python/grobid_tool.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
These are generally for running one-off tasks from the command line. Output
might go to stdout, or might go to Kafka topic.
@@ -30,6 +29,7 @@ def run_extract_json(args):
pusher = JsonLinePusher(worker, args.json_file)
pusher.run()
+
def run_extract_cdx(args):
grobid_client = GrobidClient(host_url=args.grobid_host)
wayback_client = WaybackClient()
@@ -53,6 +53,7 @@ def run_extract_cdx(args):
)
pusher.run()
+
def run_extract_zipfile(args):
grobid_client = GrobidClient(host_url=args.grobid_host)
if args.jobs > 1:
@@ -65,6 +66,7 @@ def run_extract_zipfile(args):
pusher = ZipfilePusher(worker, args.zip_file)
pusher.run()
+
def run_transform(args):
grobid_client = GrobidClient()
for line in args.json_file:
@@ -82,52 +84,54 @@ def run_transform(args):
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--kafka-mode',
- action='store_true',
- help="send output to Kafka (not stdout)")
+ action='store_true',
+ help="send output to Kafka (not stdout)")
parser.add_argument('--kafka-hosts',
- default="localhost:9092",
- help="list of Kafka brokers (host/port) to use")
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
parser.add_argument('--kafka-env',
- default="dev",
- help="Kafka topic namespace to use (eg, prod, qa, dev)")
- parser.add_argument('-j', '--jobs',
- default=8, type=int,
- help="parallelism for batch CPU jobs")
+ default="dev",
+ help="Kafka topic namespace to use (eg, prod, qa, dev)")
+ parser.add_argument('-j',
+ '--jobs',
+ default=8,
+ type=int,
+ help="parallelism for batch CPU jobs")
parser.add_argument('--grobid-host',
- default="http://grobid.qa.fatcat.wiki",
- help="GROBID API host/port")
+ default="http://grobid.qa.fatcat.wiki",
+ help="GROBID API host/port")
subparsers = parser.add_subparsers()
- sub_extract_json = subparsers.add_parser('extract-json',
+ sub_extract_json = subparsers.add_parser(
+ 'extract-json',
help="for each JSON line with CDX info, fetches PDF and does GROBID extraction")
sub_extract_json.set_defaults(func=run_extract_json)
sub_extract_json.add_argument('json_file',
- help="JSON file to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
+ help="JSON file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
- sub_extract_cdx = subparsers.add_parser('extract-cdx',
- help="for each CDX line, fetches PDF and does GROBID extraction")
+ sub_extract_cdx = subparsers.add_parser(
+ 'extract-cdx', help="for each CDX line, fetches PDF and does GROBID extraction")
sub_extract_cdx.set_defaults(func=run_extract_cdx)
sub_extract_cdx.add_argument('cdx_file',
- help="CDX file to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
+ help="CDX file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
- sub_extract_zipfile = subparsers.add_parser('extract-zipfile',
+ sub_extract_zipfile = subparsers.add_parser(
+ 'extract-zipfile',
help="opens zipfile, iterates over PDF files inside and does GROBID extract for each")
sub_extract_zipfile.set_defaults(func=run_extract_zipfile)
- sub_extract_zipfile.add_argument('zip_file',
- help="zipfile with PDFs to extract",
- type=str)
+ sub_extract_zipfile.add_argument('zip_file', help="zipfile with PDFs to extract", type=str)
sub_transform = subparsers.add_parser('transform')
sub_transform.set_defaults(func=run_transform)
sub_transform.add_argument('--metadata-only',
- action='store_true',
- help="Only pass through bibliographic metadata, not fulltext")
- sub_transform.add_argument('json_file',
+ action='store_true',
+ help="Only pass through bibliographic metadata, not fulltext")
+ sub_transform.add_argument(
+ 'json_file',
help="convert TEI-XML to JSON. Input is JSON lines with tei_xml field",
type=argparse.FileType('r'))
@@ -140,10 +144,10 @@ def main():
if args.kafka_mode:
produce_topic = "sandcrawler-{}.grobid-output-pg".format(args.kafka_env)
print("Running in kafka output mode, publishing to {}\n".format(produce_topic))
- args.sink = KafkaCompressSink(kafka_hosts=args.kafka_hosts,
- produce_topic=produce_topic)
+ args.sink = KafkaCompressSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic)
args.func(args)
+
if __name__ == '__main__':
main()
diff --git a/python/ia_pdf_match.py b/python/ia_pdf_match.py
index 137110c..c3d9c16 100755
--- a/python/ia_pdf_match.py
+++ b/python/ia_pdf_match.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
Input is IA item metadata JSON.
Ouput is insertable fatcat "match" JSON
@@ -81,21 +80,23 @@ def parse(obj):
'size': int(pdf_file['size']),
'mimetype': 'application/pdf',
'urls': [
- "https://archive.org/download/{}/{}".format(
- obj['metadata']['identifier'],
- pdf_file['name']),
+ "https://archive.org/download/{}/{}".format(obj['metadata']['identifier'],
+ pdf_file['name']),
],
'cdx': [],
'dois': [],
}
if extid_type == 'doi':
- match['dois'] = [extid,]
+ match['dois'] = [
+ extid,
+ ]
else:
match[extid_type] = extid
return match
+
def run():
for line in sys.stdin:
if not line:
@@ -105,5 +106,6 @@ def run():
if match:
print(json.dumps(match, sort_keys=True))
+
if __name__ == '__main__':
run()
diff --git a/python/ingest_tool.py b/python/ingest_tool.py
index c0ef5aa..305c3a8 100755
--- a/python/ingest_tool.py
+++ b/python/ingest_tool.py
@@ -18,7 +18,9 @@ def run_single_ingest(args):
)
if args.force_recrawl:
request['force_recrawl'] = True
- if request['ingest_type'] in ['dataset',]:
+ if request['ingest_type'] in [
+ 'dataset',
+ ]:
ingester = IngestFilesetWorker(
try_spn2=not args.no_spn2,
ingest_file_result_stdout=True,
@@ -32,75 +34,71 @@ def run_single_ingest(args):
print(json.dumps(result, sort_keys=True))
return result
+
def run_requests(args):
# TODO: switch to using JsonLinePusher
file_worker = IngestFileWorker(
try_spn2=not args.no_spn2,
html_quick_mode=args.html_quick_mode,
)
- fileset_worker = IngestFilesetWorker(
- try_spn2=not args.no_spn2,
- )
+ fileset_worker = IngestFilesetWorker(try_spn2=not args.no_spn2, )
for l in args.json_file:
request = json.loads(l.strip())
- if request['ingest_type'] in ['dataset',]:
+ if request['ingest_type'] in [
+ 'dataset',
+ ]:
result = fileset_worker.process(request)
else:
result = file_worker.process(request)
print(json.dumps(result, sort_keys=True))
+
def run_api(args):
port = 8083
print("Listening on localhost:{}".format(port))
server = HTTPServer(('', port), IngestFileRequestHandler)
server.serve_forever()
+
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
subparsers = parser.add_subparsers()
- sub_single= subparsers.add_parser('single',
- help="ingests a single base URL")
+ sub_single = subparsers.add_parser('single', help="ingests a single base URL")
sub_single.set_defaults(func=run_single_ingest)
sub_single.add_argument('ingest_type',
- default="pdf",
- help="type of ingest (pdf, html, etc)")
+ default="pdf",
+ help="type of ingest (pdf, html, etc)")
sub_single.add_argument('--release-id',
- help="(optional) existing release ident to match to")
- sub_single.add_argument('--doi',
- help="(optional) existing release DOI to match to")
+ help="(optional) existing release ident to match to")
+ sub_single.add_argument('--doi', help="(optional) existing release DOI to match to")
sub_single.add_argument('--force-recrawl',
- action='store_true',
- help="ignore GWB history and use SPNv2 to re-crawl")
- sub_single.add_argument('--no-spn2',
- action='store_true',
- help="don't use live web (SPNv2)")
+ action='store_true',
+ help="ignore GWB history and use SPNv2 to re-crawl")
+ sub_single.add_argument('--no-spn2', action='store_true', help="don't use live web (SPNv2)")
sub_single.add_argument('--html-quick-mode',
- action='store_true',
- help="don't fetch individual sub-resources, just use CDX")
- sub_single.add_argument('url',
- help="URL of paper to fetch")
+ action='store_true',
+ help="don't fetch individual sub-resources, just use CDX")
+ sub_single.add_argument('url', help="URL of paper to fetch")
- sub_requests = subparsers.add_parser('requests',
- help="takes a series of ingest requests (JSON, per line) and runs each")
+ sub_requests = subparsers.add_parser(
+ 'requests', help="takes a series of ingest requests (JSON, per line) and runs each")
sub_requests.add_argument('--no-spn2',
- action='store_true',
- help="don't use live web (SPNv2)")
+ action='store_true',
+ help="don't use live web (SPNv2)")
sub_requests.add_argument('--html-quick-mode',
- action='store_true',
- help="don't fetch individual sub-resources, just use CDX")
+ action='store_true',
+ help="don't fetch individual sub-resources, just use CDX")
sub_requests.set_defaults(func=run_requests)
sub_requests.add_argument('json_file',
- help="JSON file (request per line) to import from (or stdin)",
- default=sys.stdin, type=argparse.FileType('r'))
+ help="JSON file (request per line) to import from (or stdin)",
+ default=sys.stdin,
+ type=argparse.FileType('r'))
- sub_api = subparsers.add_parser('api',
- help="starts a simple HTTP server that processes ingest requests")
+ sub_api = subparsers.add_parser(
+ 'api', help="starts a simple HTTP server that processes ingest requests")
sub_api.set_defaults(func=run_api)
- sub_api.add_argument('--port',
- help="HTTP port to listen on",
- default=8033, type=int)
+ sub_api.add_argument('--port', help="HTTP port to listen on", default=8033, type=int)
args = parser.parse_args()
if not args.__dict__.get("func"):
@@ -109,5 +107,6 @@ def main():
args.func(args)
+
if __name__ == '__main__':
main()
diff --git a/python/pdfextract_tool.py b/python/pdfextract_tool.py
index 89ecf1c..717b743 100755
--- a/python/pdfextract_tool.py
+++ b/python/pdfextract_tool.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
KNOWN ISSUE: thumbnails are not published to kafka in multi-processing mode
"""
@@ -20,10 +19,13 @@ def run_extract_json(args):
multi_worker = MultiprocessWrapper(worker, args.sink)
pusher = JsonLinePusher(multi_worker, args.json_file, batch_size=args.jobs)
else:
- worker = PdfExtractWorker(wayback_client, sink=args.sink, thumbnail_sink=args.thumbnail_sink)
+ worker = PdfExtractWorker(wayback_client,
+ sink=args.sink,
+ thumbnail_sink=args.thumbnail_sink)
pusher = JsonLinePusher(worker, args.json_file)
pusher.run()
+
def run_extract_cdx(args):
wayback_client = WaybackClient()
if args.jobs > 1:
@@ -37,7 +39,9 @@ def run_extract_cdx(args):
batch_size=args.jobs,
)
else:
- worker = PdfExtractWorker(wayback_client, sink=args.sink, thumbnail_sink=args.thumbnail_sink)
+ worker = PdfExtractWorker(wayback_client,
+ sink=args.sink,
+ thumbnail_sink=args.thumbnail_sink)
pusher = CdxLinePusher(
worker,
args.cdx_file,
@@ -46,6 +50,7 @@ def run_extract_cdx(args):
)
pusher.run()
+
def run_extract_zipfile(args):
if args.jobs > 1:
print("multi-processing: {}".format(args.jobs), file=sys.stderr)
@@ -57,6 +62,7 @@ def run_extract_zipfile(args):
pusher = ZipfilePusher(worker, args.zip_file)
pusher.run()
+
def run_single(args):
worker = PdfExtractBlobWorker(sink=args.sink, thumbnail_sink=args.thumbnail_sink)
with open(args.pdf_file, 'rb') as pdf_file:
@@ -67,51 +73,48 @@ def run_single(args):
args.thumbnail_sink.finish()
-
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--kafka-mode',
- action='store_true',
- help="send output to Kafka (not stdout)")
+ action='store_true',
+ help="send output to Kafka (not stdout)")
parser.add_argument('--kafka-hosts',
- default="localhost:9092",
- help="list of Kafka brokers (host/port) to use")
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
parser.add_argument('--kafka-env',
- default="dev",
- help="Kafka topic namespace to use (eg, prod, qa, dev)")
- parser.add_argument('-j', '--jobs',
- default=8, type=int,
- help="parallelism for batch CPU jobs")
+ default="dev",
+ help="Kafka topic namespace to use (eg, prod, qa, dev)")
+ parser.add_argument('-j',
+ '--jobs',
+ default=8,
+ type=int,
+ help="parallelism for batch CPU jobs")
subparsers = parser.add_subparsers()
- sub_extract_json = subparsers.add_parser('extract-json',
+ sub_extract_json = subparsers.add_parser(
+ 'extract-json',
help="for each JSON line with CDX info, fetches PDF and does PDF extraction")
sub_extract_json.set_defaults(func=run_extract_json)
sub_extract_json.add_argument('json_file',
- help="JSON file to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
+ help="JSON file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
- sub_extract_cdx = subparsers.add_parser('extract-cdx',
- help="for each CDX line, fetches PDF and does PDF extraction")
+ sub_extract_cdx = subparsers.add_parser(
+ 'extract-cdx', help="for each CDX line, fetches PDF and does PDF extraction")
sub_extract_cdx.set_defaults(func=run_extract_cdx)
sub_extract_cdx.add_argument('cdx_file',
- help="CDX file to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
+ help="CDX file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
- sub_extract_zipfile = subparsers.add_parser('extract-zipfile',
+ sub_extract_zipfile = subparsers.add_parser(
+ 'extract-zipfile',
help="opens zipfile, iterates over PDF files inside and does PDF extract for each")
sub_extract_zipfile.set_defaults(func=run_extract_zipfile)
- sub_extract_zipfile.add_argument('zip_file',
- help="zipfile with PDFs to extract",
- type=str)
+ sub_extract_zipfile.add_argument('zip_file', help="zipfile with PDFs to extract", type=str)
- sub_single = subparsers.add_parser('single',
- help="opens single PDF and extracts it")
+ sub_single = subparsers.add_parser('single', help="opens single PDF and extracts it")
sub_single.set_defaults(func=run_single)
- sub_single.add_argument('pdf_file',
- help="single PDF to extract",
- type=str)
+ sub_single.add_argument('pdf_file', help="single PDF to extract", type=str)
args = parser.parse_args()
if not args.__dict__.get("func"):
@@ -123,17 +126,18 @@ def main():
if args.kafka_mode:
text_topic = "sandcrawler-{}.pdf-text".format(args.kafka_env)
thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.kafka_env)
- args.sink = KafkaCompressSink(kafka_hosts=args.kafka_hosts,
- produce_topic=text_topic)
+ args.sink = KafkaCompressSink(kafka_hosts=args.kafka_hosts, produce_topic=text_topic)
args.thumbnail_sink = KafkaSink(kafka_hosts=args.kafka_hosts,
- produce_topic=thumbnail_topic)
+ produce_topic=thumbnail_topic)
print("Running in kafka output mode, publishing to {} and {}\n".format(
- text_topic, thumbnail_topic), file=sys.stderr)
+ text_topic, thumbnail_topic),
+ file=sys.stderr)
else:
args.sink = None
args.thumbnail_sink = None
args.func(args)
+
if __name__ == '__main__':
main()
diff --git a/python/pdftrio_tool.py b/python/pdftrio_tool.py
index e195bc7..9316313 100755
--- a/python/pdftrio_tool.py
+++ b/python/pdftrio_tool.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
Basically just a copy of grobid_tool.py, but for PDF classification instead of
text extraction.
@@ -21,19 +20,29 @@ def run_classify_pdf_json(args):
pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host)
wayback_client = WaybackClient()
if args.jobs > 1:
- worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=None, mode=args.pdftrio_mode)
+ worker = PdfTrioWorker(pdftrio_client,
+ wayback_client,
+ sink=None,
+ mode=args.pdftrio_mode)
multi_worker = MultiprocessWrapper(worker, args.sink)
pusher = JsonLinePusher(multi_worker, args.json_file, batch_size=args.jobs)
else:
- worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=args.sink, mode=args.pdftrio_mode)
+ worker = PdfTrioWorker(pdftrio_client,
+ wayback_client,
+ sink=args.sink,
+ mode=args.pdftrio_mode)
pusher = JsonLinePusher(worker, args.json_file)
pusher.run()
+
def run_classify_pdf_cdx(args):
pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host)
wayback_client = WaybackClient()
if args.jobs > 1:
- worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=None, mode=args.pdftrio_mode)
+ worker = PdfTrioWorker(pdftrio_client,
+ wayback_client,
+ sink=None,
+ mode=args.pdftrio_mode)
multi_worker = MultiprocessWrapper(worker, args.sink)
pusher = CdxLinePusher(
multi_worker,
@@ -43,7 +52,10 @@ def run_classify_pdf_cdx(args):
batch_size=args.jobs,
)
else:
- worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=args.sink, mode=args.pdftrio_mode)
+ worker = PdfTrioWorker(pdftrio_client,
+ wayback_client,
+ sink=args.sink,
+ mode=args.pdftrio_mode)
pusher = CdxLinePusher(
worker,
args.cdx_file,
@@ -52,6 +64,7 @@ def run_classify_pdf_cdx(args):
)
pusher.run()
+
def run_classify_pdf_zipfile(args):
pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host)
worker = PdfTrioBlobWorker(pdftrio_client, sink=args.sink, mode=args.pdftrio_mode)
@@ -60,48 +73,53 @@ def run_classify_pdf_zipfile(args):
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--kafka-mode',
- action='store_true',
- help="send output to Kafka (not stdout)")
+ action='store_true',
+ help="send output to Kafka (not stdout)")
parser.add_argument('--kafka-hosts',
- default="localhost:9092",
- help="list of Kafka brokers (host/port) to use")
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
parser.add_argument('--kafka-env',
- default="dev",
- help="Kafka topic namespace to use (eg, prod, qa, dev)")
- parser.add_argument('-j', '--jobs',
- default=8, type=int,
- help="parallelism for batch CPU jobs")
+ default="dev",
+ help="Kafka topic namespace to use (eg, prod, qa, dev)")
+ parser.add_argument('-j',
+ '--jobs',
+ default=8,
+ type=int,
+ help="parallelism for batch CPU jobs")
parser.add_argument('--pdftrio-host',
- default="http://pdftrio.qa.fatcat.wiki",
- help="pdftrio API host/port")
+ default="http://pdftrio.qa.fatcat.wiki",
+ help="pdftrio API host/port")
parser.add_argument('--pdftrio-mode',
- default="auto",
- help="which classification mode to use")
+ default="auto",
+ help="which classification mode to use")
subparsers = parser.add_subparsers()
- sub_classify_pdf_json = subparsers.add_parser('classify-pdf-json',
+ sub_classify_pdf_json = subparsers.add_parser(
+ 'classify-pdf-json',
help="for each JSON line with CDX info, fetches PDF and does pdftrio classify_pdfion")
sub_classify_pdf_json.set_defaults(func=run_classify_pdf_json)
sub_classify_pdf_json.add_argument('json_file',
- help="JSON file to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
+ help="JSON file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
- sub_classify_pdf_cdx = subparsers.add_parser('classify-pdf-cdx',
+ sub_classify_pdf_cdx = subparsers.add_parser(
+ 'classify-pdf-cdx',
help="for each CDX line, fetches PDF and does pdftrio classify_pdfion")
sub_classify_pdf_cdx.set_defaults(func=run_classify_pdf_cdx)
sub_classify_pdf_cdx.add_argument('cdx_file',
- help="CDX file to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
+ help="CDX file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
- sub_classify_pdf_zipfile = subparsers.add_parser('classify-pdf-zipfile',
- help="opens zipfile, iterates over PDF files inside and does pdftrio classify_pdf for each")
+ sub_classify_pdf_zipfile = subparsers.add_parser(
+ 'classify-pdf-zipfile',
+ help=
+ "opens zipfile, iterates over PDF files inside and does pdftrio classify_pdf for each")
sub_classify_pdf_zipfile.set_defaults(func=run_classify_pdf_zipfile)
sub_classify_pdf_zipfile.add_argument('zip_file',
- help="zipfile with PDFs to classify",
- type=str)
+ help="zipfile with PDFs to classify",
+ type=str)
args = parser.parse_args()
if not args.__dict__.get("func"):
@@ -112,10 +130,10 @@ def main():
if args.kafka_mode:
produce_topic = "sandcrawler-{}.pdftrio-output".format(args.kafka_env)
print("Running in kafka output mode, publishing to {}\n".format(produce_topic))
- args.sink = KafkaSink(kafka_hosts=args.kafka_hosts,
- produce_topic=produce_topic)
+ args.sink = KafkaSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic)
args.func(args)
+
if __name__ == '__main__':
main()
diff --git a/python/persist_tool.py b/python/persist_tool.py
index d52f7c1..9160db6 100755
--- a/python/persist_tool.py
+++ b/python/persist_tool.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
Commands for backfilling content from bulk files into postgresql and s3 (seaweedfs).
@@ -16,9 +15,7 @@ from sandcrawler.persist import *
def run_cdx(args):
- worker = PersistCdxWorker(
- db_url=args.db_url,
- )
+ worker = PersistCdxWorker(db_url=args.db_url, )
filter_mimetypes = ['application/pdf']
if args.no_mimetype_filter:
filter_mimetypes = None
@@ -32,6 +29,7 @@ def run_cdx(args):
)
pusher.run()
+
def run_grobid(args):
worker = PersistGrobidWorker(
db_url=args.db_url,
@@ -49,24 +47,22 @@ def run_grobid(args):
)
pusher.run()
+
def run_grobid_disk(args):
"""
Writes XML to individual files on disk, and also prints non-XML metadata to
stdout as JSON, which can be redirected to a separate file.
"""
- worker = PersistGrobidDiskWorker(
- output_dir=args.output_dir,
- )
+ worker = PersistGrobidDiskWorker(output_dir=args.output_dir, )
pusher = JsonLinePusher(
worker,
args.json_file,
)
pusher.run()
+
def run_pdftrio(args):
- worker = PersistPdfTrioWorker(
- db_url=args.db_url,
- )
+ worker = PersistPdfTrioWorker(db_url=args.db_url, )
pusher = JsonLinePusher(
worker,
args.json_file,
@@ -74,6 +70,7 @@ def run_pdftrio(args):
)
pusher.run()
+
def run_pdftext(args):
worker = PersistPdfTextWorker(
db_url=args.db_url,
@@ -91,10 +88,9 @@ def run_pdftext(args):
)
pusher.run()
+
def run_ingest_file_result(args):
- worker = PersistIngestFileResultWorker(
- db_url=args.db_url,
- )
+ worker = PersistIngestFileResultWorker(db_url=args.db_url, )
pusher = JsonLinePusher(
worker,
args.json_file,
@@ -102,10 +98,9 @@ def run_ingest_file_result(args):
)
pusher.run()
+
def run_ingest_request(args):
- worker = PersistIngestRequestWorker(
- db_url=args.db_url,
- )
+ worker = PersistIngestRequestWorker(db_url=args.db_url, )
pusher = JsonLinePusher(
worker,
args.json_file,
@@ -113,92 +108,95 @@ def run_ingest_request(args):
)
pusher.run()
+
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--db-url',
- help="postgresql database connection string",
- default="postgres:///sandcrawler")
- parser.add_argument('--s3-url',
- help="S3 (seaweedfs) backend URL",
- default="localhost:9000")
+ help="postgresql database connection string",
+ default="postgres:///sandcrawler")
+ parser.add_argument('--s3-url', help="S3 (seaweedfs) backend URL", default="localhost:9000")
parser.add_argument('--s3-access-key',
- help="S3 (seaweedfs) credential",
- default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_ACCESS_KEY'))
+ help="S3 (seaweedfs) credential",
+ default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY')
+ or os.environ.get('MINIO_ACCESS_KEY'))
parser.add_argument('--s3-secret-key',
- help="S3 (seaweedfs) credential",
- default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_SECRET_KEY'))
+ help="S3 (seaweedfs) credential",
+ default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY')
+ or os.environ.get('MINIO_SECRET_KEY'))
parser.add_argument('--s3-bucket',
- help="S3 (seaweedfs) bucket to persist into",
- default="sandcrawler-dev")
+ help="S3 (seaweedfs) bucket to persist into",
+ default="sandcrawler-dev")
subparsers = parser.add_subparsers()
- sub_cdx = subparsers.add_parser('cdx',
- help="backfill a CDX file into postgresql cdx table")
+ sub_cdx = subparsers.add_parser('cdx', help="backfill a CDX file into postgresql cdx table")
sub_cdx.set_defaults(func=run_cdx)
sub_cdx.add_argument('cdx_file',
- help="CDX file to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
- sub_cdx.add_argument('--no-mimetype-filter',
+ help="CDX file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+ sub_cdx.add_argument(
+ '--no-mimetype-filter',
action='store_true',
help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)")
- sub_grobid = subparsers.add_parser('grobid',
- help="backfill a grobid JSON ('pg') dump into postgresql and s3 (seaweedfs)")
+ sub_grobid = subparsers.add_parser(
+ 'grobid', help="backfill a grobid JSON ('pg') dump into postgresql and s3 (seaweedfs)")
sub_grobid.set_defaults(func=run_grobid)
sub_grobid.add_argument('json_file',
- help="grobid file to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
+ help="grobid file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
sub_grobid.add_argument('--s3-only',
- action='store_true',
- help="only upload TEI-XML to S3 (don't write to database)")
- sub_grobid.add_argument('--db-only',
+ action='store_true',
+ help="only upload TEI-XML to S3 (don't write to database)")
+ sub_grobid.add_argument(
+ '--db-only',
action='store_true',
help="only write status to sandcrawler-db (don't save TEI-XML to S3)")
- sub_pdftext = subparsers.add_parser('pdftext',
+ sub_pdftext = subparsers.add_parser(
+ 'pdftext',
help="backfill a pdftext JSON ('pg') dump into postgresql and s3 (seaweedfs)")
sub_pdftext.set_defaults(func=run_pdftext)
sub_pdftext.add_argument('json_file',
- help="pdftext file to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
+ help="pdftext file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
sub_pdftext.add_argument('--s3-only',
- action='store_true',
- help="only upload TEI-XML to S3 (don't write to database)")
- sub_pdftext.add_argument('--db-only',
+ action='store_true',
+ help="only upload TEI-XML to S3 (don't write to database)")
+ sub_pdftext.add_argument(
+ '--db-only',
action='store_true',
help="only write status to sandcrawler-db (don't save TEI-XML to S3)")
sub_grobid_disk = subparsers.add_parser('grobid-disk',
- help="dump GRBOID output to (local) files on disk")
+ help="dump GRBOID output to (local) files on disk")
sub_grobid_disk.set_defaults(func=run_grobid_disk)
sub_grobid_disk.add_argument('json_file',
- help="grobid file to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
- sub_grobid_disk.add_argument('output_dir',
- help="base directory to output into",
- type=str)
+ help="grobid file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+ sub_grobid_disk.add_argument('output_dir', help="base directory to output into", type=str)
- sub_pdftrio = subparsers.add_parser('pdftrio',
+ sub_pdftrio = subparsers.add_parser(
+ 'pdftrio',
help="backfill a pdftrio JSON ('pg') dump into postgresql and s3 (seaweedfs)")
sub_pdftrio.set_defaults(func=run_pdftrio)
sub_pdftrio.add_argument('json_file',
- help="pdftrio file to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
+ help="pdftrio file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
- sub_ingest_file_result = subparsers.add_parser('ingest-file-result',
- help="backfill a ingest_file_result JSON dump into postgresql")
+ sub_ingest_file_result = subparsers.add_parser(
+ 'ingest-file-result', help="backfill a ingest_file_result JSON dump into postgresql")
sub_ingest_file_result.set_defaults(func=run_ingest_file_result)
- sub_ingest_file_result.add_argument('json_file',
+ sub_ingest_file_result.add_argument(
+ 'json_file',
help="ingest_file_result file to import from (or '-' for stdin)",
type=argparse.FileType('r'))
- sub_ingest_request = subparsers.add_parser('ingest-request',
- help="backfill a ingest_request JSON dump into postgresql")
+ sub_ingest_request = subparsers.add_parser(
+ 'ingest-request', help="backfill a ingest_request JSON dump into postgresql")
sub_ingest_request.set_defaults(func=run_ingest_request)
sub_ingest_request.add_argument('json_file',
- help="ingest_request to import from (or '-' for stdin)",
- type=argparse.FileType('r'))
+ help="ingest_request to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
args = parser.parse_args()
if not args.__dict__.get("func"):
@@ -207,5 +205,6 @@ def main():
args.func(args)
+
if __name__ == '__main__':
main()
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index bf2d92d..46735eb 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -1,14 +1,16 @@
-
from .db import SandcrawlerPostgresClient, SandcrawlerPostgrestClient
from .grobid import GrobidBlobWorker, GrobidClient, GrobidWorker
-from .ia import (CdxApiClient, CdxApiError, CdxPartial, CdxRow, PetaboxError, ResourceResult, SavePageNowClient,
- SavePageNowError, WarcResource, WaybackClient, WaybackContentError, WaybackError)
+from .ia import (CdxApiClient, CdxApiError, CdxPartial, CdxRow, PetaboxError, ResourceResult,
+ SavePageNowClient, SavePageNowError, WarcResource, WaybackClient,
+ WaybackContentError, WaybackError)
from .ingest_file import IngestFileWorker
from .ingest_fileset import IngestFilesetWorker
-from .misc import b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path, parse_cdx_datetime, parse_cdx_line
+from .misc import (b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path,
+ parse_cdx_datetime, parse_cdx_line)
from .pdfextract import PdfExtractBlobWorker, PdfExtractWorker
from .pdftrio import PdfTrioBlobWorker, PdfTrioClient, PdfTrioWorker
-from .persist import (PersistCdxWorker, PersistGrobidDiskWorker, PersistGrobidWorker, PersistIngestFileResultWorker,
- PersistIngestRequestWorker, PersistPdfTextWorker, PersistPdfTrioWorker, PersistThumbnailWorker)
-from .workers import (BlackholeSink, CdxLinePusher, JsonLinePusher, KafkaCompressSink, KafkaJsonPusher, KafkaSink,
- MultiprocessWrapper, ZipfilePusher)
+from .persist import (PersistCdxWorker, PersistGrobidDiskWorker, PersistGrobidWorker,
+ PersistIngestFileResultWorker, PersistIngestRequestWorker,
+ PersistPdfTextWorker, PersistPdfTrioWorker, PersistThumbnailWorker)
+from .workers import (BlackholeSink, CdxLinePusher, JsonLinePusher, KafkaCompressSink,
+ KafkaJsonPusher, KafkaSink, MultiprocessWrapper, ZipfilePusher)
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 4dcdb0e..360add9 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -1,4 +1,3 @@
-
import datetime
import json
from typing import Optional
@@ -9,17 +8,16 @@ import requests
class SandcrawlerPostgrestClient:
-
def __init__(self, api_url="http://wbgrp-svc506.us.archive.org:3030", **kwargs):
self.api_url = api_url
def get_cdx(self, url):
- resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url))
+ resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.' + url))
resp.raise_for_status()
return resp.json() or None
def get_grobid(self, sha1):
- resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1))
+ resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
resp = resp.json()
if resp:
@@ -28,7 +26,7 @@ class SandcrawlerPostgrestClient:
return None
def get_pdftrio(self, sha1):
- resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.'+sha1))
+ resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
resp = resp.json()
if resp:
@@ -37,7 +35,7 @@ class SandcrawlerPostgrestClient:
return None
def get_pdf_meta(self, sha1):
- resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.'+sha1))
+ resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
resp = resp.json()
if resp:
@@ -58,7 +56,7 @@ class SandcrawlerPostgrestClient:
return None
def get_file_meta(self, sha1):
- resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
+ resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
resp = resp.json()
if resp:
@@ -91,7 +89,7 @@ class SandcrawlerPostgrestClient:
return None
def get_crossref(self, doi):
- resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.'+doi))
+ resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.' + doi))
resp.raise_for_status()
resp = resp.json()
if resp:
@@ -99,8 +97,8 @@ class SandcrawlerPostgrestClient:
else:
return None
-class SandcrawlerPostgresClient:
+class SandcrawlerPostgresClient:
def __init__(self, db_url, **kwargs):
self.conn = psycopg2.connect(db_url)
@@ -135,14 +133,8 @@ class SandcrawlerPostgresClient:
batch = [d for d in batch if d.get('warc_path')]
if not batch:
return (0, 0)
- batch = [(d['url'],
- d['datetime'],
- d['sha1hex'],
- d['mimetype'],
- d['warc_path'],
- int(d['warc_csize']),
- int(d['warc_offset']))
- for d in batch]
+ batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'],
+ int(d['warc_csize']), int(d['warc_offset'])) for d in batch]
# filter out duplicate rows by key (url, datetime)
batch_dict = dict()
for b in batch:
@@ -170,12 +162,8 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(d['sha1hex'],
- d['sha256hex'],
- d['md5hex'],
- int(d['size_bytes']),
- d['mimetype'])
- for d in batch]
+ batch = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']),
+ d['mimetype']) for d in batch]
# filter out duplicate rows by key (sha1hex)
batch_dict = dict()
for b in batch:
@@ -215,15 +203,15 @@ class SandcrawlerPostgresClient:
r[k] = r['metadata'].get(k)
r['metadata'].pop(k, None)
r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
- batch = [(d['key'],
- d.get('grobid_version') or None,
- d['status_code'],
- d['status'],
- d.get('fatcat_release') or None,
- d.get('updated') or datetime.datetime.now(),
- d.get('metadata') or None ,
- )
- for d in batch]
+ batch = [(
+ d['key'],
+ d.get('grobid_version') or None,
+ d['status_code'],
+ d['status'],
+ d.get('fatcat_release') or None,
+ d.get('updated') or datetime.datetime.now(),
+ d.get('metadata') or None,
+ ) for d in batch]
# filter out duplicate rows by key (sha1hex)
batch_dict = dict()
for b in batch:
@@ -331,20 +319,18 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [
- (
- d['key'],
- d.get('updated') or datetime.datetime.now(),
- d['status_code'],
- d['status'],
- d.get('versions', {}).get('pdftrio_version') or None,
- d.get('versions', {}).get('models_date') or None,
- d.get('ensemble_score'),
- d.get('bert_score'),
- d.get('linear_score'),
- d.get('image_score'),
- )
- for d in batch]
+ batch = [(
+ d['key'],
+ d.get('updated') or datetime.datetime.now(),
+ d['status_code'],
+ d['status'],
+ d.get('versions', {}).get('pdftrio_version') or None,
+ d.get('versions', {}).get('models_date') or None,
+ d.get('ensemble_score'),
+ d.get('bert_score'),
+ d.get('linear_score'),
+ d.get('image_score'),
+ ) for d in batch]
# filter out duplicate rows by key (sha1hex)
batch_dict = dict()
for b in batch:
@@ -373,15 +359,15 @@ class SandcrawlerPostgresClient:
extra[k] = r[k]
if extra:
r['extra'] = json.dumps(extra, sort_keys=True)
- batch = [(d['link_source'],
- d['link_source_id'],
- d['ingest_type'],
- d['base_url'],
- d.get('ingest_request_source'),
- d.get('release_stage') or None,
- d.get('extra') or None,
- )
- for d in batch]
+ batch = [(
+ d['link_source'],
+ d['link_source_id'],
+ d['ingest_type'],
+ d['base_url'],
+ d.get('ingest_request_source'),
+ d.get('release_stage') or None,
+ d.get('extra') or None,
+ ) for d in batch]
# filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url)
batch_dict = dict()
for b in batch:
@@ -412,16 +398,16 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(d['ingest_type'],
- d['base_url'],
- bool(d['hit']),
- d['status'],
- d.get('terminal_url'),
- d.get('terminal_dt'),
- d.get('terminal_status_code'),
- d.get('terminal_sha1hex'),
- )
- for d in batch]
+ batch = [(
+ d['ingest_type'],
+ d['base_url'],
+ bool(d['hit']),
+ d['status'],
+ d.get('terminal_url'),
+ d.get('terminal_dt'),
+ d.get('terminal_status_code'),
+ d.get('terminal_sha1hex'),
+ ) for d in batch]
# filter out duplicate rows by key (ingest_type, base_url)
batch_dict = dict()
for b in batch:
@@ -459,23 +445,23 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(d['ingest_type'],
- d['base_url'],
- bool(d['hit']),
- d['status'],
- d.get('platform_name'),
- d.get('platform_domain'),
- d.get('platform_id'),
- d.get('ingest_strategy'),
- d.get('total_size'),
- d.get('file_count'),
- d.get('archiveorg_item_name'),
- d.get('archiveorg_item_bundle_path'),
- d.get('web_bundle_url'),
- d.get('web_bundle_dt'),
- d.get('manifest'),
- )
- for d in batch]
+ batch = [(
+ d['ingest_type'],
+ d['base_url'],
+ bool(d['hit']),
+ d['status'],
+ d.get('platform_name'),
+ d.get('platform_domain'),
+ d.get('platform_id'),
+ d.get('ingest_strategy'),
+ d.get('total_size'),
+ d.get('file_count'),
+ d.get('archiveorg_item_name'),
+ d.get('archiveorg_item_bundle_path'),
+ d.get('web_bundle_url'),
+ d.get('web_bundle_dt'),
+ d.get('manifest'),
+ ) for d in batch]
# filter out duplicate rows by key (ingest_type, base_url)
batch_dict = dict()
for b in batch:
diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py
index 92fed37..f3441c9 100644
--- a/python/sandcrawler/fileset_platforms.py
+++ b/python/sandcrawler/fileset_platforms.py
@@ -1,4 +1,3 @@
-
import gzip
import json
import sys
@@ -16,17 +15,18 @@ from sandcrawler.ia import ResourceResult
class FilesetPlatformHelper():
-
def __init__(self):
self.platform_name = 'unknown'
- def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> bool:
"""
Does this request look like it matches this platform?
"""
raise NotImplementedError()
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -40,19 +40,18 @@ class FilesetPlatformHelper():
# XXX: while developing ArchiveorgFileset path
#return IngestStrategy.ArchiveorgFileset
if len(item.manifest) == 1:
- if total_size < 64*1024*1024:
+ if total_size < 64 * 1024 * 1024:
return IngestStrategy.WebFile
else:
return IngestStrategy.ArchiveorgFile
else:
- if largest_size < 64*1024*1024 and total_size < 128*1024*1024*1024:
+ if largest_size < 64 * 1024 * 1024 and total_size < 128 * 1024 * 1024 * 1024:
return IngestStrategy.WebFileset
else:
return IngestStrategy.ArchiveorgFileset
class DataverseHelper(FilesetPlatformHelper):
-
def __init__(self):
super().__init__()
self.platform_name = 'dataverse'
@@ -122,8 +121,8 @@ class DataverseHelper(FilesetPlatformHelper):
"file_id": file_id,
}
-
- def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
else:
@@ -146,7 +145,8 @@ class DataverseHelper(FilesetPlatformHelper):
return True
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
@@ -179,22 +179,27 @@ class DataverseHelper(FilesetPlatformHelper):
if parsed_id['file_id']:
# XXX: maybe we could support this?
- raise PlatformScopeError(f"only entire dataverse datasets can be archived with this tool")
+ raise PlatformScopeError(
+ f"only entire dataverse datasets can be archived with this tool")
# 1b. if we didn't get a version number from URL, fetch it from API
if not dataset_version:
- resp = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}")
+ resp = self.session.get(
+ f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}"
+ )
resp.raise_for_status()
obj = resp.json()
obj_latest = obj['data']['latestVersion']
dataset_version = f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}"
# 2. API fetch
- resp = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}")
+ resp = self.session.get(
+ f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}"
+ )
resp.raise_for_status()
obj = resp.json()
- obj_latest= obj['data']['latestVersion']
+ obj_latest = obj['data']['latestVersion']
assert dataset_version == f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}"
assert platform_id == obj_latest['datasetPersistentId']
@@ -212,15 +217,16 @@ class DataverseHelper(FilesetPlatformHelper):
extra['version'] = row['version']
if 'description' in df:
extra['description'] = df['description']
- manifest.append(FilesetManifestFile(
- path=df.get('originalFileName') or df['filename'],
- size=df.get('originalFileSize') or df['filesize'],
- md5=df['md5'],
- # NOTE: don't get: sha1, sha256
- mimetype=df['contentType'],
- platform_url=platform_url,
- extra=extra or None,
- ))
+ manifest.append(
+ FilesetManifestFile(
+ path=df.get('originalFileName') or df['filename'],
+ size=df.get('originalFileSize') or df['filesize'],
+ md5=df['md5'],
+ # NOTE: don't get: sha1, sha256
+ mimetype=df['contentType'],
+ platform_url=platform_url,
+ extra=extra or None,
+ ))
platform_sub_id = platform_id.split('/')[-1]
archiveorg_item_name = f"{platform_domain}-{platform_sub_id}-v{dataset_version}"
@@ -228,7 +234,8 @@ class DataverseHelper(FilesetPlatformHelper):
# XXX: collection=platform_domain,
collection="datasets",
date=obj_latest['releaseTime'].split('T')[0],
- source=f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}",
+ source=
+ f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}",
)
if platform_id.startswith('doi:10.'):
archiveorg_item_meta['doi'] = platform_id.replace('doi:', '')
@@ -238,7 +245,8 @@ class DataverseHelper(FilesetPlatformHelper):
elif block['typeName'] == 'depositor':
archiveorg_item_meta['creator'] = block['value']
elif block['typeName'] == 'dsDescription':
- archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue']['value']
+ archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue'][
+ 'value']
archiveorg_item_meta['description'] = archiveorg_item_meta.get('description', '')
if obj_latest.get('termsOfUse'):
@@ -252,11 +260,13 @@ class DataverseHelper(FilesetPlatformHelper):
platform_id=platform_id,
archiveorg_item_name=archiveorg_item_name,
archiveorg_item_meta=archiveorg_item_meta,
- web_bundle_url=f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original",
+ web_bundle_url=
+ f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original",
# TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files)
extra=dict(version=dataset_version),
)
+
def test_parse_dataverse_persistentid():
valid = {
@@ -322,8 +332,8 @@ def test_parse_dataverse_persistentid():
except ValueError:
pass
-class FigshareHelper(FilesetPlatformHelper):
+class FigshareHelper(FilesetPlatformHelper):
def __init__(self):
super().__init__()
self.platform_name = 'figshare'
@@ -346,7 +356,9 @@ class FigshareHelper(FilesetPlatformHelper):
raise ValueError(f"not a figshare URL: {path}")
comp = comp[2:]
- if comp[0] in ['dataset',]:
+ if comp[0] in [
+ 'dataset',
+ ]:
comp = comp[1:]
if len(comp) == 3 and comp[1].isdigit() and comp[2].isdigit():
@@ -356,7 +368,8 @@ class FigshareHelper(FilesetPlatformHelper):
else:
raise ValueError(f"couldn't find figshare identiier: {path}")
- def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
@@ -381,7 +394,8 @@ class FigshareHelper(FilesetPlatformHelper):
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -397,13 +411,15 @@ class FigshareHelper(FilesetPlatformHelper):
(platform_id, dataset_version) = self.parse_figshare_url_path(components.path)
assert platform_id.isdigit(), f"expected numeric: {platform_id}"
- assert dataset_version and dataset_version.isdigit(), f"expected numeric: {dataset_version}"
+ assert dataset_version and dataset_version.isdigit(
+ ), f"expected numeric: {dataset_version}"
# 1b. if we didn't get a version number from URL, fetch it from API
# TODO: implement this code path
# 2. API fetch
- resp = self.session.get(f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}")
+ resp = self.session.get(
+ f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}")
resp.raise_for_status()
obj = resp.json()
@@ -412,18 +428,21 @@ class FigshareHelper(FilesetPlatformHelper):
if not obj['is_public']:
raise PlatformRestrictedError(f'record not public: {platform_id} {dataset_version}')
if obj['is_embargoed']:
- raise PlatformRestrictedError(f'record is embargoed: {obj.get("embargo_title")} ({platform_id} {dataset_version})')
+ raise PlatformRestrictedError(
+ f'record is embargoed: {obj.get("embargo_title")} ({platform_id} {dataset_version})'
+ )
manifest = []
for row in obj['files']:
- manifest.append(FilesetManifestFile(
- path=row['name'],
- size=row['size'],
- md5=row['computed_md5'],
- # NOTE: don't get: sha1, sha256, mimetype
- platform_url=row['download_url'],
- #extra=dict(),
- ))
+ manifest.append(
+ FilesetManifestFile(
+ path=row['name'],
+ size=row['size'],
+ md5=row['computed_md5'],
+ # NOTE: don't get: sha1, sha256, mimetype
+ platform_url=row['download_url'],
+ #extra=dict(),
+ ))
assert not row.get('is_link_only')
authors = []
@@ -451,18 +470,23 @@ class FigshareHelper(FilesetPlatformHelper):
platform_id=platform_id,
archiveorg_item_name=archiveorg_item_name,
archiveorg_item_meta=archiveorg_item_meta,
- web_bundle_url=f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}",
+ web_bundle_url=
+ f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}",
# TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files)
extra=dict(version=dataset_version),
)
+
def test_parse_figshare_url_path():
valid = {
- "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1": ("8987858", "1"),
- "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858": ("8987858", None),
+ "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1":
+ ("8987858", "1"),
+ "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858":
+ ("8987858", None),
"/articles/CIBERSORT_p-value_0_05/8217188/1": ("8217188", "1"),
- "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4": ("12127176", "4"),
+ "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4":
+ ("12127176", "4"),
}
invalid = [
@@ -479,14 +503,15 @@ def test_parse_figshare_url_path():
except ValueError:
pass
-class ZenodoHelper(FilesetPlatformHelper):
+class ZenodoHelper(FilesetPlatformHelper):
def __init__(self):
super().__init__()
self.platform_name = 'zenodo'
self.session = requests.Session()
- def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
@@ -499,7 +524,8 @@ class ZenodoHelper(FilesetPlatformHelper):
return True
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -535,12 +561,15 @@ class ZenodoHelper(FilesetPlatformHelper):
assert obj['id'] == int(platform_id)
work_id = obj['conceptrecid']
if work_id == obj['id']:
- raise PlatformScopeError("got a work-level zenodo record, not a versioned record: {work_id}")
+ raise PlatformScopeError(
+ "got a work-level zenodo record, not a versioned record: {work_id}")
zenodo_type = obj['metadata']['resource_type']['type']
if obj['metadata']['access_right'] != 'open':
- raise PlatformRestrictedError("not publicly available ({obj['metadata']['access_right']}): {platform_domain} {platform_id}")
+ raise PlatformRestrictedError(
+ "not publicly available ({obj['metadata']['access_right']}): {platform_domain} {platform_id}"
+ )
manifest = []
for row in obj['files']:
@@ -600,11 +629,9 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
'RAR': 'application/vnd.rar',
'TAR': 'application/x-tar',
'7z': 'application/x-7z-compressed',
-
'HTML': 'text/html',
'Text': 'text/plain',
'PDF': 'application/pdf',
-
'CSV': 'text/csv',
'XML': 'application/xml',
'JSON': 'application/json',
@@ -613,17 +640,13 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
#'application/vnd.openxmlformats-officedocument.wordprocessingml.document', # .docx
#'application/vnd.ms-excel', # .xls
#'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx
-
- 'MP3': 'audio/mpeg', # .mp3
-
- 'MP4': 'video/mp4', # .mp4
- 'MPEG': 'video/mpeg', # .mpeg
-
+ 'MP3': 'audio/mpeg', # .mp3
+ 'MP4': 'video/mp4', # .mp4
+ 'MPEG': 'video/mpeg', # .mpeg
'JPEG': 'image/jpeg',
'GIF': 'image/gif',
'PNG': 'image/png',
'TIFF': 'image/tiff',
-
'Unknown': None,
}
@@ -640,24 +663,27 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
if f.source != 'original':
return False
for suffix in [
- '_meta.sqlite',
- '_archive.torrent',
- '_itemimage.jpg',
- '_meta.xml',
- '_thumb.png',
- '_files.xml',
+ '_meta.sqlite',
+ '_archive.torrent',
+ '_itemimage.jpg',
+ '_meta.xml',
+ '_thumb.png',
+ '_files.xml',
]:
if f.name == item_name + suffix or f.name == item_name.lower() + suffix:
return False
if f.name.startswith('_'):
return False
if item_name.startswith('academictorrents_'):
- for suffix in ['_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib']:
+ for suffix in [
+ '_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib'
+ ]:
if f.name == item_name + suffix:
return False
return True
- def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
@@ -672,20 +698,23 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
return True
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
base_url_split = request['base_url'].split('/')
#print(base_url_split, file=sys.stderr)
- assert len(base_url_split) in [5,6]
+ assert len(base_url_split) in [5, 6]
assert base_url_split[0] in ['http:', 'https:']
assert base_url_split[2] == 'archive.org'
assert base_url_split[3] in ['details', 'download']
item_name = base_url_split[4]
if len(base_url_split) == 6 and base_url_split[5]:
- raise PlatformScopeError("got an archive.org file path, not download/details page; individual files not handled yet")
+ raise PlatformScopeError(
+ "got an archive.org file path, not download/details page; individual files not handled yet"
+ )
#print(f" archiveorg processing item={item_name}", file=sys.stderr)
item = self.session.get_item(item_name)
diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
index c9f182c..6c25276 100644
--- a/python/sandcrawler/fileset_strategies.py
+++ b/python/sandcrawler/fileset_strategies.py
@@ -1,4 +1,3 @@
-
import gzip
import json
import os
@@ -10,15 +9,15 @@ from typing import Any, Dict, List, Optional, Tuple
import internetarchive
-from sandcrawler.fileset_types import (ArchiveStrategyResult, FilesetManifestFile, FilesetPlatformItem, IngestStrategy,
- PlatformScopeError)
+from sandcrawler.fileset_types import (ArchiveStrategyResult, FilesetManifestFile,
+ FilesetPlatformItem, IngestStrategy, PlatformScopeError)
from sandcrawler.html_metadata import BiblioMetadata
-from sandcrawler.ia import ResourceResult, SavePageNowClient, WaybackClient, fix_transfer_encoding
+from sandcrawler.ia import (ResourceResult, SavePageNowClient, WaybackClient,
+ fix_transfer_encoding)
from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path, sanitize_fs_path
class FilesetIngestStrategy():
-
def __init__(self):
#self.ingest_strategy = 'unknown'
self.success_status = "success"
@@ -31,7 +30,6 @@ class FilesetIngestStrategy():
class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
-
def __init__(self, **kwargs):
super().__init__()
self.ingest_strategy = IngestStrategy.ArchiveorgFileset
@@ -61,7 +59,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
found = False
for existing in item_files:
if existing.name == wanted.path:
- if ((existing.sha1 and existing.sha1 == wanted.sha1) or (existing.md5 and existing.md5 == wanted.md5)) and existing.name == wanted.path and existing.size == wanted.size:
+ if ((existing.sha1 and existing.sha1 == wanted.sha1) or
+ (existing.md5 and existing.md5 == wanted.md5)
+ ) and existing.name == wanted.path and existing.size == wanted.size:
found = True
wanted.status = 'exists'
break
@@ -69,7 +69,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
wanted.status = 'mismatch-existing'
break
if not found:
- print(f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}", file=sys.stderr)
+ print(
+ f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}",
+ file=sys.stderr)
return None
return ArchiveStrategyResult(
ingest_strategy=self.ingest_strategy,
@@ -108,10 +110,11 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
if not os.path.exists(local_path):
print(f" downloading {m.path}", file=sys.stderr)
- with self.ia_session.get(m.platform_url, stream=True, allow_redirects=True) as r:
+ with self.ia_session.get(m.platform_url, stream=True,
+ allow_redirects=True) as r:
r.raise_for_status()
with open(local_path + '.partial', 'wb') as f:
- for chunk in r.iter_content(chunk_size=256*1024):
+ for chunk in r.iter_content(chunk_size=256 * 1024):
f.write(chunk)
os.rename(local_path + '.partial', local_path)
m.status = 'downloaded-local'
@@ -120,7 +123,8 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
print(f" verifying {m.path}", file=sys.stderr)
file_meta = gen_file_metadata_path(local_path, allow_empty=True)
- assert file_meta['size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}"
+ assert file_meta[
+ 'size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}"
if m.sha1:
assert file_meta['sha1hex'] == m.sha1
@@ -142,7 +146,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
if file_meta['mimetype'] != m.mimetype and file_meta['mimetype'] != 'text/plain':
# these 'tab-separated-values' from dataverse are just noise, don't log them
if m.mimetype != 'text/tab-separated-values':
- print(f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}", file=sys.stderr)
+ print(
+ f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}",
+ file=sys.stderr)
m.mimetype = file_meta['mimetype']
else:
m.mimetype = file_meta['mimetype']
@@ -158,7 +164,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
'remote_name': m.path,
})
- print(f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...", file=sys.stderr)
+ print(
+ f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...",
+ file=sys.stderr)
internetarchive.upload(
item.archiveorg_item_name,
files=item_files,
@@ -183,25 +191,26 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
return result
+
class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy):
"""
ArchiveorgFilesetStrategy currently works fine with individual files. Just
need to over-ride the ingest_strategy name.
"""
-
def __init__(self):
super().__init__()
self.ingest_strategy = IngestStrategy.ArchiveorgFileset
self.success_status = "success-file"
-class WebFilesetStrategy(FilesetIngestStrategy):
+class WebFilesetStrategy(FilesetIngestStrategy):
def __init__(self, **kwargs):
super().__init__()
self.ingest_strategy = IngestStrategy.WebFileset
self.wayback_client = WaybackClient()
self.try_spn2 = True
- self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0))
+ self.spn_client = SavePageNowClient(
+ spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0))
self.max_spn_manifest = 20
def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
@@ -218,23 +227,26 @@ class WebFilesetStrategy(FilesetIngestStrategy):
for m in item.manifest:
fetch_url = m.platform_url
if not fetch_url:
- raise NotImplementedError("require 'platform_url' for each file when doing Web fetching")
+ raise NotImplementedError(
+ "require 'platform_url' for each file when doing Web fetching")
via = "wayback"
resource = self.wayback_client.lookup_resource(fetch_url, m.mimetype)
- if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture')):
+ if self.try_spn2 and (resource == None or
+ (resource and resource.status == 'no-capture')):
if len(item.manifest) > self.max_spn_manifest:
m.status = 'too-much-spn'
continue
via = "spn2"
- resource = self.spn_client.crawl_resource(fetch_url, self.wayback_client, force_simple_get=True)
+ resource = self.spn_client.crawl_resource(fetch_url,
+ self.wayback_client,
+ force_simple_get=True)
- print("[FETCH {:>6}] {} {}".format(
- via,
- (resource and resource.status),
- (resource and resource.terminal_url) or fetch_url),
- file=sys.stderr)
+ print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status),
+ (resource and resource.terminal_url)
+ or fetch_url),
+ file=sys.stderr)
m.terminal_url = resource.terminal_url
m.terminal_dt = resource.terminal_dt
@@ -251,9 +263,11 @@ class WebFilesetStrategy(FilesetIngestStrategy):
file_meta, html_resource = fix_transfer_encoding(file_meta, resource)
if self.ingest_strategy == "web-file":
- file_file_meta = file_meta
+ file_file_meta = file_meta
- if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex']) or (m.sha1 and m.sha1 != file_meta['sha1hex']):
+ if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex']
+ ) or (m.sha1
+ and m.sha1 != file_meta['sha1hex']):
m.status = 'mismatch'
continue
@@ -280,8 +294,8 @@ class WebFilesetStrategy(FilesetIngestStrategy):
result.file_resource = file_resource
return result
-class WebFileStrategy(WebFilesetStrategy):
+class WebFileStrategy(WebFilesetStrategy):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.ingest_strategy = IngestStrategy.WebFile
diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py
index 8ea136e..606af07 100644
--- a/python/sandcrawler/fileset_types.py
+++ b/python/sandcrawler/fileset_types.py
@@ -1,4 +1,3 @@
-
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
@@ -13,6 +12,7 @@ class IngestStrategy(str, Enum):
ArchiveorgFileset = "archiveorg-fileset"
ArchiveorgFilesetBundled = "archiveorg-fileset-bundled"
+
class FilesetManifestFile(BaseModel):
path: str
size: Optional[int]
@@ -27,6 +27,7 @@ class FilesetManifestFile(BaseModel):
terminal_url: Optional[str]
terminal_dt: Optional[str]
+
class FilesetPlatformItem(BaseModel):
platform_name: str
platform_status: str
@@ -39,6 +40,7 @@ class FilesetPlatformItem(BaseModel):
web_base_url: Optional[str]
web_bundle_url: Optional[str]
+
class ArchiveStrategyResult(BaseModel):
ingest_strategy: str
status: str
@@ -49,6 +51,7 @@ class ArchiveStrategyResult(BaseModel):
bundle_resource: Optional[Any]
bundle_archiveorg_path: Optional[str]
+
class PlatformScopeError(Exception):
"""
For incidents where platform helper discovers that the fileset/dataset is
@@ -61,6 +64,7 @@ class PlatformScopeError(Exception):
"""
pass
+
class PlatformRestrictedError(Exception):
"""
When datasets are not publicly available on a platform (yet)
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index 5242b3a..16bbb01 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -1,4 +1,3 @@
-
import requests
from grobid2json import teixml2json
@@ -8,7 +7,6 @@ from .workers import SandcrawlerFetchWorker, SandcrawlerWorker
class GrobidClient(object):
-
def __init__(self, host_url="http://grobid.qa.fatcat.wiki", **kwargs):
self.host_url = host_url
self.consolidate_mode = int(kwargs.get('consolidate_mode', 0))
@@ -34,7 +32,7 @@ class GrobidClient(object):
files={
'input': blob,
'consolidateHeader': self.consolidate_mode,
- 'consolidateCitations': 0, # too expensive for now
+ 'consolidateCitations': 0, # too expensive for now
'includeRawCitations': 1,
},
timeout=180.0,
@@ -46,9 +44,7 @@ class GrobidClient(object):
'error_msg': 'GROBID request (HTTP POST) timeout',
}
- info = dict(
- status_code=grobid_response.status_code,
- )
+ info = dict(status_code=grobid_response.status_code, )
if grobid_response.status_code == 200:
info['status'] = 'success'
info['tei_xml'] = grobid_response.text
@@ -56,7 +52,8 @@ class GrobidClient(object):
# XML is larger than Kafka message size, and much larger than
# an article in general; bail out
info['status'] = 'error'
- info['error_msg'] = "response XML too large: {} bytes".format(len(info['tei_xml']))
+ info['error_msg'] = "response XML too large: {} bytes".format(
+ len(info['tei_xml']))
info.pop('tei_xml')
else:
# response.text is .content decoded as utf-8
@@ -70,7 +67,13 @@ class GrobidClient(object):
tei_json = teixml2json(result['tei_xml'], encumbered=False)
meta = dict()
biblio = dict()
- for k in ('title', 'authors', 'journal', 'date', 'doi', ):
+ for k in (
+ 'title',
+ 'authors',
+ 'journal',
+ 'date',
+ 'doi',
+ ):
if tei_json.get(k):
biblio[k] = tei_json[k]
meta['biblio'] = biblio
@@ -79,8 +82,8 @@ class GrobidClient(object):
meta[k] = tei_json[k]
return meta
-class GrobidWorker(SandcrawlerFetchWorker):
+class GrobidWorker(SandcrawlerFetchWorker):
def __init__(self, grobid_client, wayback_client=None, sink=None, **kwargs):
super().__init__(wayback_client=wayback_client)
self.grobid_client = grobid_client
@@ -104,18 +107,19 @@ class GrobidWorker(SandcrawlerFetchWorker):
return fetch_result
blob = fetch_result['blob']
- result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
+ result = self.grobid_client.process_fulltext(blob,
+ consolidate_mode=self.consolidate_mode)
result['file_meta'] = gen_file_metadata(blob)
result['source'] = record
result['key'] = result['file_meta']['sha1hex']
return result
+
class GrobidBlobWorker(SandcrawlerWorker):
"""
This is sort of like GrobidWorker, except it receives blobs directly,
instead of fetching blobs from some remote store.
"""
-
def __init__(self, grobid_client, sink=None, **kwargs):
super().__init__()
self.grobid_client = grobid_client
@@ -125,8 +129,8 @@ class GrobidBlobWorker(SandcrawlerWorker):
def process(self, blob, key=None):
if not blob:
return None
- result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
+ result = self.grobid_client.process_fulltext(blob,
+ consolidate_mode=self.consolidate_mode)
result['file_meta'] = gen_file_metadata(blob)
result['key'] = result['file_meta']['sha1hex']
return result
-
diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py
index 6bdebdd..a44fc67 100644
--- a/python/sandcrawler/html.py
+++ b/python/sandcrawler/html.py
@@ -1,4 +1,3 @@
-
import json
import re
import sys
@@ -6,7 +5,8 @@ import urllib.parse
from bs4 import BeautifulSoup
-RESEARCHSQUARE_REGEX = re.compile(r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"')
+RESEARCHSQUARE_REGEX = re.compile(
+ r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"')
IEEEXPLORE_REGEX = re.compile(r'"pdfPath":"(/.*?\.pdf)"')
OVID_JOURNAL_URL_REGEX = re.compile(r'journalURL = "(http.*)";')
SCIENCEDIRECT_BOUNCE_URL_REGEX = re.compile(r"window.location = '(http.*)';")
@@ -33,16 +33,16 @@ def extract_fulltext_url(html_url, html_body):
### General Tricks ###
# highwire-style meta tag
- meta = soup.find('meta', attrs={"name":"citation_pdf_url"})
+ meta = soup.find('meta', attrs={"name": "citation_pdf_url"})
if not meta:
- meta = soup.find('meta', attrs={"name":"bepress_citation_pdf_url"})
+ meta = soup.find('meta', attrs={"name": "bepress_citation_pdf_url"})
if not meta:
- meta = soup.find('meta', attrs={"name":"wkhealth_pdf_url"})
+ meta = soup.find('meta', attrs={"name": "wkhealth_pdf_url"})
if not meta:
# researchgate does this; maybe others also?
- meta = soup.find('meta', attrs={"property":"citation_pdf_url"})
+ meta = soup.find('meta', attrs={"property": "citation_pdf_url"})
if not meta:
- meta = soup.find('meta', attrs={"name":"eprints.document_url"})
+ meta = soup.find('meta', attrs={"name": "eprints.document_url"})
# if tag is only partially populated
if meta and not meta.get('content'):
meta = None
@@ -52,10 +52,10 @@ def extract_fulltext_url(html_url, html_body):
if '://doi.org/' in url:
print(f"\tdoi.org in citation_pdf_url (loop?): {url}", file=sys.stderr)
elif url.startswith('/'):
- if host_prefix+url == html_url:
+ if host_prefix + url == html_url:
print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr)
else:
- return dict(pdf_url=host_prefix+url, technique='citation_pdf_url')
+ return dict(pdf_url=host_prefix + url, technique='citation_pdf_url')
elif url.startswith('http'):
if url == html_url:
print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr)
@@ -64,7 +64,7 @@ def extract_fulltext_url(html_url, html_body):
else:
print("\tmalformed citation_pdf_url? {}".format(url), file=sys.stderr)
- meta = soup.find('meta', attrs={"name":"generator"})
+ meta = soup.find('meta', attrs={"name": "generator"})
meta_generator = None
if meta and meta.get('content'):
meta_generator = meta['content'].strip()
@@ -105,7 +105,8 @@ def extract_fulltext_url(html_url, html_body):
json_meta = json.loads(json_text)
pdf_meta = json_meta['article']['pdfDownload']['urlMetadata']
# https://www.sciencedirect.com/science/article/pii/S0169204621000670/pdfft?md5=c4a83d06b334b627ded74cf9423bfa56&pid=1-s2.0-S0169204621000670-main.pdf
- url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams']['md5'] + "&pid=" + pdf_meta['queryParams']['pid']
+ url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams'][
+ 'md5'] + "&pid=" + pdf_meta['queryParams']['pid']
except (KeyError, TypeError, json.JSONDecodeError):
pass
if url:
@@ -130,7 +131,9 @@ def extract_fulltext_url(html_url, html_body):
if m:
url = m.group(1)
assert len(url) < 4096
- return dict(release_stage="published", pdf_url=host_prefix+url, technique="ieeexplore")
+ return dict(release_stage="published",
+ pdf_url=host_prefix + url,
+ technique="ieeexplore")
# https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=8730313
if '://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber' in html_url:
# HTML iframe like:
@@ -172,11 +175,12 @@ def extract_fulltext_url(html_url, html_body):
'://thesiscommons.org/',
]
for domain in OSF_DOMAINS:
- if domain in html_url and (len(html_url.split('/')) in [4,5] or '/preprints/' in html_url) and '/download' not in html_url:
+ if domain in html_url and (len(html_url.split('/')) in [4, 5] or '/preprints/'
+ in html_url) and '/download' not in html_url:
if not html_url.endswith("/"):
- next_url = html_url+"/download"
+ next_url = html_url + "/download"
else:
- next_url = html_url+"download"
+ next_url = html_url + "download"
return dict(next_url=next_url, technique='osf-by-url')
# wiley
@@ -199,14 +203,14 @@ def extract_fulltext_url(html_url, html_body):
url = html_url.replace("/doi/10.", "/doi/pdf/10.")
return dict(pdf_url=url, technique='archivist-url')
# <a href="/doi/pdf/10.17723/aarc.62.2.j475270470145630" target="_blank">
- hrefs = soup.find_all('a', attrs={"target":"_blank"})
+ hrefs = soup.find_all('a', attrs={"target": "_blank"})
for href in hrefs:
url = href['href'].strip()
if "/doi/pdf/" in url:
if url.startswith('http'):
return dict(pdf_url=url, technique='publisher-href')
elif url.startswith('/'):
- return dict(pdf_url=host_prefix+url, technique='publisher-href')
+ return dict(pdf_url=host_prefix + url, technique='publisher-href')
# protocols.io
# https://www.protocols.io/view/flow-cytometry-protocol-mgdc3s6
@@ -248,7 +252,8 @@ def extract_fulltext_url(html_url, html_body):
if "://ehp.niehs.nih.gov/doi/" in html_url:
# <a href="/doi/pdf/10.1289/EHP4709" target="_blank">
if b'/doi/pdf/10.' in html_body:
- url = html_url.replace('/doi/full/10.', '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.')
+ url = html_url.replace('/doi/full/10.',
+ '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.')
return dict(pdf_url=url, technique='ehp.niehs.nigh.gov-url')
# cogentoa.com
@@ -275,7 +280,7 @@ def extract_fulltext_url(html_url, html_body):
# http://en.gzbd.cnki.net/gzbt/detail/detail.aspx?FileName=HBGF202002003&DbName=GZBJ7920&DbCode=GZBJ
if '://en.gzbd.cnki.net/KCMS/detail/detail.aspx' in html_url:
# <a onclick="WriteKrsDownLog()" target="_blank" id="pdfDown" name="pdfDown" href="/gzbt/download.aspx?filename=4Q1ZYpFdKFUZ6FDR1QkRrolayRXV2ZzattyQ3QFa2JXTyZXUSV3QRFkbndzaGV2KyJXWZVEbFdVYnZndD9EOxg1Tj5Eeys2SMFzLZ5kcuFkM3dEbsR2ZjxEaShVdJhFdp90KhlVVzcjVVlXUVNHWBtWS5Rlb5cnc&amp;tablename=GZBJLAST2020&amp;dflag=pdfdown&#xA; "><i></i>PDF Download</a>
- href = soup.find('a', attrs={"id":"pdfDown"})
+ href = soup.find('a', attrs={"id": "pdfDown"})
if href:
url = href['href'].strip().replace('&#xA;', '')
if not url.startswith('http'):
@@ -300,7 +305,7 @@ def extract_fulltext_url(html_url, html_body):
# OJS 3 (some)
if meta_generator and meta_generator.startswith("Open Journal Systems"):
- href = soup.find('a', attrs={"class":"obj_galley_link file"})
+ href = soup.find('a', attrs={"class": "obj_galley_link file"})
if href and href.text and "pdf" in href.text.lower():
url = href['href'].strip()
if url.startswith('/'):
@@ -329,13 +334,15 @@ def extract_fulltext_url(html_url, html_body):
return dict()
+
def test_regex():
lines = """
blah
var journalURL = "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689";
asdf"""
m = OVID_JOURNAL_URL_REGEX.search(lines)
- assert m.group(1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"
+ assert m.group(
+ 1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"
lines = """
window.onload = function () {
diff --git a/python/sandcrawler/html_metadata.py b/python/sandcrawler/html_metadata.py
index c6725dc..6d27a3a 100644
--- a/python/sandcrawler/html_metadata.py
+++ b/python/sandcrawler/html_metadata.py
@@ -1,4 +1,3 @@
-
import datetime
import sys
import urllib.parse
@@ -31,9 +30,7 @@ HEAD_META_PATTERNS: Any = {
"meta[name='dcterms.title']",
"meta[name='dc.title']",
],
- "subtitle": [
- "meta[name='prism.subtitle']",
- ],
+ "subtitle": ["meta[name='prism.subtitle']", ],
"doi": [
"meta[name='citation_doi']",
"meta[name='DOI']",
@@ -43,9 +40,7 @@ HEAD_META_PATTERNS: Any = {
"meta[name='dc.identifier.doi']",
"meta[name='dc.identifier'][scheme='doi']",
],
- "pmid": [
- "meta[name='citation_pmid']",
- ],
+ "pmid": ["meta[name='citation_pmid']", ],
"abstract": [
"meta[name='citation_abstract']",
"meta[name='bepress_citation_abstract']",
@@ -66,9 +61,7 @@ HEAD_META_PATTERNS: Any = {
"meta[name='dc.source']",
"meta[property='og:site_name']",
],
- "container_abbrev": [
- "meta[name='citation_journal_abbrev']",
- ],
+ "container_abbrev": ["meta[name='citation_journal_abbrev']", ],
"raw_date": [
"meta[name='citation_publication_date']",
"meta[name='bepress_citation_publication_date']",
@@ -169,9 +162,7 @@ HEAD_META_LIST_PATTERNS: Any = {
"meta[name='dc.contributor']",
],
# TODO: citation_author_institution
- "raw_references": [
- "meta[name='citation_reference']",
- ],
+ "raw_references": ["meta[name='citation_reference']", ],
"raw_identifiers": [
"meta[name='eprints.id_number']",
"meta[name='dcterms.identifier']",
@@ -260,7 +251,7 @@ HTML_FULLTEXT_PATTERNS: List[dict] = [
COMPONENT_FULLTEXT_PATTERNS: List[dict] = [
{
- "in_doc_url": "pensoft.net/article/", # also /element/
+ "in_doc_url": "pensoft.net/article/", # also /element/
"in_fulltext_url": "/download/fig/",
"selector": ".Main-Content .figure a.P-Article-Preview-Picture-Download-Small",
"attr": "href",
@@ -652,12 +643,11 @@ class BiblioMetadata(pydantic.BaseModel):
component_url: Optional[str]
class Config:
- json_encoders = {
- datetime.date: lambda dt: dt.isoformat()
- }
+ json_encoders = {datetime.date: lambda dt: dt.isoformat()}
-def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, patterns: List[dict]) -> Optional[Tuple[str, str]]:
+def html_extract_fulltext_url(doc_url: str, doc: HTMLParser,
+ patterns: List[dict]) -> Optional[Tuple[str, str]]:
"""
Tries to quickly extract fulltext URLs using a set of patterns. This
function is intendend to be generic across various extraction techniques.
@@ -701,6 +691,7 @@ def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, patterns: List[dict
return self_doc_url
return None
+
def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadata]:
meta: Any = dict()
@@ -772,6 +763,7 @@ def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadat
return BiblioMetadata(**meta)
+
def load_adblock_rules() -> braveblock.Adblocker:
"""
TODO: consider blocking very generic assets:
@@ -838,7 +830,8 @@ def _extract_generic(doc: HTMLParser, selector: str, attrs: List[str], type_name
return resources
-def html_extract_resources(doc_url: str, doc: HTMLParser, adblock: braveblock.Adblocker) -> list:
+def html_extract_resources(doc_url: str, doc: HTMLParser,
+ adblock: braveblock.Adblocker) -> list:
"""
This function tries to find all the important resources in a page. The
presumption is that the HTML document is article fulltext, and we want the
@@ -869,10 +862,12 @@ def html_extract_resources(doc_url: str, doc: HTMLParser, adblock: braveblock.Ad
r['url'] = urllib.parse.urljoin(doc_url, r['url'])
# filter using adblocker
- resources = [r for r in resources if adblock.check_network_urls(r['url'], source_url=doc_url, request_type=r['type']) == False]
+ resources = [
+ r for r in resources if adblock.check_network_urls(
+ r['url'], source_url=doc_url, request_type=r['type']) == False
+ ]
# remove duplicates
resources = [dict(t) for t in {tuple(d.items()) for d in resources}]
return resources
-
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index ca1182f..a8ce193 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -1,4 +1,3 @@
-
# XXX: some broken MRO thing going on in here due to python3 object wrangling
# in `wayback` library. Means we can't run pylint.
# pylint: skip-file
@@ -38,6 +37,7 @@ class SandcrawlerBackoffError(Exception):
"""
pass
+
ResourceResult = namedtuple("ResourceResult", [
"start_url",
"hit",
@@ -80,6 +80,7 @@ CdxPartial = namedtuple('CdxPartial', [
'sha1hex',
])
+
def cdx_partial_from_row(full):
return CdxPartial(
surt=full.surt,
@@ -91,6 +92,7 @@ def cdx_partial_from_row(full):
sha1hex=full.sha1hex,
)
+
def cdx_to_dict(cdx):
d = {
"surt": cdx.surt,
@@ -107,6 +109,7 @@ def cdx_to_dict(cdx):
d['warc_path'] = cdx.warc_path
return d
+
def fuzzy_match_url(left, right):
"""
Matches URLs agnostic of http/https (and maybe other normalizations in the
@@ -123,6 +126,7 @@ def fuzzy_match_url(left, right):
return True
return False
+
def test_fuzzy_match_url():
assert fuzzy_match_url("http://thing.com", "http://thing.com") == True
assert fuzzy_match_url("http://thing.com", "https://thing.com") == True
@@ -137,18 +141,19 @@ def test_fuzzy_match_url():
assert fuzzy_match_url("http://www.thing.com", "http://www2.thing.com") == False
assert fuzzy_match_url("http://www.thing.com", "https://www2.thing.com") == False
+
class CdxApiError(Exception):
pass
-class CdxApiClient:
+class CdxApiClient:
def __init__(self, host_url="https://web.archive.org/cdx/search/cdx", **kwargs):
self.host_url = host_url
self.http_session = requests_retry_session(retries=3, backoff_factor=3)
- cdx_auth_token = kwargs.get('cdx_auth_token',
- os.environ.get('CDX_AUTH_TOKEN'))
+ cdx_auth_token = kwargs.get('cdx_auth_token', os.environ.get('CDX_AUTH_TOKEN'))
if not cdx_auth_token:
- raise Exception("CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)")
+ raise Exception(
+ "CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)")
self.http_session.headers.update({
'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient',
'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token),
@@ -208,7 +213,8 @@ class CdxApiClient:
found, because we expect to be looking up a specific full record.
"""
if len(datetime) != 14:
- raise ValueError("CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime))
+ raise ValueError(
+ "CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime))
params = {
'url': url,
'from': datetime,
@@ -226,18 +232,28 @@ class CdxApiClient:
if retry_sleep > 3:
next_sleep = retry_sleep - 3
retry_sleep = 3
- print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr)
+ print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
+ file=sys.stderr)
time.sleep(retry_sleep)
- return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=next_sleep)
+ return self.fetch(url,
+ datetime,
+ filter_status_code=filter_status_code,
+ retry_sleep=next_sleep)
raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime))
row = resp[0]
# allow fuzzy http/https match
if not (fuzzy_match_url(row.url, url) and row.datetime == datetime):
if retry_sleep and retry_sleep > 0:
- print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr)
+ print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
+ file=sys.stderr)
time.sleep(retry_sleep)
- return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=None)
- raise KeyError("Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(url, datetime, row))
+ return self.fetch(url,
+ datetime,
+ filter_status_code=filter_status_code,
+ retry_sleep=None)
+ raise KeyError(
+ "Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(
+ url, datetime, row))
if filter_status_code:
assert row.status_code == filter_status_code
return row
@@ -311,17 +327,20 @@ class CdxApiClient:
class WaybackError(Exception):
pass
+
class WaybackContentError(Exception):
pass
+
class PetaboxError(Exception):
pass
+
class NoCaptureError(Exception):
pass
-class WaybackClient:
+class WaybackClient:
def __init__(self, cdx_client=None, **kwargs):
if cdx_client:
self.cdx_client = cdx_client
@@ -367,32 +386,42 @@ class WaybackClient:
if not self.petabox_webdata_secret:
raise Exception("WaybackClient needs petabox secret to do direct WARC fetches")
if not "/" in warc_path:
- raise ValueError("what looks like a liveweb/SPN temporary warc path: {}".format(warc_path))
+ raise ValueError(
+ "what looks like a liveweb/SPN temporary warc path: {}".format(warc_path))
warc_uri = self.warc_uri_prefix + warc_path
if not self.rstore:
- self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory3(
- webdata_secret=self.petabox_webdata_secret,
- ))
+ self.rstore = ResourceStore(
+ loaderfactory=CDXLoaderFactory3(webdata_secret=self.petabox_webdata_secret, ))
try:
#print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr)
gwb_record = self.rstore.load_resource(warc_uri, offset, csize)
except wayback.exception.ResourceUnavailable:
print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
- raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)")
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (ResourceUnavailable)")
except wayback.exception.InvalidResource:
print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
- raise WaybackContentError("failed to load file contents from wayback/petabox (InvalidResource)")
+ raise WaybackContentError(
+ "failed to load file contents from wayback/petabox (InvalidResource)")
except urllib3.exceptions.ReadTimeoutError as rte:
- raise PetaboxError("failed to load file contents from wayback/petabox (ReadTimeoutError: {})".format(rte))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (ReadTimeoutError: {})".
+ format(rte))
except ValueError as ve:
- raise PetaboxError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
except EOFError as eofe:
- raise PetaboxError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
except TypeError as te:
- raise PetaboxError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)"
+ .format(te))
except Exception as e:
if "while decompressing data: invalid block type" in str(e):
- raise PetaboxError("decompression error fetching WARC record; usually due to bad alexa ARC files")
+ raise PetaboxError(
+ "decompression error fetching WARC record; usually due to bad alexa ARC files"
+ )
else:
raise e
# Note: could consider a generic "except Exception" here, as we get so
@@ -405,7 +434,8 @@ class WaybackClient:
raise WaybackContentError("too many HTTP headers (in wayback fetch)")
location = gwb_record.get_location() or None
- if status_code is None and gwb_record.target_uri.startswith(b"ftp://") and not gwb_record.is_revisit():
+ if status_code is None and gwb_record.target_uri.startswith(
+ b"ftp://") and not gwb_record.is_revisit():
# TODO: some additional verification here?
status_code = 226
@@ -416,8 +446,9 @@ class WaybackClient:
raise WaybackContentError("found revisit record, but won't resolve (loop?)")
revisit_uri, revisit_dt = gwb_record.refers_to
if not (revisit_uri and revisit_dt):
- raise WaybackContentError("revisit record missing URI and/or DT: warc:{} offset:{}".format(
- warc_path, offset))
+ raise WaybackContentError(
+ "revisit record missing URI and/or DT: warc:{} offset:{}".format(
+ warc_path, offset))
# convert revisit_dt
# len("2018-07-24T11:56:49"), or with "Z"
assert len(revisit_dt) in (19, 20)
@@ -425,7 +456,9 @@ class WaybackClient:
revisit_uri = revisit_uri.decode('utf-8')
if type(revisit_dt) is bytes:
revisit_dt = revisit_dt.decode('utf-8')
- revisit_dt = revisit_dt.replace('-', '').replace(':', '').replace('T', '').replace('Z', '')
+ revisit_dt = revisit_dt.replace('-', '').replace(':',
+ '').replace('T',
+ '').replace('Z', '')
assert len(revisit_dt) == 14
try:
revisit_cdx = self.cdx_client.fetch(revisit_uri, revisit_dt)
@@ -443,10 +476,10 @@ class WaybackClient:
body = gwb_record.open_raw_content().read()
except IncompleteRead as ire:
raise WaybackError(
- "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ "failed to read actual file contents from wayback/petabox (IncompleteRead: {})"
+ .format(ire))
elif status_code is None:
- raise WaybackContentError(
- "got a None status_code in (W)ARC record")
+ raise WaybackContentError("got a None status_code in (W)ARC record")
return WarcResource(
status_code=status_code,
location=location,
@@ -454,7 +487,12 @@ class WaybackClient:
revisit_cdx=revisit_cdx,
)
- def fetch_petabox_body(self, csize, offset, warc_path, resolve_revisit=True, expected_status_code=None):
+ def fetch_petabox_body(self,
+ csize,
+ offset,
+ warc_path,
+ resolve_revisit=True,
+ expected_status_code=None):
"""
Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize.
@@ -474,12 +512,10 @@ class WaybackClient:
raise KeyError("archived HTTP response (WARC) was not {}: {}".format(
expected_status_code,
resource.status_code,
- )
- )
+ ))
elif resource.status_code not in (200, 226):
raise KeyError("archived HTTP response (WARC) was not 200: {}".format(
- resource.status_code)
- )
+ resource.status_code))
return resource.body
@@ -514,7 +550,9 @@ class WaybackClient:
except requests.exceptions.ChunkedEncodingError:
raise WaybackError("ChunkedEncodingError (wayback replay fetch)")
except UnicodeDecodeError:
- raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url))
+ raise WaybackContentError(
+ "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(
+ url))
try:
resp.raise_for_status()
@@ -526,21 +564,20 @@ class WaybackClient:
if not "X-Archive-Src" in resp.headers:
raise WaybackError("replay fetch didn't return X-Archive-Src in headers")
if not datetime in resp.url:
- raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url))
+ raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(
+ datetime, resp.url))
if cdx_sha1hex:
# verify that body matches CDX hash
# TODO: don't need *all* these hashes, just sha1
file_meta = gen_file_metadata(resp.content)
if cdx_sha1hex != file_meta['sha1hex']:
- print(" REPLAY MISMATCH: cdx:{} replay:{}".format(
- cdx_sha1hex,
- file_meta['sha1hex']),
- file=sys.stderr)
- raise WaybackContentError("replay fetch body didn't match CDX hash cdx:{} body:{}".format(
- cdx_sha1hex,
- file_meta['sha1hex']),
- )
+ print(" REPLAY MISMATCH: cdx:{} replay:{}".format(cdx_sha1hex,
+ file_meta['sha1hex']),
+ file=sys.stderr)
+ raise WaybackContentError(
+ "replay fetch body didn't match CDX hash cdx:{} body:{}".format(
+ cdx_sha1hex, file_meta['sha1hex']), )
return resp.content
def fetch_replay_redirect(self, url, datetime):
@@ -568,7 +605,9 @@ class WaybackClient:
except requests.exceptions.TooManyRedirects:
raise WaybackContentError("redirect loop (wayback replay fetch)")
except UnicodeDecodeError:
- raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url))
+ raise WaybackContentError(
+ "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(
+ url))
try:
resp.raise_for_status()
except Exception as e:
@@ -580,7 +619,8 @@ class WaybackClient:
if not "X-Archive-Src" in resp.headers:
raise WaybackError("redirect replay fetch didn't return X-Archive-Src in headers")
if not datetime in resp.url:
- raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url))
+ raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(
+ datetime, resp.url))
redirect_url = resp.headers.get("Location")
# eg, https://web.archive.org/web/20200111003923id_/https://dx.doi.org/10.17504/protocols.io.y2gfybw
@@ -622,7 +662,9 @@ class WaybackClient:
urls_seen = [start_url]
for i in range(self.max_redirects):
print(" URL: {}".format(next_url), file=sys.stderr)
- cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype, closest=closest)
+ cdx_row = self.cdx_client.lookup_best(next_url,
+ best_mimetype=best_mimetype,
+ closest=closest)
#print(cdx_row, file=sys.stderr)
if not cdx_row:
return ResourceResult(
@@ -776,9 +818,11 @@ class WaybackClient:
class SavePageNowError(Exception):
pass
+
class SavePageNowBackoffError(SandcrawlerBackoffError):
pass
+
SavePageNowResult = namedtuple('SavePageNowResult', [
'success',
'status',
@@ -789,13 +833,11 @@ SavePageNowResult = namedtuple('SavePageNowResult', [
'resources',
])
-class SavePageNowClient:
+class SavePageNowClient:
def __init__(self, v2endpoint="https://web.archive.org/save", **kwargs):
- self.ia_access_key = kwargs.get('ia_access_key',
- os.environ.get('IA_ACCESS_KEY'))
- self.ia_secret_key = kwargs.get('ia_secret_key',
- os.environ.get('IA_SECRET_KEY'))
+ self.ia_access_key = kwargs.get('ia_access_key', os.environ.get('IA_ACCESS_KEY'))
+ self.ia_secret_key = kwargs.get('ia_secret_key', os.environ.get('IA_SECRET_KEY'))
self.v2endpoint = v2endpoint
self.v2_session = requests_retry_session(retries=5, backoff_factor=3)
self.v2_session.headers.update({
@@ -886,12 +928,15 @@ class SavePageNowClient:
},
)
if resp.status_code == 429:
- raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))
+ raise SavePageNowBackoffError("status_code: {}, url: {}".format(
+ resp.status_code, request_url))
elif resp.status_code != 200:
- raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url))
+ raise SavePageNowError("SPN2 status_code: {}, url: {}".format(
+ resp.status_code, request_url))
resp_json = resp.json()
- if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']:
+ if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json[
+ 'message']:
raise SavePageNowBackoffError(resp_json['message'])
elif not resp_json or 'job_id' not in resp_json or not resp_json['job_id']:
raise SavePageNowError(
@@ -915,7 +960,8 @@ class SavePageNowClient:
final_json = resp.json()
break
else:
- raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(status, request_url))
+ raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(
+ status, request_url))
if not final_json:
raise SavePageNowError("SPN2 timed out (polling count exceeded)")
@@ -923,8 +969,10 @@ class SavePageNowClient:
# if there was a recent crawl of same URL, fetch the status of that
# crawl to get correct datetime
if final_json.get('original_job_id'):
- print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", file=sys.stderr)
- resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, final_json['original_job_id']))
+ print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}",
+ file=sys.stderr)
+ resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint,
+ final_json['original_job_id']))
try:
resp.raise_for_status()
except:
@@ -935,7 +983,8 @@ class SavePageNowClient:
if final_json['status'] == "success":
if final_json.get('original_url').startswith('/'):
- print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", file=sys.stderr)
+ print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}",
+ file=sys.stderr)
return SavePageNowResult(
True,
"success",
@@ -969,15 +1018,17 @@ class SavePageNowClient:
# HACK: capture CNKI domains with outlinks (for COVID-19 crawling)
if 'gzbd.cnki.net/' in start_url:
- spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get, capture_outlinks=1)
+ spn_result = self.save_url_now_v2(start_url,
+ force_simple_get=force_simple_get,
+ capture_outlinks=1)
else:
spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get)
if not spn_result.success:
status = spn_result.status
if status in ("error:invalid-url", "error:not-found",
- "error:invalid-host-resolution", "error:gateway-timeout",
- "error:too-many-redirects", "error:read-timeout"):
+ "error:invalid-host-resolution", "error:gateway-timeout",
+ "error:too-many-redirects", "error:read-timeout"):
status = status.replace("error:", "")
elif status in ("error:no-access", "error:forbidden"):
status = "forbidden"
@@ -988,7 +1039,8 @@ class SavePageNowClient:
elif status.startswith("error:"):
status = "spn2-" + status
# despite other errors, call these a failure (so we don't retry)
- if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1")):
+ if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent')
+ or spn_result.terminal_url.endswith("cookieSet=1")):
status = "blocked-cookie"
return ResourceResult(
start_url=start_url,
@@ -1018,7 +1070,8 @@ class SavePageNowClient:
)
# don't try to CDX fetch for this common cookie block terminal
- if spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"):
+ if spn_result.terminal_url.endswith(
+ '/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"):
return ResourceResult(
start_url=start_url,
hit=False,
@@ -1143,9 +1196,12 @@ class SavePageNowClient:
)
-def fix_transfer_encoding(file_meta: dict, resource: ResourceResult) -> Tuple[dict, ResourceResult]:
- if resource.body and file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
- print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr)
+def fix_transfer_encoding(file_meta: dict,
+ resource: ResourceResult) -> Tuple[dict, ResourceResult]:
+ if resource.body and file_meta[
+ 'mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
+ print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype),
+ file=sys.stderr)
inner_body = gzip.decompress(resource.body)
if not inner_body:
raise Exception("null body inside transfer encoding")
diff --git a/python/sandcrawler/ingest_file.py b/python/sandcrawler/ingest_file.py
index 137a793..b480cc2 100644
--- a/python/sandcrawler/ingest_file.py
+++ b/python/sandcrawler/ingest_file.py
@@ -1,4 +1,3 @@
-
import base64
import gzip
import json
@@ -15,18 +14,22 @@ from selectolax.parser import HTMLParser
from sandcrawler.db import SandcrawlerPostgrestClient
from sandcrawler.grobid import GrobidClient
from sandcrawler.html import extract_fulltext_url
-from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules
-from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, ResourceResult, SavePageNowClient,
- SavePageNowError, WaybackClient, WaybackContentError, WaybackError, cdx_to_dict,
+from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio,
+ html_extract_resources, load_adblock_rules)
+from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError,
+ ResourceResult, SavePageNowClient, SavePageNowError, WaybackClient,
+ WaybackContentError, WaybackError, cdx_to_dict,
fix_transfer_encoding)
-from sandcrawler.ingest_html import (WebResource, fetch_html_resources, html_extract_body_teixml, html_guess_platform,
+from sandcrawler.ingest_html import (WebResource, fetch_html_resources,
+ html_extract_body_teixml, html_guess_platform,
html_guess_scope, quick_fetch_html_resources)
from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime
from sandcrawler.pdfextract import PdfExtractResult, process_pdf
from sandcrawler.workers import SandcrawlerWorker
from sandcrawler.xml import xml_reserialize
-MAX_BODY_SIZE_BYTES = 128*1024*1024
+MAX_BODY_SIZE_BYTES = 128 * 1024 * 1024
+
class IngestFileWorker(SandcrawlerWorker):
"""
@@ -54,7 +57,6 @@ class IngestFileWorker(SandcrawlerWorker):
process_file_hit(ResourceResult) -> response
process_grobid(ResourceResult)
"""
-
def __init__(self, sink=None, **kwargs):
super().__init__()
@@ -64,7 +66,8 @@ class IngestFileWorker(SandcrawlerWorker):
self.wayback_client = WaybackClient()
self.spn_client = kwargs.get('spn_client')
if not self.spn_client:
- self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0))
+ self.spn_client = SavePageNowClient(
+ spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0))
self.grobid_client = kwargs.get('grobid_client')
if not self.grobid_client:
self.grobid_client = GrobidClient()
@@ -123,13 +126,13 @@ class IngestFileWorker(SandcrawlerWorker):
"fao.org/glis/",
# Historical non-paper content:
- "dhz.uni-passau.de/", # newspapers
- "digital.ucd.ie/", # ireland national historical
+ "dhz.uni-passau.de/", # newspapers
+ "digital.ucd.ie/", # ireland national historical
# DOI prefixes
- "doi.org/10.2307/", # JSTOR; slow and many redirects
- "doi.org/10.18730/", # fao.org: database entry
- "doi.org/10.15468/", # gbif.org: database entry
+ "doi.org/10.2307/", # JSTOR; slow and many redirects
+ "doi.org/10.18730/", # fao.org: database entry
+ "doi.org/10.15468/", # gbif.org: database entry
# deprecated domain (doesn't redirect correctly)
"://edoc.mpg.de/",
@@ -173,10 +176,10 @@ class IngestFileWorker(SandcrawlerWorker):
"video/mpeg",
"text/plain",
"text/csv",
- "text/x-r-source", # dataverse
- "text/tab-separated-values", # dataverse
- "text/x-rst", # dataverse
- "application/x-rlang-transport", # dataverse
+ "text/x-r-source", # dataverse
+ "text/tab-separated-values", # dataverse
+ "text/x-rst", # dataverse
+ "application/x-rlang-transport", # dataverse
"application/json",
"application/xml",
"application/pdf",
@@ -194,7 +197,6 @@ class IngestFileWorker(SandcrawlerWorker):
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
]
-
def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
"""
Check in sandcrawler-db (postgres) to see if we have already ingested
@@ -214,7 +216,10 @@ class IngestFileWorker(SandcrawlerWorker):
else:
return None
- def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]:
+ def find_resource(self,
+ url,
+ best_mimetype=None,
+ force_recrawl=False) -> Optional[ResourceResult]:
"""
Looks in wayback for a resource starting at the URL, following any
redirects. If a hit isn't found, try crawling with SPN.
@@ -222,7 +227,8 @@ class IngestFileWorker(SandcrawlerWorker):
via = "none"
resource = None
- if url.startswith("http://web.archive.org/web/") or url.startswith("https://web.archive.org/web/"):
+ if url.startswith("http://web.archive.org/web/") or url.startswith(
+ "https://web.archive.org/web/"):
raise NotImplementedError("handling direct wayback links not supported yet")
if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"):
@@ -243,14 +249,13 @@ class IngestFileWorker(SandcrawlerWorker):
if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000':
old_failure = True
- if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure):
+ if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture')
+ or soft404 or old_failure):
via = "spn2"
resource = self.spn_client.crawl_resource(url, self.wayback_client)
- print("[FETCH {:>6}] {} {}".format(
- via,
- (resource and resource.status),
- (resource and resource.terminal_url) or url),
- file=sys.stderr)
+ print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status),
+ (resource and resource.terminal_url) or url),
+ file=sys.stderr)
return resource
def process_existing(self, request: dict, result_row: dict) -> dict:
@@ -262,7 +267,8 @@ class IngestFileWorker(SandcrawlerWorker):
assert result_row['hit']
existing_file_meta = self.pgrest_client.get_file_meta(result_row['terminal_sha1hex'])
existing_grobid = self.pgrest_client.get_grobid(result_row['terminal_sha1hex'])
- existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], result_row['terminal_dt'])
+ existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'],
+ result_row['terminal_dt'])
if not (existing_file_meta and existing_grobid and existing_cdx):
raise NotImplementedError("partially-exsiting records not implemented yet")
result = {
@@ -281,11 +287,13 @@ class IngestFileWorker(SandcrawlerWorker):
}
return result
- def process_file_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict:
+ def process_file_hit(self, ingest_type: str, resource: ResourceResult,
+ file_meta: dict) -> dict:
"""
Run all the necessary processing for a new/fresh ingest hit.
"""
- if ingest_type in ["dataset-file", "component"] and file_meta['mimetype'] == "application/pdf":
+ if ingest_type in ["dataset-file", "component"
+ ] and file_meta['mimetype'] == "application/pdf":
ingest_type = "pdf"
if ingest_type == "pdf":
return {
@@ -396,24 +404,26 @@ class IngestFileWorker(SandcrawlerWorker):
try:
html_doc = HTMLParser(resource.body)
except ValueError as ve:
- return dict(
- status="html-selectolax-error",
- )
+ return dict(status="html-selectolax-error", )
html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
assert html_biblio
html_body = html_extract_body_teixml(resource.body)
html_platform = html_guess_platform(resource.terminal_url, html_doc, html_biblio)
- html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('word_count'))
+ html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio,
+ html_body.get('word_count'))
html_biblio_dict = json.loads(html_biblio.json(exclude_none=True))
- if html_scope in ('blocked-captcha','blocked-cookie','blocked-forbidden'):
+ if html_scope in ('blocked-captcha', 'blocked-cookie', 'blocked-forbidden'):
return dict(
status=html_scope,
html_biblio=html_biblio_dict,
scope=html_scope,
platform=html_platform,
)
- elif html_scope not in ('article-fulltext','unknown',):
+ elif html_scope not in (
+ 'article-fulltext',
+ 'unknown',
+ ):
html_body.pop("tei_xml", None)
return dict(
status="wrong-scope",
@@ -423,7 +433,8 @@ class IngestFileWorker(SandcrawlerWorker):
html_body=html_body,
)
- raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules)
+ raw_resources = html_extract_resources(resource.terminal_url, html_doc,
+ self.adblock_rules)
if len(raw_resources) > self.max_html_resources:
html_body.pop("tei_xml", None)
return dict(
@@ -452,7 +463,9 @@ class IngestFileWorker(SandcrawlerWorker):
try:
if self.html_quick_mode:
print(" WARN: running quick CDX-only fetches", file=sys.stderr)
- full_resources = quick_fetch_html_resources(raw_resources, self.wayback_client.cdx_client, when)
+ full_resources = quick_fetch_html_resources(raw_resources,
+ self.wayback_client.cdx_client,
+ when)
else:
full_resources = fetch_html_resources(raw_resources, self.wayback_client, when)
except PetaboxError as e:
@@ -572,7 +585,9 @@ class IngestFileWorker(SandcrawlerWorker):
return result
try:
- resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl)
+ resource = self.find_resource(next_url,
+ best_mimetype,
+ force_recrawl=force_recrawl)
except SavePageNowError as e:
result['status'] = 'spn2-error'
result['error_message'] = str(e)[:1600]
@@ -650,10 +665,9 @@ class IngestFileWorker(SandcrawlerWorker):
# here we split based on ingest type to try and extract a next hop
html_ish_resource = bool(
"html" in file_meta['mimetype']
- or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
+ or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
or "application/xml" in file_meta['mimetype']
- or "text/xml" in file_meta['mimetype']
- )
+ or "text/xml" in file_meta['mimetype'])
html_biblio = None
html_doc = None
if html_ish_resource and resource.body:
@@ -662,7 +676,8 @@ class IngestFileWorker(SandcrawlerWorker):
html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
if html_biblio:
if not 'html_biblio' in result or html_biblio.title:
- result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True))
+ result['html_biblio'] = json.loads(
+ html_biblio.json(exclude_none=True))
#print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
except ValueError:
pass
@@ -686,18 +701,19 @@ class IngestFileWorker(SandcrawlerWorker):
assert next_url
next_url = clean_url(next_url)
print("[PARSE {:>6}] {} {}".format(
- ingest_type,
- fulltext_url.get('technique'),
- next_url,
- ),
- file=sys.stderr)
+ ingest_type,
+ fulltext_url.get('technique'),
+ next_url,
+ ),
+ file=sys.stderr)
if next_url in hops:
result['status'] = 'link-loop'
result['error_message'] = "repeated: {}".format(next_url)
return result
hops.append(next_url)
continue
- elif ingest_type in ("xml", "html", "component") and html_ish_resource and html_biblio:
+ elif ingest_type in ("xml", "html",
+ "component") and html_ish_resource and html_biblio:
# NOTE: src_fulltext_url is not a thing
next_url_found = None
if ingest_type == "xml" and html_biblio.xml_fulltext_url:
@@ -711,11 +727,11 @@ class IngestFileWorker(SandcrawlerWorker):
next_url = next_url_found
technique = "html_biblio"
print("[PARSE {:>6}] {} {}".format(
- ingest_type,
- technique,
- next_url,
- ),
- file=sys.stderr)
+ ingest_type,
+ technique,
+ next_url,
+ ),
+ file=sys.stderr)
if next_url in hops:
if ingest_type == "html":
# for HTML ingest, we don't count this as a link-loop
@@ -756,7 +772,8 @@ class IngestFileWorker(SandcrawlerWorker):
result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
return result
elif ingest_type == "xml":
- if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
+ if file_meta['mimetype'] not in ("application/xml", "text/xml",
+ "application/jats+xml"):
result['status'] = "wrong-mimetype"
return result
elif ingest_type == "html":
@@ -786,18 +803,18 @@ class IngestFileWorker(SandcrawlerWorker):
result['hit'] = True
if ingest_type == "pdf":
print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format(
- ingest_type,
- result.get('file_meta', {}).get('sha1hex'),
- result.get('grobid', {}).get('status_code'),
- result.get('pdf_meta', {}).get('status'),
- ),
- file=sys.stderr)
+ ingest_type,
+ result.get('file_meta', {}).get('sha1hex'),
+ result.get('grobid', {}).get('status_code'),
+ result.get('pdf_meta', {}).get('status'),
+ ),
+ file=sys.stderr)
else:
print("[SUCCESS {:>5}] sha1:{}".format(
- ingest_type,
- result.get('file_meta', {}).get('sha1hex'),
- ),
- file=sys.stderr)
+ ingest_type,
+ result.get('file_meta', {}).get('sha1hex'),
+ ),
+ file=sys.stderr)
return result
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index 11386df..5cbb908 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -1,4 +1,3 @@
-
import gzip
import json
import sys
@@ -14,17 +13,21 @@ from sandcrawler.fileset_platforms import DATASET_PLATFORM_HELPER_TABLE, Fileset
from sandcrawler.fileset_strategies import FILESET_STRATEGY_HELPER_TABLE, FilesetIngestStrategy
from sandcrawler.fileset_types import PlatformRestrictedError, PlatformScopeError
from sandcrawler.html import extract_fulltext_url
-from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules
-from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, ResourceResult, SavePageNowClient,
- SavePageNowError, WaybackClient, WaybackContentError, WaybackError, cdx_to_dict,
+from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio,
+ html_extract_resources, load_adblock_rules)
+from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError,
+ ResourceResult, SavePageNowClient, SavePageNowError, WaybackClient,
+ WaybackContentError, WaybackError, cdx_to_dict,
fix_transfer_encoding)
from sandcrawler.ingest_file import IngestFileWorker
-from sandcrawler.ingest_html import (WebResource, fetch_html_resources, html_extract_body_teixml, html_guess_platform,
+from sandcrawler.ingest_html import (WebResource, fetch_html_resources,
+ html_extract_body_teixml, html_guess_platform,
html_guess_scope, quick_fetch_html_resources)
from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime
from sandcrawler.workers import SandcrawlerWorker
-MAX_BODY_SIZE_BYTES = 128*1024*1024
+MAX_BODY_SIZE_BYTES = 128 * 1024 * 1024
+
class IngestFilesetWorker(IngestFileWorker):
"""
@@ -39,14 +42,13 @@ class IngestFilesetWorker(IngestFileWorker):
checking to see if content has been archived already)
4. summarize status
"""
-
def __init__(self, sink=None, **kwargs):
super().__init__(sink=None, **kwargs)
self.sink = sink
self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE
self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
- self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024)
+ self.max_total_size = kwargs.get('max_total_size', 64 * 1024 * 1024 * 1024)
self.max_file_count = kwargs.get('max_file_count', 200)
self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink')
self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False)
@@ -72,11 +74,12 @@ class IngestFilesetWorker(IngestFileWorker):
raise NotImplementedError("process_existing() not tested or safe yet")
def want(self, request: dict) -> bool:
- if not request.get('ingest_type') in ('dataset',):
+ if not request.get('ingest_type') in ('dataset', ):
return False
return True
- def fetch_resource_iteratively(self, ingest_type: str, base_url: str, force_recrawl: bool) -> dict:
+ def fetch_resource_iteratively(self, ingest_type: str, base_url: str,
+ force_recrawl: bool) -> dict:
"""
This is copypasta from process_file(), should probably refactor.
"""
@@ -174,10 +177,9 @@ class IngestFilesetWorker(IngestFileWorker):
# here we split based on ingest type to try and extract a next hop
html_ish_resource = bool(
"html" in file_meta['mimetype']
- or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
+ or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
or "application/xml" in file_meta['mimetype']
- or "text/xml" in file_meta['mimetype']
- )
+ or "text/xml" in file_meta['mimetype'])
html_biblio = None
html_doc = None
if html_ish_resource and resource.body:
@@ -186,7 +188,8 @@ class IngestFilesetWorker(IngestFileWorker):
html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
if html_biblio:
if not 'html_biblio' in result or html_biblio.title:
- result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True))
+ result['html_biblio'] = json.loads(
+ html_biblio.json(exclude_none=True))
#print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
except ValueError:
pass
@@ -214,7 +217,8 @@ class IngestFilesetWorker(IngestFileWorker):
result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
return result
elif ingest_type == "xml":
- if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
+ if file_meta['mimetype'] not in ("application/xml", "text/xml",
+ "application/jats+xml"):
result['status'] = "wrong-mimetype"
return result
elif ingest_type == "html":
@@ -229,11 +233,10 @@ class IngestFilesetWorker(IngestFileWorker):
result['_resource'] = resource
return result
-
def process(self, request: dict, key: Any = None) -> dict:
ingest_type = request.get('ingest_type')
- if ingest_type not in ("dataset",):
+ if ingest_type not in ("dataset", ):
raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
# parse/clean URL
@@ -250,7 +253,9 @@ class IngestFilesetWorker(IngestFileWorker):
#if existing:
# return self.process_existing(request, existing)
- result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl)
+ result = self.fetch_resource_iteratively(ingest_type,
+ base_url,
+ force_recrawl=force_recrawl)
result['request'] = request
if result.get('status') != None:
result['request'] = request
@@ -323,14 +328,16 @@ class IngestFilesetWorker(IngestFileWorker):
return result
if result['file_count'] > self.max_file_count:
# hard max, to prevent downstream breakage
- if result['file_count'] > 10*1000:
+ if result['file_count'] > 10 * 1000:
result['manifest'] = result['manifest'][:self.max_file_count]
result['status'] = 'too-many-files'
return result
ingest_strategy = platform_helper.chose_strategy(dataset_meta)
result['ingest_strategy'] = ingest_strategy
- print(f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}", file=sys.stderr)
+ print(
+ f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}",
+ file=sys.stderr)
strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy)
if not strategy_helper:
@@ -349,7 +356,8 @@ class IngestFilesetWorker(IngestFileWorker):
if archive_result.bundle_file_meta:
result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta
if archive_result.archiveorg_bundle_path:
- result['fileset_bundle']['archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path
+ result['fileset_bundle'][
+ 'archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path
if archive_result.bundle_resource:
result['fileset_bundle']['terminal'] = dict(
terminal_url=archive_result.bundle_resource.terminal_url,
@@ -357,14 +365,16 @@ class IngestFilesetWorker(IngestFileWorker):
terminal_status_code=archive_result.bundle_resource.terminal_status_code,
)
if archive_result.bundle_resource.cdx:
- result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_resource.cdx)
+ result['fileset_bundle']['cdx'] = cdx_to_dict(
+ archive_result.bundle_resource.cdx)
if archive_result.bundle_resource.revisit_cdx:
- result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_resource.revisit_cdx)
+ result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(
+ archive_result.bundle_resource.revisit_cdx)
if ingest_strategy.endswith('-file'):
result['fileset_file'] = dict()
if archive_result.file_file_meta:
- result['fileset_file']['file_meta'] = file_meta=archive_result.file_file_meta,
+ result['fileset_file']['file_meta'] = file_meta = archive_result.file_file_meta,
if archive_result.file_resource:
result['fileset_file']['terminal'] = dict(
terminal_url=archive_result.file_resource.terminal_url,
@@ -372,16 +382,20 @@ class IngestFilesetWorker(IngestFileWorker):
terminal_status_code=archive_result.file_resource.terminal_status_code,
)
if archive_result.file_resource.cdx:
- result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
+ result['fileset_file']['cdx'] = cdx_to_dict(
+ archive_result.file_resource.cdx)
if archive_result.file_resource.revisit_cdx:
- result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
+ result['fileset_file']['revisit_cdx'] = cdx_to_dict(
+ archive_result.file_resource.revisit_cdx)
if result['status'].startswith('success'):
# check that these are still valid
assert result['file_count'] == len(archive_result.manifest)
- assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size])
+ assert result['total_size'] == sum(
+ [m.size for m in archive_result.manifest if m.size])
- if result['status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta:
+ if result[
+ 'status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta:
file_result = dict(
hit=True,
status='success',
@@ -397,10 +411,13 @@ class IngestFilesetWorker(IngestFileWorker):
if archive_result.file_resource.cdx:
file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
if archive_result.file_resource.revisit_cdx:
- file_result['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
+ file_result['revisit_cdx'] = cdx_to_dict(
+ archive_result.file_resource.revisit_cdx)
file_result['request']['ingest_type'] = request['ingest_type'] + "-file"
# call the super() (ingest_file) version of process_hit()
- info = self.process_file_hit(file_result['request']['ingest_type'], archive_result.file_resource, archive_result.file_file_meta)
+ info = self.process_file_hit(file_result['request']['ingest_type'],
+ archive_result.file_resource,
+ archive_result.file_file_meta)
file_result.update(info)
if self.ingest_file_result_sink:
self.ingest_file_result_sink.push_record(result.copy())
@@ -410,17 +427,19 @@ class IngestFilesetWorker(IngestFileWorker):
if result['status'].startswith('success'):
result['hit'] = True
print("[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format(
- ingest_type,
- result['file_count'],
- result['total_size'],
- ingest_strategy,
- ), file=sys.stderr)
+ ingest_type,
+ result['file_count'],
+ result['total_size'],
+ ingest_strategy,
+ ),
+ file=sys.stderr)
else:
print("[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format(
- ingest_type,
- result['status'],
- result['file_count'],
- result['total_size'],
- ingest_strategy,
- ), file=sys.stderr)
+ ingest_type,
+ result['status'],
+ result['file_count'],
+ result['total_size'],
+ ingest_strategy,
+ ),
+ file=sys.stderr)
return result
diff --git a/python/sandcrawler/ingest_html.py b/python/sandcrawler/ingest_html.py
index 9c72dd5..bf25d5d 100644
--- a/python/sandcrawler/ingest_html.py
+++ b/python/sandcrawler/ingest_html.py
@@ -1,4 +1,3 @@
-
import argparse
import datetime
import io
@@ -11,16 +10,20 @@ import pydantic
import trafilatura
from selectolax.parser import HTMLParser
-from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules
-from sandcrawler.ia import (CdxApiClient, NoCaptureError, ResourceResult, WaybackClient, WaybackContentError,
- cdx_to_dict, fix_transfer_encoding)
-from sandcrawler.misc import clean_url, datetime_to_cdx, gen_file_metadata, parse_cdx_datetime, url_fuzzy_equal
+from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio,
+ html_extract_resources, load_adblock_rules)
+from sandcrawler.ia import (CdxApiClient, NoCaptureError, ResourceResult, WaybackClient,
+ WaybackContentError, cdx_to_dict, fix_transfer_encoding)
+from sandcrawler.misc import (clean_url, datetime_to_cdx, gen_file_metadata, parse_cdx_datetime,
+ url_fuzzy_equal)
TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}"
+
def html_extract_body_teixml(doc: bytes) -> dict:
try:
- tei_xml = trafilatura.extract(doc,
+ tei_xml = trafilatura.extract(
+ doc,
output_format='xmltei',
include_comments=False,
include_formatting=True,
@@ -33,13 +36,19 @@ def html_extract_body_teixml(doc: bytes) -> dict:
if tei_xml:
body_txt = teixml_body_text(tei_xml)
word_count = len(body_txt.split())
- return dict(status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count)
- elif doc.startswith(b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 2012"http://www.w3.org/TR/html4/loose.dtd">'):
+ return dict(status="success",
+ agent=TRAFILATURA_AGENT,
+ tei_xml=tei_xml,
+ word_count=word_count)
+ elif doc.startswith(
+ b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 2012"http://www.w3.org/TR/html4/loose.dtd">'
+ ):
# hack for firstmonday.org
return html_extract_body_teixml(doc[106:])
else:
return dict(status="empty-xml", agent=TRAFILATURA_AGENT)
+
def teixml_body_text(doc_xml: str) -> str:
ns = {"tei": "http://www.tei-c.org/ns/1.0"}
tree = ET.fromstring(doc_xml)
@@ -49,6 +58,7 @@ def teixml_body_text(doc_xml: str) -> str:
else:
return ""
+
class WebResource(pydantic.BaseModel):
surt: str
timestamp: datetime.datetime
@@ -61,16 +71,15 @@ class WebResource(pydantic.BaseModel):
resource_type: Optional[str]
class Config:
- json_encoders = {
- datetime.datetime: lambda dt: dt.isoformat()
- }
+ json_encoders = {datetime.datetime: lambda dt: dt.isoformat()}
+
class IngestWebResult(pydantic.BaseModel):
status: str
hit: bool
error_message: Optional[str]
cdx: Optional[dict]
- terminal: Optional[Any] # TODO
+ terminal: Optional[Any] # TODO
request: Optional[Any] # TODO
file_meta: Optional[dict]
html_biblio: Optional[BiblioMetadata]
@@ -84,6 +93,7 @@ class IngestWebResult(pydantic.BaseModel):
datetime.datetime: lambda dt: dt.isoformat(),
}
+
class HtmlMetaRow(pydantic.BaseModel):
sha1hex: str
status: str
@@ -106,7 +116,7 @@ class HtmlMetaRow(pydantic.BaseModel):
"""
return (
self.sha1hex,
- datetime.datetime.now(), # updated
+ datetime.datetime.now(), # updated
self.status,
self.scope,
self.has_teixml,
@@ -117,7 +127,8 @@ class HtmlMetaRow(pydantic.BaseModel):
)
-def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime]) -> List[WebResource]:
+def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient,
+ when: Optional[datetime.datetime]) -> List[WebResource]:
"""
This is the lazy version that just does a CDX lookup for each resource.
@@ -132,27 +143,30 @@ def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient,
if not cdx_row:
raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
if cdx_row.url != resource['url'] and not url_fuzzy_equal(cdx_row.url, resource['url']):
- print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr)
+ print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}",
+ file=sys.stderr)
if not cdx_row.status_code:
# TODO: fall back to a full fetch?
print(f" WARN: skipping revisit record", file=sys.stderr)
continue
- full.append(WebResource(
- surt=cdx_row.surt,
- timestamp=cdx_row.datetime,
- url=cdx_row.url,
- sha1hex=cdx_row.sha1hex,
- mimetype=cdx_row.mimetype,
- status_code=cdx_row.status_code,
- size=None,
- sha256hex=None,
- resource_type=resource['type'],
- ))
+ full.append(
+ WebResource(
+ surt=cdx_row.surt,
+ timestamp=cdx_row.datetime,
+ url=cdx_row.url,
+ sha1hex=cdx_row.sha1hex,
+ mimetype=cdx_row.mimetype,
+ status_code=cdx_row.status_code,
+ size=None,
+ sha256hex=None,
+ resource_type=resource['type'],
+ ))
return full
-def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]) -> List[WebResource]:
+def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient,
+ when: Optional[datetime.datetime]) -> List[WebResource]:
"""
This is the full version which fetches each resource from wayback/petabox
and calculates additional hashes.
@@ -168,23 +182,28 @@ def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, w
raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True)
if file_meta['sha1hex'] != wayback_resp.cdx.sha1hex:
- raise WaybackContentError(f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}")
- full.append(WebResource(
- surt=wayback_resp.cdx.surt,
- timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime),
- url=wayback_resp.cdx.url,
- sha1hex=file_meta['sha1hex'],
- mimetype=file_meta['mimetype'],
- status_code=wayback_resp.cdx.status_code or wayback_resp.revisit_cdx.status_code,
- size=file_meta['size_bytes'],
- sha256hex=file_meta['sha256hex'],
- resource_type=resource['type'],
- ))
+ raise WaybackContentError(
+ f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}"
+ )
+ full.append(
+ WebResource(
+ surt=wayback_resp.cdx.surt,
+ timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime),
+ url=wayback_resp.cdx.url,
+ sha1hex=file_meta['sha1hex'],
+ mimetype=file_meta['mimetype'],
+ status_code=wayback_resp.cdx.status_code
+ or wayback_resp.revisit_cdx.status_code,
+ size=file_meta['size_bytes'],
+ sha256hex=file_meta['sha256hex'],
+ resource_type=resource['type'],
+ ))
return full
-def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]) -> Optional[str]:
+def html_guess_platform(url: str, doc: HTMLParser,
+ biblio: Optional[BiblioMetadata]) -> Optional[str]:
generator: Optional[str] = None
generator_elem = doc.css_first("meta[name='generator']")
@@ -229,7 +248,9 @@ def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetada
return None
-def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]) -> str:
+
+def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata],
+ word_count: Optional[int]) -> str:
"""
This function tries to guess if an HTML document represents one of:
@@ -328,7 +349,9 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]
return "unknown"
-def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = False) -> IngestWebResult:
+def run_single(url: str,
+ timestamp: Optional[str] = None,
+ quick_mode: bool = False) -> IngestWebResult:
adblock = load_adblock_rules()
wayback_client = WaybackClient()
@@ -375,7 +398,8 @@ def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = Fal
full_resources: List[WebResource] = []
if quick_mode:
- full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, when)
+ full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client,
+ when)
else:
full_resources = fetch_html_resources(raw_resources, wayback_client, when)
@@ -399,14 +423,11 @@ def main() -> None:
python -m sandcrawler.ingest_html
"""
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter
- )
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
subparsers = parser.add_subparsers()
- sub = subparsers.add_parser(
- "single", help="tries to ingest a single URL, dumps result to stdout"
- )
+ sub = subparsers.add_parser("single",
+ help="tries to ingest a single URL, dumps result to stdout")
sub.set_defaults(func="run_single")
sub.add_argument(
"url",
@@ -437,5 +458,6 @@ def main() -> None:
#func()
raise NotImplementedError()
+
if __name__ == "__main__":
main()
diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py
index b617178..188621f 100644
--- a/python/sandcrawler/minio.py
+++ b/python/sandcrawler/minio.py
@@ -1,4 +1,3 @@
-
import hashlib
import io
import os
@@ -7,7 +6,6 @@ import minio
class SandcrawlerMinioClient(object):
-
def __init__(self, host_url, access_key, secret_key, default_bucket=None):
"""
host is minio connection string (host:port)
diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py
index cf8c4bd..ddbd95a 100644
--- a/python/sandcrawler/misc.py
+++ b/python/sandcrawler/misc.py
@@ -1,4 +1,3 @@
-
import base64
import datetime
import hashlib
@@ -19,22 +18,28 @@ def clean_url(s: str) -> str:
parsed.colon_before_port = b''
return str(urlcanon.whatwg(parsed))
+
def url_fuzzy_equal(left: str, right: str) -> bool:
"""
TODO: use proper surt library and canonicalization for this check
"""
- fuzzy_left = '://'.join(clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:])
- fuzzy_right = '://'.join(clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:])
+ fuzzy_left = '://'.join(
+ clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:])
+ fuzzy_right = '://'.join(
+ clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:])
if fuzzy_left == fuzzy_right:
return True
elif fuzzy_left == fuzzy_right + "/" or fuzzy_right == fuzzy_left + "/":
return True
return False
+
def test_url_fuzzy_equal() -> None:
assert True == url_fuzzy_equal(
"http://www.annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree",
- "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree")
+ "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree"
+ )
+
def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
"""
@@ -45,10 +50,10 @@ def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
assert blob is not None
if not allow_empty:
assert blob
- if len(blob) < 1024*1024:
+ if len(blob) < 1024 * 1024:
mimetype = magic.Magic(mime=True).from_buffer(blob)
else:
- mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024*1024)])
+ mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024 * 1024)])
if mimetype in ("application/xml", "text/xml"):
# crude checks for XHTML or JATS XML, using only first 1 kB of file
if b"<htm" in blob[:1024] and b'xmlns="http://www.w3.org/1999/xhtml"' in blob[:1024]:
@@ -70,6 +75,7 @@ def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
mimetype=mimetype,
)
+
def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict:
"""
Variant of gen_file_metadata() which works with files on local disk
@@ -92,7 +98,7 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict:
size_bytes = 0
with open(path, 'rb') as f:
while True:
- chunk = f.read(1024*1024)
+ chunk = f.read(1024 * 1024)
if not chunk:
break
size_bytes += len(chunk)
@@ -108,6 +114,7 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict:
mimetype=mimetype,
)
+
def b32_hex(s: str) -> str:
"""
Converts a base32-encoded SHA-1 checksum into hex-encoded
@@ -123,6 +130,7 @@ def b32_hex(s: str) -> str:
raise ValueError("not a base-32 encoded SHA-1 hash: {}".format(s))
return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
+
NORMAL_MIME = (
'application/pdf',
'application/postscript',
@@ -131,6 +139,7 @@ NORMAL_MIME = (
'application/octet-stream',
)
+
def normalize_mime(raw: str) -> Optional[str]:
raw = raw.lower().strip()
for norm in NORMAL_MIME:
@@ -142,9 +151,7 @@ def normalize_mime(raw: str) -> Optional[str]:
return 'text/xml'
if raw.startswith('application/x-pdf'):
return 'application/pdf'
- if raw in (
- '.pdf',
- ):
+ if raw in ('.pdf', ):
return 'application/pdf'
if raw in (
'application/download',
@@ -154,7 +161,7 @@ def normalize_mime(raw: str) -> Optional[str]:
'application/octetstream',
'application/force-download',
'application/unknown',
- ):
+ ):
return 'application/octet-stream'
return None
@@ -193,8 +200,8 @@ def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]:
offset = cdx[9]
warc = cdx[10]
- if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit()
- and len(sha1b32) == 32 and dt.isdigit()):
+ if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit() and len(sha1b32) == 32
+ and dt.isdigit()):
return None
if '-' in (surt, dt, url, http_status, sha1b32, c_size, offset, warc):
@@ -221,6 +228,7 @@ def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]:
warc_path=warc,
)
+
def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]:
if not dt_str:
return None
@@ -229,23 +237,39 @@ def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]:
except Exception:
return None
+
def test_parse_cdx_datetime() -> None:
assert parse_cdx_datetime("") == None
assert parse_cdx_datetime("asdf") == None
assert parse_cdx_datetime("19930203123045") != None
- assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)
+ assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020,
+ month=10,
+ day=28,
+ hour=23,
+ minute=51,
+ second=3)
+
def datetime_to_cdx(dt: datetime.datetime) -> str:
return '%04d%02d%02d%02d%02d%02d' % (
- dt.year, dt.month, dt.day,
- dt.hour, dt.minute, dt.second,
+ dt.year,
+ dt.month,
+ dt.day,
+ dt.hour,
+ dt.minute,
+ dt.second,
)
+
def test_datetime_to_cdx() -> None:
- assert "20201028235103" == datetime_to_cdx(datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3))
+ assert "20201028235103" == datetime_to_cdx(
+ datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3))
+
-def requests_retry_session(retries=10, backoff_factor=3,
- status_forcelist=(500, 502, 504), session=None) -> requests.Session:
+def requests_retry_session(retries=10,
+ backoff_factor=3,
+ status_forcelist=(500, 502, 504),
+ session=None) -> requests.Session:
"""
From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
"""
@@ -262,6 +286,7 @@ def requests_retry_session(retries=10, backoff_factor=3,
session.mount('https://', adapter)
return session
+
def sanitize_fs_path(path: str) -> str:
"""
From: https://stackoverflow.com/questions/13939120/sanitizing-a-file-path-in-python/66950540#66950540
@@ -271,6 +296,7 @@ def sanitize_fs_path(path: str) -> str:
# - making the path relative
return os.path.relpath(os.path.normpath(os.path.join("/", path)), "/")
+
def test_sanitize_fs_path() -> None:
assert sanitize_fs_path("/thing.png") == "thing.png"
assert sanitize_fs_path("../../thing.png") == "thing.png"
diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py
index 2fb34b8..190672d 100644
--- a/python/sandcrawler/pdfextract.py
+++ b/python/sandcrawler/pdfextract.py
@@ -1,4 +1,3 @@
-
import datetime
import json
import sys
@@ -153,19 +152,20 @@ BAD_PDF_SHA1HEX = [
"fd9bd560662e070b222d63052830837829c490f0",
]
+
@dataclass
class PdfExtractResult:
sha1hex: str
status: str
error_msg: Optional[str] = None
- file_meta: Optional[Dict[str,Any]] = None
+ file_meta: Optional[Dict[str, Any]] = None
text: Optional[str] = None
page0_thumbnail: Optional[bytes] = None
has_page0_thumbnail: bool = False
meta_xml: Optional[str] = None
- pdf_info: Optional[Dict[str,Any]] = None
- pdf_extra: Optional[Dict[str,Any]] = None
- source: Optional[Dict[str,Any]] = None
+ pdf_info: Optional[Dict[str, Any]] = None
+ pdf_extra: Optional[Dict[str, Any]] = None
+ source: Optional[Dict[str, Any]] = None
def to_pdftext_dict(self) -> dict:
"""
@@ -221,7 +221,8 @@ class PdfExtractResult:
)
else:
pdf_extra = dict()
- for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id', 'pdf_version'):
+ for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id',
+ 'pdf_version'):
if record.get(k):
pdf_extra[k] = record[k]
return PdfExtractResult(
@@ -255,7 +256,7 @@ class PdfExtractResult:
metadata_json = json.dumps(metadata, sort_keys=True)
return (
self.sha1hex,
- datetime.datetime.now(), # updated
+ datetime.datetime.now(), # updated
self.status,
self.has_page0_thumbnail,
pdf_extra.get('page_count'),
@@ -269,7 +270,7 @@ class PdfExtractResult:
)
-def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtractResult:
+def process_pdf(blob: bytes, thumb_size=(180, 300), thumb_type="JPEG") -> PdfExtractResult:
"""
A known issue is that output text is in "physical layout" mode, which means
columns will be side-by-side. We would prefer a single stream of tokens!
@@ -330,7 +331,8 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr
renderer = poppler.PageRenderer()
try:
full_img = renderer.render_page(page0)
- img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw', "BGRA", 0, 1)
+ img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw',
+ "BGRA", 0, 1)
img.thumbnail(thumb_size, Image.BICUBIC)
buf = BytesIO()
img.save(buf, thumb_type)
@@ -356,14 +358,14 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr
)
# Kafka message size limit; cap at about 1 MByte
- if len(full_text)> 1000000:
+ if len(full_text) > 1000000:
return PdfExtractResult(
sha1hex=sha1hex,
status='text-too-large',
error_msg="full_text chars: {}".format(len(full_text)),
file_meta=file_meta,
)
- if len(pdf.metadata)> 1000000:
+ if len(pdf.metadata) > 1000000:
return PdfExtractResult(
sha1hex=sha1hex,
status='text-too-large',
@@ -414,8 +416,8 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr
),
)
-class PdfExtractWorker(SandcrawlerFetchWorker):
+class PdfExtractWorker(SandcrawlerFetchWorker):
def __init__(self, wayback_client=None, sink=None, **kwargs):
super().__init__(wayback_client=wayback_client)
self.wayback_client = wayback_client
@@ -445,12 +447,12 @@ class PdfExtractWorker(SandcrawlerFetchWorker):
self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
return result.to_pdftext_dict()
+
class PdfExtractBlobWorker(SandcrawlerWorker):
"""
This is sort of like PdfExtractWorker, except it receives blobs directly,
instead of fetching blobs from some remote store.
"""
-
def __init__(self, sink=None, **kwargs):
super().__init__()
self.sink = sink
@@ -466,4 +468,3 @@ class PdfExtractBlobWorker(SandcrawlerWorker):
self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
return result.to_pdftext_dict()
-
diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py
index 7d03357..e3d4a54 100644
--- a/python/sandcrawler/pdftrio.py
+++ b/python/sandcrawler/pdftrio.py
@@ -1,4 +1,3 @@
-
import time
import requests
@@ -8,7 +7,6 @@ from .workers import SandcrawlerFetchWorker, SandcrawlerWorker
class PdfTrioClient(object):
-
def __init__(self, host_url="http://pdftrio.qa.fatcat.wiki", **kwargs):
self.host_url = host_url
self.http_session = requests_retry_session(retries=3, backoff_factor=3)
@@ -51,9 +49,7 @@ class PdfTrioClient(object):
'error_msg': 'pdftrio request connection timout',
}
- info = dict(
- status_code=pdftrio_response.status_code,
- )
+ info = dict(status_code=pdftrio_response.status_code, )
if pdftrio_response.status_code == 200:
resp_json = pdftrio_response.json()
assert 'ensemble_score' in resp_json
@@ -72,7 +68,6 @@ class PdfTrioWorker(SandcrawlerFetchWorker):
"""
This class is basically copied directly from GrobidWorker
"""
-
def __init__(self, pdftrio_client, wayback_client=None, sink=None, **kwargs):
super().__init__(wayback_client=wayback_client)
self.pdftrio_client = pdftrio_client
@@ -103,12 +98,12 @@ class PdfTrioWorker(SandcrawlerFetchWorker):
result['timing']['fetch_sec'] = fetch_sec
return result
+
class PdfTrioBlobWorker(SandcrawlerWorker):
"""
This is sort of like PdfTrioWorker, except it receives blobs directly,
instead of fetching blobs from some remote store.
"""
-
def __init__(self, pdftrio_client, sink=None, mode="auto", **kwargs):
super().__init__()
self.pdftrio_client = pdftrio_client
@@ -128,4 +123,3 @@ class PdfTrioBlobWorker(SandcrawlerWorker):
total_sec=time.time() - start_process,
)
return result
-
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 66a36bc..44c03f2 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -1,4 +1,3 @@
-
"""
cdx
- read raw CDX, filter
@@ -32,7 +31,6 @@ from sandcrawler.workers import SandcrawlerWorker
class PersistCdxWorker(SandcrawlerWorker):
-
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -56,8 +54,8 @@ class PersistCdxWorker(SandcrawlerWorker):
self.db.commit()
return []
-class PersistIngestFileResultWorker(SandcrawlerWorker):
+class PersistIngestFileResultWorker(SandcrawlerWorker):
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -78,8 +76,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
# backwards compat hacks; transform request to look like current schema
if raw.get('ingest_type') == 'file':
raw['ingest_type'] = 'pdf'
- if (not raw.get('link_source')
- and raw.get('base_url')
+ if (not raw.get('link_source') and raw.get('base_url')
and raw.get('ext_ids', {}).get('doi')
and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])):
# set link_source(_id) for old ingest requests
@@ -119,7 +116,6 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if not request['request']:
request['request'] = None
return request
-
def file_result_to_row(self, raw: dict) -> Optional[dict]:
"""
@@ -137,7 +133,8 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
ingest_type = raw['request'].get('ingest_type')
if ingest_type == 'file':
ingest_type = 'pdf'
- if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', 'dataset-file'):
+ if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset',
+ 'dataset-file'):
self.counts['skip-ingest-type'] += 1
return None
if raw['status'] in ("existing", ):
@@ -153,7 +150,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if terminal:
result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url')
result['terminal_dt'] = terminal.get('terminal_dt')
- result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code')
+ result['terminal_status_code'] = terminal.get(
+ 'terminal_status_code') or terminal.get('status_code') or terminal.get(
+ 'http_code')
if result['terminal_status_code']:
result['terminal_status_code'] = int(result['terminal_status_code'])
result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
@@ -215,9 +214,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
'manifest': raw.get('manifest'),
}
if result.get('fileset_bundle'):
- result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get('archiveorg_item_bundle_path')
- result['web_bundle_url'] = result['fileset_bundle'].get('terminal', {}).get('terminal_url')
- result['web_bundle_dt'] = result['fileset_bundle'].get('terminal', {}).get('terminal_dt')
+ result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get(
+ 'archiveorg_item_bundle_path')
+ result['web_bundle_url'] = result['fileset_bundle'].get('terminal',
+ {}).get('terminal_url')
+ result['web_bundle_dt'] = result['fileset_bundle'].get('terminal',
+ {}).get('terminal_dt')
return result
def push_batch(self, batch):
@@ -243,7 +245,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
# these schemas match, so can just pass through
cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')]
- revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')]
+ revisit_cdx_batch = [
+ r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')
+ ]
cdx_batch.extend(revisit_cdx_batch)
# filter to full CDX lines, with full warc_paths (not liveweb)
cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])]
@@ -258,24 +262,31 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.counts['insert-file_meta'] += resp[0]
self.counts['update-file_meta'] += resp[1]
- html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')]
+ html_meta_batch = [
+ self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')
+ ]
if html_meta_batch:
resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="update")
self.counts['insert-html_meta'] += resp[0]
self.counts['update-html_meta'] += resp[1]
- fileset_platform_batch = [self.result_to_platform_row(raw) for raw in batch if raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')]
+ fileset_platform_batch = [
+ self.result_to_platform_row(raw) for raw in batch if
+ raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')
+ ]
fileset_platform_batch = [p for p in fileset_platform_batch if p]
if fileset_platform_batch:
- resp = self.db.insert_ingest_fileset_platform(self.cur, fileset_platform_batch, on_conflict="update")
+ resp = self.db.insert_ingest_fileset_platform(self.cur,
+ fileset_platform_batch,
+ on_conflict="update")
self.counts['insert-fileset_platform'] += resp[0]
self.counts['update-fileset_platform'] += resp[1]
self.db.commit()
return []
-class PersistIngestFilesetWorker(SandcrawlerWorker):
+class PersistIngestFilesetWorker(SandcrawlerWorker):
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -287,8 +298,8 @@ class PersistIngestFilesetWorker(SandcrawlerWorker):
"""
raise NotImplementedError
-class PersistIngestRequestWorker(PersistIngestFileResultWorker):
+class PersistIngestRequestWorker(PersistIngestFileResultWorker):
def __init__(self, db_url, **kwargs):
super().__init__(db_url=db_url)
@@ -315,8 +326,8 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker):
self.db.commit()
return []
-class PersistGrobidWorker(SandcrawlerWorker):
+class PersistGrobidWorker(SandcrawlerWorker):
def __init__(self, db_url, **kwargs):
super().__init__()
self.grobid = GrobidClient()
@@ -406,7 +417,6 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
This could be refactored into a "Sink" type with an even thinner wrapper.
"""
-
def __init__(self, output_dir):
super().__init__()
self.output_dir = output_dir
@@ -424,7 +434,7 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
if record.get('status_code') != 200 or not record.get('tei_xml'):
return False
- assert(len(record['key'])) == 40
+ assert (len(record['key'])) == 40
p = "{}/{}".format(self.output_dir, self._blob_path(record['key']))
os.makedirs(os.path.dirname(p), exist_ok=True)
with open(p, 'w') as f:
@@ -434,7 +444,6 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
class PersistPdfTrioWorker(SandcrawlerWorker):
-
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -458,7 +467,10 @@ class PersistPdfTrioWorker(SandcrawlerWorker):
self.counts['insert-pdftrio'] += resp[0]
self.counts['update-pdftrio'] += resp[1]
- file_meta_batch = [r['file_meta'] for r in batch if r['pdf_trio']['status'] == "success" and r.get('file_meta')]
+ file_meta_batch = [
+ r['file_meta'] for r in batch
+ if r['pdf_trio']['status'] == "success" and r.get('file_meta')
+ ]
resp = self.db.insert_file_meta(self.cur, file_meta_batch)
self.counts['insert-file-meta'] += resp[0]
self.counts['update-file-meta'] += resp[1]
@@ -473,7 +485,6 @@ class PersistPdfTextWorker(SandcrawlerWorker):
Should keep batch sizes small.
"""
-
def __init__(self, db_url, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
@@ -545,7 +556,6 @@ class PersistThumbnailWorker(SandcrawlerWorker):
This worker *must* be used with raw kakfa mode; thumbnails are *not*
wrapped in JSON like most sandcrawler kafka messages.
"""
-
def __init__(self, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
@@ -583,7 +593,6 @@ class GenericPersistDocWorker(SandcrawlerWorker):
Objects are assumed to be JSON-wrapped strings.
"""
-
def __init__(self, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
@@ -624,7 +633,6 @@ class PersistXmlDocWorker(GenericPersistDocWorker):
Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
sandcrawler database (SQL).
"""
-
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.s3_extension = kwargs.get('s3_extension', ".jats.xml")
@@ -637,7 +645,6 @@ class PersistHtmlTeiXmlWorker(GenericPersistDocWorker):
Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
sandcrawler database (SQL).
"""
-
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.s3_extension = kwargs.get('s3_extension', ".tei.xml")
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index d8a4016..7135f4c 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -1,4 +1,3 @@
-
import json
import multiprocessing.pool
import signal
@@ -21,7 +20,6 @@ class SandcrawlerWorker(object):
Usually these get "pushed" into by a RecordPusher. Output goes to another
worker (pipeline-style), or defaults to stdout.
"""
-
def __init__(self):
self.counts = Counter()
self.sink = None
@@ -62,9 +60,9 @@ class SandcrawlerWorker(object):
multithreading or if signal-based timeouts are used elsewhere in the
same process.
"""
-
def timeout_handler(signum, frame):
raise TimeoutError("timeout processing record")
+
signal.signal(signal.SIGALRM, timeout_handler)
resp = None
signal.alarm(int(timeout))
@@ -72,7 +70,7 @@ class SandcrawlerWorker(object):
resp = self.push_record(task, key=key)
except TimeoutError:
self.counts['timeout'] += 1
- resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
# TODO: what if it is this push_record() itself that is timing out?
if resp and self.sink:
self.sink.push_record(resp)
@@ -113,7 +111,6 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
PDFs) from wayback, archive.org, or other sources.
"""
-
def __init__(self, wayback_client, **kwargs):
super().__init__(**kwargs)
self.wayback_client = wayback_client
@@ -178,7 +175,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
)
blob = resp.content
else:
- raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ raise ValueError(
+ "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
if not blob:
return dict(
key=default_key,
@@ -192,8 +190,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
blob=blob,
)
-class MultiprocessWrapper(SandcrawlerWorker):
+class MultiprocessWrapper(SandcrawlerWorker):
def __init__(self, worker, sink, jobs=None):
self.counts = Counter()
self.worker = worker
@@ -226,21 +224,21 @@ class MultiprocessWrapper(SandcrawlerWorker):
print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
return worker_counts
+
class BlackholeSink(SandcrawlerWorker):
"""
Dummy SandcrawlerWorker. That doesn't do or process anything.
Useful for tests.
"""
-
def push_record(self, task, key=None):
return
def push_batch(self, tasks):
return
-class KafkaSink(SandcrawlerWorker):
+class KafkaSink(SandcrawlerWorker):
def __init__(self, kafka_hosts, produce_topic, **kwargs):
self.sink = None
self.counts = Counter()
@@ -249,13 +247,12 @@ class KafkaSink(SandcrawlerWorker):
config = self.producer_config({
'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
+ 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
'api.version.request': True,
'api.version.fallback.ms': 0,
})
self.producer = Producer(config)
-
@staticmethod
def _fail_fast(err, msg):
if err is not None:
@@ -270,7 +267,7 @@ class KafkaSink(SandcrawlerWorker):
'delivery.report.only.error': True,
'default.topic.config': {
'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ 'request.required.acks': -1, # all brokers must confirm
}
})
return config
@@ -285,11 +282,7 @@ class KafkaSink(SandcrawlerWorker):
msg = msg.encode('utf-8')
assert type(msg) == bytes
- self.producer.produce(
- self.produce_topic,
- msg,
- key=key,
- on_delivery=self._fail_fast)
+ self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast)
self.counts['produced'] += 1
# check for errors etc
@@ -308,7 +301,6 @@ class KafkaCompressSink(KafkaSink):
"""
Variant of KafkaSink for large documents. Used for, eg, GROBID output.
"""
-
def producer_config(self, kafka_config):
config = kafka_config.copy()
config.update({
@@ -319,7 +311,7 @@ class KafkaCompressSink(KafkaSink):
'delivery.report.only.error': True,
'default.topic.config': {
'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ 'request.required.acks': -1, # all brokers must confirm
}
})
return config
@@ -330,7 +322,6 @@ class RecordPusher:
Base class for different record sources to be pushed into workers. Pretty
trivial interface, just wraps an importer and pushes records in to it.
"""
-
def __init__(self, worker, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -348,7 +339,6 @@ class RecordPusher:
class JsonLinePusher(RecordPusher):
-
def __init__(self, worker, json_file, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -387,7 +377,6 @@ class JsonLinePusher(RecordPusher):
class CdxLinePusher(RecordPusher):
-
def __init__(self, worker, cdx_file, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -409,7 +398,8 @@ class CdxLinePusher(RecordPusher):
if not record:
self.counts['skip-parse'] += 1
continue
- if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses:
+ if self.filter_http_statuses and record[
+ 'http_status'] not in self.filter_http_statuses:
self.counts['skip-http_status'] += 1
continue
if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
@@ -434,7 +424,6 @@ class CdxLinePusher(RecordPusher):
class ZipfilePusher(RecordPusher):
-
def __init__(self, worker, zipfile_path, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -472,8 +461,8 @@ class ZipfilePusher(RecordPusher):
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
-class KafkaJsonPusher(RecordPusher):
+class KafkaJsonPusher(RecordPusher):
def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -499,12 +488,11 @@ class KafkaJsonPusher(RecordPusher):
# case where there there is one update and thousands of creates;
# update would be lingering in worker, and if worker crashed
# never created. Not great.
- batch = self.consumer.consume(
- num_messages=self.batch_size,
- timeout=self.poll_interval)
+ batch = self.consumer.consume(num_messages=self.batch_size,
+ timeout=self.poll_interval)
print("... got {} kafka messages ({}sec poll interval)".format(
- len(batch), self.poll_interval),
- file=sys.stderr)
+ len(batch), self.poll_interval),
+ file=sys.stderr)
if not batch:
# TODO: could have some larger timeout here and
# self.worker.finish() if it's been more than, eg, a couple
@@ -541,7 +529,9 @@ class KafkaJsonPusher(RecordPusher):
while not done:
try:
# use timeouts; don't want kafka itself to timeout
- self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec)
+ self.worker.push_record_timeout(record,
+ key=msg.key(),
+ timeout=self.process_timeout_sec)
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))
@@ -611,14 +601,14 @@ def make_kafka_consumer(hosts, consume_topic, group):
for p in partitions:
if p.error:
raise KafkaException(p.error)
- print("Kafka partitions rebalanced: {} / {}".format(
- consumer, partitions),
- file=sys.stderr)
+ print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions),
+ file=sys.stderr)
consumer = Consumer(conf)
# NOTE: it's actually important that topic_name *not* be bytes (UTF-8
# encoded)
- consumer.subscribe([topic_name],
+ consumer.subscribe(
+ [topic_name],
on_assign=on_rebalance,
on_revoke=on_rebalance,
)
diff --git a/python/sandcrawler/xml.py b/python/sandcrawler/xml.py
index 7a0086d..83d53d4 100644
--- a/python/sandcrawler/xml.py
+++ b/python/sandcrawler/xml.py
@@ -1,4 +1,3 @@
-
import xml.etree.ElementTree as ET
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index e185fad..3c76c17 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
These are generally for continuously running workers that consume from Kafka.
Outputs might either be pushed back into Kafka, or directly into sandcrawler-db
@@ -31,12 +30,8 @@ def run_grobid_extract(args):
kafka_hosts=args.kafka_hosts,
produce_topic=produce_topic,
)
- grobid_client = GrobidClient(
- host_url=args.grobid_host,
- )
- wayback_client = WaybackClient(
- host_url=args.grobid_host,
- )
+ grobid_client = GrobidClient(host_url=args.grobid_host, )
+ wayback_client = WaybackClient(host_url=args.grobid_host, )
worker = GrobidWorker(
grobid_client=grobid_client,
wayback_client=wayback_client,
@@ -51,6 +46,7 @@ def run_grobid_extract(args):
)
pusher.run()
+
def run_pdf_extract(args):
consume_topic = "sandcrawler-{}.unextracted".format(args.env)
pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env)
@@ -63,9 +59,7 @@ def run_pdf_extract(args):
kafka_hosts=args.kafka_hosts,
produce_topic=thumbnail_topic,
)
- wayback_client = WaybackClient(
- host_url=args.grobid_host,
- )
+ wayback_client = WaybackClient(host_url=args.grobid_host, )
worker = PdfExtractWorker(
wayback_client=wayback_client,
sink=pdftext_sink,
@@ -81,6 +75,7 @@ def run_pdf_extract(args):
)
pusher.run()
+
def run_persist_grobid(args):
consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
worker = PersistGrobidWorker(
@@ -105,6 +100,7 @@ def run_persist_grobid(args):
)
pusher.run()
+
def run_persist_pdftext(args):
consume_topic = "sandcrawler-{}.pdf-text".format(args.env)
worker = PersistPdfTextWorker(
@@ -129,6 +125,7 @@ def run_persist_pdftext(args):
)
pusher.run()
+
def run_persist_thumbnail(args):
consume_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env)
worker = PersistThumbnailWorker(
@@ -150,6 +147,7 @@ def run_persist_thumbnail(args):
)
pusher.run()
+
def run_persist_xml_doc(args: argparse.Namespace) -> None:
consume_topic = f"sandcrawler-{args.env}.xml-doc"
worker = PersistXmlDocWorker(
@@ -168,6 +166,7 @@ def run_persist_xml_doc(args: argparse.Namespace) -> None:
)
pusher.run()
+
def run_persist_html_teixml(args: argparse.Namespace) -> None:
consume_topic = f"sandcrawler-{args.env}.html-teixml"
worker = PersistHtmlTeiXmlWorker(
@@ -186,11 +185,10 @@ def run_persist_html_teixml(args: argparse.Namespace) -> None:
)
pusher.run()
+
def run_persist_pdftrio(args):
consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env)
- worker = PersistPdfTrioWorker(
- db_url=args.db_url,
- )
+ worker = PersistPdfTrioWorker(db_url=args.db_url, )
pusher = KafkaJsonPusher(
worker=worker,
kafka_hosts=args.kafka_hosts,
@@ -201,6 +199,7 @@ def run_persist_pdftrio(args):
)
pusher.run()
+
def run_ingest_file(args):
spn_cdx_retry_sec = 9.0
if args.bulk:
@@ -228,9 +227,7 @@ def run_ingest_file(args):
kafka_hosts=args.kafka_hosts,
produce_topic=grobid_topic,
)
- grobid_client = GrobidClient(
- host_url=args.grobid_host,
- )
+ grobid_client = GrobidClient(host_url=args.grobid_host, )
pdftext_sink = KafkaCompressSink(
kafka_hosts=args.kafka_hosts,
produce_topic=pdftext_topic,
@@ -268,11 +265,10 @@ def run_ingest_file(args):
)
pusher.run()
+
def run_persist_ingest_file(args):
consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
- worker = PersistIngestFileResultWorker(
- db_url=args.db_url,
- )
+ worker = PersistIngestFileResultWorker(db_url=args.db_url, )
pusher = KafkaJsonPusher(
worker=worker,
kafka_hosts=args.kafka_hosts,
@@ -283,90 +279,116 @@ def run_persist_ingest_file(args):
)
pusher.run()
+
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--kafka-hosts',
- default="localhost:9092",
- help="list of Kafka brokers (host/port) to use")
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
parser.add_argument('--env',
- default="dev",
- help="Kafka topic namespace to use (eg, prod, qa, dev)")
+ default="dev",
+ help="Kafka topic namespace to use (eg, prod, qa, dev)")
parser.add_argument('--grobid-host',
- default="http://grobid.qa.fatcat.wiki",
- help="GROBID API host/port")
+ default="http://grobid.qa.fatcat.wiki",
+ help="GROBID API host/port")
parser.add_argument('--db-url',
- help="postgresql database connection string",
- default="postgres:///sandcrawler")
- parser.add_argument('--s3-url',
- help="S3 (seaweedfs) backend URL",
- default="localhost:9000")
+ help="postgresql database connection string",
+ default="postgres:///sandcrawler")
+ parser.add_argument('--s3-url', help="S3 (seaweedfs) backend URL", default="localhost:9000")
parser.add_argument('--s3-access-key',
- help="S3 (seaweedfs) credential",
- default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_ACCESS_KEY'))
+ help="S3 (seaweedfs) credential",
+ default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY')
+ or os.environ.get('MINIO_ACCESS_KEY'))
parser.add_argument('--s3-secret-key',
- help="S3 (seaweedfs) credential",
- default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY') or os.environ.get('MINIO_SECRET_KEY'))
+ help="S3 (seaweedfs) credential",
+ default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY')
+ or os.environ.get('MINIO_SECRET_KEY'))
parser.add_argument('--s3-bucket',
- help="S3 (seaweedfs) bucket to persist into",
- default="sandcrawler-dev")
+ help="S3 (seaweedfs) bucket to persist into",
+ default="sandcrawler-dev")
subparsers = parser.add_subparsers()
- sub_grobid_extract = subparsers.add_parser('grobid-extract',
- help="daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka")
+ sub_grobid_extract = subparsers.add_parser(
+ 'grobid-extract',
+ help=
+ "daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka"
+ )
sub_grobid_extract.set_defaults(func=run_grobid_extract)
- sub_pdf_extract = subparsers.add_parser('pdf-extract',
- help="daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka")
+ sub_pdf_extract = subparsers.add_parser(
+ 'pdf-extract',
+ help=
+ "daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka"
+ )
sub_pdf_extract.set_defaults(func=run_pdf_extract)
- sub_persist_grobid = subparsers.add_parser('persist-grobid',
- help="daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres")
+ sub_persist_grobid = subparsers.add_parser(
+ 'persist-grobid',
+ help=
+ "daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres"
+ )
sub_persist_grobid.add_argument('--s3-only',
- action='store_true',
- help="only upload TEI-XML to S3 (don't write to database)")
- sub_persist_grobid.add_argument('--db-only',
+ action='store_true',
+ help="only upload TEI-XML to S3 (don't write to database)")
+ sub_persist_grobid.add_argument(
+ '--db-only',
action='store_true',
help="only write status to database (don't upload TEI-XML to S3)")
sub_persist_grobid.set_defaults(func=run_persist_grobid)
- sub_persist_pdftext = subparsers.add_parser('persist-pdftext',
- help="daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres")
+ sub_persist_pdftext = subparsers.add_parser(
+ 'persist-pdftext',
+ help=
+ "daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres"
+ )
sub_persist_pdftext.add_argument('--s3-only',
- action='store_true',
- help="only upload TEI-XML to S3 (don't write to database)")
- sub_persist_pdftext.add_argument('--db-only',
+ action='store_true',
+ help="only upload TEI-XML to S3 (don't write to database)")
+ sub_persist_pdftext.add_argument(
+ '--db-only',
action='store_true',
help="only write status to database (don't upload TEI-XML to S3)")
sub_persist_pdftext.set_defaults(func=run_persist_pdftext)
- sub_persist_thumbnail = subparsers.add_parser('persist-thumbnail',
- help="daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres")
+ sub_persist_thumbnail = subparsers.add_parser(
+ 'persist-thumbnail',
+ help=
+ "daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres"
+ )
sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail)
- sub_persist_xml_doc = subparsers.add_parser('persist-xml-doc',
- help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket")
+ sub_persist_xml_doc = subparsers.add_parser(
+ 'persist-xml-doc',
+ help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket"
+ )
sub_persist_xml_doc.set_defaults(func=run_persist_xml_doc)
- sub_persist_html_teixml = subparsers.add_parser('persist-html-teixml',
- help="daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket")
+ sub_persist_html_teixml = subparsers.add_parser(
+ 'persist-html-teixml',
+ help=
+ "daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket"
+ )
sub_persist_html_teixml.set_defaults(func=run_persist_html_teixml)
- sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio',
+ sub_persist_pdftrio = subparsers.add_parser(
+ 'persist-pdftrio',
help="daemon that consumes pdftrio output from Kafka and pushes to postgres")
sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio)
- sub_ingest_file = subparsers.add_parser('ingest-file',
+ sub_ingest_file = subparsers.add_parser(
+ 'ingest-file',
help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka")
sub_ingest_file.add_argument('--bulk',
- action='store_true',
- help="consume from bulk kafka topic (eg, for ingest backfill)")
- sub_ingest_file.add_argument('--priority',
+ action='store_true',
+ help="consume from bulk kafka topic (eg, for ingest backfill)")
+ sub_ingest_file.add_argument(
+ '--priority',
action='store_true',
help="consume from priority kafka topic (eg, for SPN requests)")
sub_ingest_file.set_defaults(func=run_ingest_file)
- sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file',
+ sub_persist_ingest_file = subparsers.add_parser(
+ 'persist-ingest-file',
help="daemon that consumes ingest-file output from Kafka and pushes to postgres")
sub_persist_ingest_file.set_defaults(func=run_persist_ingest_file)
@@ -377,5 +399,6 @@ def main():
args.func(args)
+
if __name__ == '__main__':
main()
diff --git a/python/scripts/arabesque2ingestrequest.py b/python/scripts/arabesque2ingestrequest.py
index 69fe320..9cc9055 100755
--- a/python/scripts/arabesque2ingestrequest.py
+++ b/python/scripts/arabesque2ingestrequest.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
This script is intended to be used for backfill ingest of old crawls. It can
also be used as a fast path for getting freshly crawled content into fatcat if
@@ -36,37 +35,35 @@ def run(args):
},
}
if args.release_stage:
- assert args.release_stage in ('published', 'submitted', 'accepted', 'draft', 'update')
+ assert args.release_stage in ('published', 'submitted', 'accepted', 'draft',
+ 'update')
request['release_stage'] = args.release_stage
print("{}".format(json.dumps(request, sort_keys=True)))
+
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--link-source',
- required=True,
- help="link_source to include in request")
- parser.add_argument('--extid-type',
- required=True,
- help="extid to encode identifier as")
+ required=True,
+ help="link_source to include in request")
+ parser.add_argument('--extid-type', required=True, help="extid to encode identifier as")
parser.add_argument('--ingest-type',
- default="pdf",
- help="ingest type (pdf, html, xml, etc)")
+ default="pdf",
+ help="ingest type (pdf, html, xml, etc)")
parser.add_argument('--ingest-request-source',
- default="arabesque",
- help="to include in request")
- parser.add_argument('--release-stage',
- default=None,
- help="to include in request")
+ default="arabesque",
+ help="to include in request")
+ parser.add_argument('--release-stage', default=None, help="to include in request")
parser.add_argument('json_file',
- help="arabesque output file to use",
- type=argparse.FileType('r'))
+ help="arabesque output file to use",
+ type=argparse.FileType('r'))
subparsers = parser.add_subparsers()
args = parser.parse_args()
run(args)
+
if __name__ == '__main__':
main()
diff --git a/python/scripts/archiveorg_fileset.py b/python/scripts/archiveorg_fileset.py
index 86ca062..83c04e3 100755
--- a/python/scripts/archiveorg_fileset.py
+++ b/python/scripts/archiveorg_fileset.py
@@ -23,11 +23,9 @@ FORMAT_TO_MIMETYPE = {
'RAR': 'application/vnd.rar',
'TAR': 'application/x-tar',
'7z': 'application/x-7z-compressed',
-
'HTML': 'text/html',
'Text': 'text/plain',
'PDF': 'application/pdf',
-
'CSV': 'text/csv',
'XML': 'application/xml',
'JSON': 'application/json',
@@ -36,20 +34,17 @@ FORMAT_TO_MIMETYPE = {
#'application/vnd.openxmlformats-officedocument.wordprocessingml.document', # .docx
#'application/vnd.ms-excel', # .xls
#'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx
-
- 'MP3': 'audio/mpeg', # .mp3
-
- 'MP4': 'video/mp4', # .mp4
- 'MPEG': 'video/mpeg', # .mpeg
-
+ 'MP3': 'audio/mpeg', # .mp3
+ 'MP4': 'video/mp4', # .mp4
+ 'MPEG': 'video/mpeg', # .mpeg
'JPEG': 'image/jpeg',
'GIF': 'image/gif',
'PNG': 'image/png',
'TIFF': 'image/tiff',
-
'Unknown': None,
}
+
def want_file(f: dict, item_name: str) -> bool:
"""
Filters IA API files
@@ -57,12 +52,12 @@ def want_file(f: dict, item_name: str) -> bool:
if f.source != 'original':
return False
for suffix in [
- '_meta.sqlite',
- '_archive.torrent',
- '_itemimage.jpg',
- '_meta.xml',
- '_thumb.png',
- '_files.xml',
+ '_meta.sqlite',
+ '_archive.torrent',
+ '_itemimage.jpg',
+ '_meta.xml',
+ '_thumb.png',
+ '_files.xml',
]:
if f.name == item_name + suffix or f.name == item_name.lower() + suffix:
return False
@@ -74,6 +69,7 @@ def want_file(f: dict, item_name: str) -> bool:
return False
return True
+
def parse_file(f: dict) -> dict:
"""
Takes an IA API file and turns it in to a fatcat fileset manifest file
@@ -93,6 +89,7 @@ def parse_file(f: dict) -> dict:
mf['extra'] = dict(mimetype=mimetype)
return mf
+
def item_to_fileset(item_name: str, release_id: str, session: internetarchive.ArchiveSession):
print(f"processing item={item_name} release_id={release_id}", file=sys.stderr)
if release_id.startswith('release_'):
@@ -104,18 +101,17 @@ def item_to_fileset(item_name: str, release_id: str, session: internetarchive.Ar
manifest = [parse_file(f) for f in item_files if want_file(f, item_name)]
fileset = {
'manifest': manifest,
- 'urls': [
- {
- 'rel': 'archive',
- 'url': f'https://archive.org/download/{item_name}/',
- },
- ],
+ 'urls': [{
+ 'rel': 'archive',
+ 'url': f'https://archive.org/download/{item_name}/',
+ }, ],
'release_ids': [release_id],
#extra={},
}
print(json.dumps(fileset))
return fileset
+
def main():
session = internetarchive.get_session()
if len(sys.argv) == 3:
@@ -133,5 +129,6 @@ def main():
release_id = fields[1]
item_to_fileset(item_name, release_id=release_id, session=session)
+
if __name__ == '__main__':
main()
diff --git a/python/scripts/cdx_collection.py b/python/scripts/cdx_collection.py
index 5e33def..aa78aec 100755
--- a/python/scripts/cdx_collection.py
+++ b/python/scripts/cdx_collection.py
@@ -35,9 +35,7 @@ def run():
print("Looking up collection: {}".format(collection))
# First fetch list
- item_list = list(
- ia.search_items(
- query="collection:{} mediatype:web".format(collection)))
+ item_list = list(ia.search_items(query="collection:{} mediatype:web".format(collection)))
if len(item_list) == 0:
print("No items found, bailing")
@@ -50,11 +48,12 @@ def run():
item = item['identifier']
# TODO: error handling
try:
- ret = ia.download(item, files=[item + '.cdx.gz'],
- verbose=True,
- destdir=tempdir,
- no_directory=True,
- retries=1000)
+ ret = ia.download(item,
+ files=[item + '.cdx.gz'],
+ verbose=True,
+ destdir=tempdir,
+ no_directory=True,
+ retries=1000)
status = ret and status
except requests.exceptions.ReadTimeout as rt:
print(str(rt), file=sys.stderr)
@@ -69,14 +68,13 @@ def run():
# Combine files
print("Merging and re-compressing all CDX files...")
#subprocess.run('zcat {0}/*.cdx.gz | pigz > {0}/combined.gz'.format(tempdir),
- subprocess.run('zcat {0}/*.cdx.gz | gzip > {0}/combined.gz'.format(tempdir),
- shell=True)
+ subprocess.run('zcat {0}/*.cdx.gz | gzip > {0}/combined.gz'.format(tempdir), shell=True)
# Move and cleanup
- shutil.move('{}/combined.gz'.format(tempdir),
- '{}.cdx.gz'.format(collection))
+ shutil.move('{}/combined.gz'.format(tempdir), '{}.cdx.gz'.format(collection))
print("Done!")
-if __name__=='__main__':
+
+if __name__ == '__main__':
run()
diff --git a/python/scripts/covid2ingestrequest.py b/python/scripts/covid2ingestrequest.py
index 1b7c85c..4714b60 100755
--- a/python/scripts/covid2ingestrequest.py
+++ b/python/scripts/covid2ingestrequest.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
Transform an unpaywall dump (JSON) into ingest requests.
"""
@@ -21,7 +20,6 @@ def transform_cnki(obj):
requests = []
assert obj['cnki_id']
-
requests = []
requests.append({
'base_url': canon(obj['info_url']),
@@ -41,6 +39,7 @@ def transform_cnki(obj):
return requests
+
def transform_wanfang(obj):
assert obj['wanfang_id']
@@ -68,17 +67,18 @@ def run(args):
for r in requests:
print("{}".format(json.dumps(r, sort_keys=True)))
+
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('json_file',
- help="COVID-19 metadata file to use",
- type=argparse.FileType('r'))
+ help="COVID-19 metadata file to use",
+ type=argparse.FileType('r'))
subparsers = parser.add_subparsers()
args = parser.parse_args()
run(args)
+
if __name__ == '__main__':
main()
diff --git a/python/scripts/deliver_dumpgrobid_to_s3.py b/python/scripts/deliver_dumpgrobid_to_s3.py
index 62a85e6..3b53235 100755
--- a/python/scripts/deliver_dumpgrobid_to_s3.py
+++ b/python/scripts/deliver_dumpgrobid_to_s3.py
@@ -49,7 +49,6 @@ def b32_hex(s):
class DeliverDumpGrobidS3():
-
def __init__(self, s3_bucket, **kwargs):
self.rstore = None
self.count = Counter()
@@ -80,11 +79,7 @@ class DeliverDumpGrobidS3():
tei_xml = tei_xml.encode('utf-8')
# upload to AWS S3
obj = self.bucket.put_object(
- Key="{}{}/{}{}".format(
- self.s3_prefix,
- sha1_hex[0:4],
- sha1_hex,
- self.s3_suffix),
+ Key="{}{}/{}{}".format(self.s3_prefix, sha1_hex[0:4], sha1_hex, self.s3_suffix),
Body=tei_xml,
StorageClass=self.s3_storage_class,
)
@@ -92,6 +87,7 @@ class DeliverDumpGrobidS3():
self.count['success-s3'] += 1
sys.stderr.write("{}\n".format(self.count))
+
@sentry_client.capture_exceptions
def main():
@@ -121,5 +117,6 @@ def main():
worker = DeliverDumpGrobidS3(**args.__dict__)
worker.run(args.dump_file)
-if __name__ == '__main__': # pragma: no cover
+
+if __name__ == '__main__': # pragma: no cover
main()
diff --git a/python/scripts/deliver_gwb_to_disk.py b/python/scripts/deliver_gwb_to_disk.py
index ab1906a..ca19b97 100755
--- a/python/scripts/deliver_gwb_to_disk.py
+++ b/python/scripts/deliver_gwb_to_disk.py
@@ -26,7 +26,6 @@ sentry_client = raven.Client()
class DeliverGwbDisk:
-
def __init__(self, disk_dir, **kwargs):
self.warc_uri_prefix = kwargs.get('warc_uri_prefix')
self.rstore = None
@@ -34,7 +33,8 @@ class DeliverGwbDisk:
# /serve/ instead of /download/ doesn't record view count
self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/')
# gwb library will fall back to reading from /opt/.petabox/webdata.secret
- self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
+ self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret',
+ os.environ.get('PETABOX_WEBDATA_SECRET'))
self.disk_dir = disk_dir
self.disk_prefix = kwargs.get('disk_prefix', 'pdf/')
self.disk_suffix = kwargs.get('disk_suffix', '.pdf')
@@ -42,48 +42,56 @@ class DeliverGwbDisk:
def fetch_warc_content(self, warc_path, offset, c_size):
warc_uri = self.warc_uri_prefix + warc_path
if not self.rstore:
- self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
- webdata_secret=self.petabox_webdata_secret,
- download_base_url=self.petabox_base_url))
+ self.rstore = ResourceStore(
+ loaderfactory=CDXLoaderFactory(webdata_secret=self.petabox_webdata_secret,
+ download_base_url=self.petabox_base_url))
try:
gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
except wayback.exception.ResourceUnavailable:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (ResourceUnavailable)")
+ return None, dict(
+ status="error",
+ reason="failed to load file contents from wayback/petabox (ResourceUnavailable)"
+ )
except ValueError as ve:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
+ return None, dict(
+ status="error",
+ reason="failed to load file contents from wayback/petabox (ValueError: {})".
+ format(ve))
except EOFError as eofe:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ return None, dict(
+ status="error",
+ reason="failed to load file contents from wayback/petabox (EOFError: {})".
+ format(eofe))
except TypeError as te:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
+ return None, dict(
+ status="error",
+ reason=
+ "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)"
+ .format(te))
# Note: could consider a generic "except Exception" here, as we get so
# many petabox errors. Do want jobs to fail loud and clear when the
# whole cluster is down though.
if gwb_record.get_status()[0] != 200:
return None, dict(status="error",
- reason="archived HTTP response (WARC) was not 200",
- warc_status=gwb_record.get_status()[0])
+ reason="archived HTTP response (WARC) was not 200",
+ warc_status=gwb_record.get_status()[0])
try:
raw_content = gwb_record.open_raw_content().read()
except IncompleteRead as ire:
- return None, dict(status="error",
- reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ return None, dict(
+ status="error",
+ reason=
+ "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".
+ format(ire))
return raw_content, None
def run(self, manifest_file):
sys.stderr.write("Ensuring all 65536 base directories exist...\n")
for i in range(256):
for j in range(256):
- fpath = "{}/{}{:02x}/{:02x}".format(
- self.disk_dir,
- self.disk_prefix,
- i,
- j)
+ fpath = "{}/{}{:02x}/{:02x}".format(self.disk_dir, self.disk_prefix, i, j)
os.makedirs(fpath, exist_ok=True)
sys.stderr.write("Starting...\n")
for line in manifest_file:
@@ -102,9 +110,11 @@ class DeliverGwbDisk:
self.count['skip-warc'] += 1
continue
# fetch from GWB/petabox via HTTP range-request
- blob, status = self.fetch_warc_content(file_cdx['warc'], file_cdx['offset'], file_cdx['c_size'])
+ blob, status = self.fetch_warc_content(file_cdx['warc'], file_cdx['offset'],
+ file_cdx['c_size'])
if blob is None and status:
- print("{}\terror petabox\t{}\t{}".format(sha1_hex, file_cdx['warc'], status['reason']))
+ print("{}\terror petabox\t{}\t{}".format(sha1_hex, file_cdx['warc'],
+ status['reason']))
self.count['err-petabox-fetch'] += 1
continue
elif not blob:
@@ -120,19 +130,15 @@ class DeliverGwbDisk:
self.count['petabox-ok'] += 1
# save to disk
- fpath = "{}/{}{}/{}/{}{}".format(
- self.disk_dir,
- self.disk_prefix,
- sha1_hex[0:2],
- sha1_hex[2:4],
- sha1_hex,
- self.disk_suffix)
+ fpath = "{}/{}{}/{}/{}{}".format(self.disk_dir, self.disk_prefix, sha1_hex[0:2],
+ sha1_hex[2:4], sha1_hex, self.disk_suffix)
with open(fpath, 'wb') as f:
f.write(blob)
print("{}\tsuccess\t{}\t{}".format(sha1_hex, fpath, len(blob)))
self.count['success-disk'] += 1
sys.stderr.write("{}\n".format(self.count))
+
@sentry_client.capture_exceptions
def main():
@@ -162,5 +168,6 @@ def main():
worker = DeliverGwbDisk(**args.__dict__)
worker.run(args.manifest_file)
-if __name__ == '__main__': # pragma: no cover
+
+if __name__ == '__main__': # pragma: no cover
main()
diff --git a/python/scripts/deliver_gwb_to_s3.py b/python/scripts/deliver_gwb_to_s3.py
index f103205..f9b3b19 100755
--- a/python/scripts/deliver_gwb_to_s3.py
+++ b/python/scripts/deliver_gwb_to_s3.py
@@ -53,7 +53,6 @@ sentry_client = raven.Client()
class DeliverGwbS3:
-
def __init__(self, s3_bucket, **kwargs):
self.warc_uri_prefix = kwargs.get('warc_uri_prefix')
self.rstore = None
@@ -61,7 +60,8 @@ class DeliverGwbS3:
# /serve/ instead of /download/ doesn't record view count
self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/')
# gwb library will fall back to reading from /opt/.petabox/webdata.secret
- self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
+ self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret',
+ os.environ.get('PETABOX_WEBDATA_SECRET'))
self.s3_bucket = s3_bucket
self.s3_prefix = kwargs.get('s3_prefix', 'pdf/')
self.s3_suffix = kwargs.get('s3_suffix', '.pdf')
@@ -71,37 +71,49 @@ class DeliverGwbS3:
def fetch_warc_content(self, warc_path, offset, c_size):
warc_uri = self.warc_uri_prefix + warc_path
if not self.rstore:
- self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
- webdata_secret=self.petabox_webdata_secret,
- download_base_url=self.petabox_base_url))
+ self.rstore = ResourceStore(
+ loaderfactory=CDXLoaderFactory(webdata_secret=self.petabox_webdata_secret,
+ download_base_url=self.petabox_base_url))
try:
gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
except wayback.exception.ResourceUnavailable:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (ResourceUnavailable)")
+ return None, dict(
+ status="error",
+ reason="failed to load file contents from wayback/petabox (ResourceUnavailable)"
+ )
except ValueError as ve:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
+ return None, dict(
+ status="error",
+ reason="failed to load file contents from wayback/petabox (ValueError: {})".
+ format(ve))
except EOFError as eofe:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ return None, dict(
+ status="error",
+ reason="failed to load file contents from wayback/petabox (EOFError: {})".
+ format(eofe))
except TypeError as te:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
+ return None, dict(
+ status="error",
+ reason=
+ "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)"
+ .format(te))
# Note: could consider a generic "except Exception" here, as we get so
# many petabox errors. Do want jobs to fail loud and clear when the
# whole cluster is down though.
if gwb_record.get_status()[0] != 200:
return None, dict(status="error",
- reason="archived HTTP response (WARC) was not 200",
- warc_status=gwb_record.get_status()[0])
+ reason="archived HTTP response (WARC) was not 200",
+ warc_status=gwb_record.get_status()[0])
try:
raw_content = gwb_record.open_raw_content().read()
except IncompleteRead as ire:
- return None, dict(status="error",
- reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ return None, dict(
+ status="error",
+ reason=
+ "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".
+ format(ire))
return raw_content, None
def run(self, manifest_file):
@@ -122,9 +134,11 @@ class DeliverGwbS3:
self.count['skip-warc'] += 1
continue
# fetch from GWB/petabox via HTTP range-request
- blob, status = self.fetch_warc_content(file_cdx['warc'], file_cdx['offset'], file_cdx['c_size'])
+ blob, status = self.fetch_warc_content(file_cdx['warc'], file_cdx['offset'],
+ file_cdx['c_size'])
if blob is None and status:
- print("{}\terror petabox\t{}\t{}".format(sha1_hex, file_cdx['warc'], status['reason']))
+ print("{}\terror petabox\t{}\t{}".format(sha1_hex, file_cdx['warc'],
+ status['reason']))
self.count['err-petabox-fetch'] += 1
continue
elif not blob:
@@ -140,17 +154,14 @@ class DeliverGwbS3:
self.count['petabox-ok'] += 1
# upload to AWS S3
- obj = self.bucket.put_object(
- Key="{}{}/{}{}".format(
- self.s3_prefix,
- sha1_hex[0:4],
- sha1_hex,
- self.s3_suffix),
- Body=blob)
+ obj = self.bucket.put_object(Key="{}{}/{}{}".format(self.s3_prefix, sha1_hex[0:4],
+ sha1_hex, self.s3_suffix),
+ Body=blob)
print("{}\tsuccess\t{}\t{}".format(sha1_hex, obj.key, len(blob)))
self.count['success-s3'] += 1
sys.stderr.write("{}\n".format(self.count))
+
@sentry_client.capture_exceptions
def main():
@@ -180,5 +191,6 @@ def main():
worker = DeliverGwbS3(**args.__dict__)
worker.run(args.manifest_file)
-if __name__ == '__main__': # pragma: no cover
+
+if __name__ == '__main__': # pragma: no cover
main()
diff --git a/python/scripts/doaj2ingestrequest.py b/python/scripts/doaj2ingestrequest.py
index 15b30a0..84a2c2c 100755
--- a/python/scripts/doaj2ingestrequest.py
+++ b/python/scripts/doaj2ingestrequest.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
Transform an DOAJ article dump (JSON) into ingest requests.
@@ -42,22 +41,22 @@ CONTENT_TYPE_MAP = {
"abstract": [],
"doc": [],
"": ["pdf"],
-
"doi": ["pdf"],
"url": ["pdf"],
"fulltext": ["pdf"],
"anySimpleType": ["pdf"],
-
"application/pdf": ["pdf"],
"html": ["html", "pdf"],
"text/html": ["html", "pdf"],
"xml": ["xml"],
}
+
def canon(s: str) -> str:
parsed = urlcanon.parse_url(s)
return str(urlcanon.whatwg(parsed))
+
def transform(obj: dict) -> List[dict]:
"""
Transforms from a single DOAJ object to zero or more ingest requests.
@@ -118,6 +117,7 @@ def transform(obj: dict) -> List[dict]:
return requests
+
def run(args) -> None:
for l in args.json_file:
if not l.strip():
@@ -128,17 +128,18 @@ def run(args) -> None:
for r in requests:
print("{}".format(json.dumps(r, sort_keys=True)))
+
def main() -> None:
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('json_file',
- help="DOAJ article dump file to use",
- type=argparse.FileType('r'))
+ help="DOAJ article dump file to use",
+ type=argparse.FileType('r'))
subparsers = parser.add_subparsers()
args = parser.parse_args()
run(args)
+
if __name__ == '__main__':
main()
diff --git a/python/scripts/enrich_scored_matches.py b/python/scripts/enrich_scored_matches.py
index 3085346..54c3d5f 100755
--- a/python/scripts/enrich_scored_matches.py
+++ b/python/scripts/enrich_scored_matches.py
@@ -34,13 +34,13 @@ def run():
sha1 = base64.b16encode(base64.b32decode(raw_sha1)).decode('ascii').lower()
- obj = dict(
- sha1=sha1,
- dois=dois,
- cdx=[dict(url=cdx['url'], dt=cdx['dt'])],
- size=size,
- mimetype=mimetype)
+ obj = dict(sha1=sha1,
+ dois=dois,
+ cdx=[dict(url=cdx['url'], dt=cdx['dt'])],
+ size=size,
+ mimetype=mimetype)
print(json.dumps(obj))
-if __name__=='__main__':
+
+if __name__ == '__main__':
run()
diff --git a/python/scripts/filter_grobid_metadata.py b/python/scripts/filter_grobid_metadata.py
index d0666ce..a474393 100755
--- a/python/scripts/filter_grobid_metadata.py
+++ b/python/scripts/filter_grobid_metadata.py
@@ -24,6 +24,7 @@ NAME_DENYLIST = (
'phdstudent',
)
+
def tokenize(s, remove_whitespace=True):
s.replace('&apos;', "'")
@@ -36,9 +37,11 @@ def tokenize(s, remove_whitespace=True):
# Encode as dumb ASCII (TODO: this is horrible)
return s.encode('ascii', 'replace').decode('utf8').replace('?', '')
+
assert tokenize("Impact Factor: 2.114") == "impactfactor"
assert tokenize("Impact Factor: 2.114") in TITLE_DENYLIST
+
def filter_title(title):
title = title.strip()
@@ -83,19 +86,23 @@ def filter_title(title):
return title
+
def filter_author_name(name):
name = name['name']
if name.strip().lower().replace(' ', '') in NAME_DENYLIST:
return None
return ' '.join([t for t in name.split() if tokenize(t)])
+
def filter_authors(l):
return [dict(name=n) for n in map(filter_author_name, l) if n and len(n) > 1]
+
def filter_refs(l):
# TODO:
return l
+
def filter_journal_name(name):
# same denylist, for now
if not name:
@@ -104,10 +111,12 @@ def filter_journal_name(name):
slug_name = tokenize(name)
if slug_name in TITLE_DENYLIST or len(slug_name) < 4 or name == "N.º":
return None
- for prefix in ("/ ", "~ ", "& ", "© ", "Original Research Article ", "Original Article ", "Research Article ", "Available online www.jocpr.com "):
+ for prefix in ("/ ", "~ ", "& ", "© ", "Original Research Article ", "Original Article ",
+ "Research Article ", "Available online www.jocpr.com "):
if name.startswith(prefix):
name = name.replace(prefix, '')
- for suffix in (" Available online at www.sciarena.com", " Original Article", " Available online at", " ISSN", " ISSUE"):
+ for suffix in (" Available online at www.sciarena.com", " Original Article",
+ " Available online at", " ISSN", " ISSUE"):
if name.endswith(suffix):
name = name.replace(suffix, '')
if "====================" in name:
@@ -116,6 +125,7 @@ def filter_journal_name(name):
return None
return ' '.join(name.split())
+
def filter_metadata(obj):
if not (obj.get('title') and obj.get('authors')):
return None
@@ -132,6 +142,7 @@ def filter_metadata(obj):
return obj
+
def run(invert=False):
for line in sys.stdin:
fields = line.split('\t')
@@ -155,5 +166,6 @@ def run(invert=False):
elif invert:
print(raw.strip())
-if __name__=="__main__":
+
+if __name__ == "__main__":
run(invert="--invert" in sys.argv)
diff --git a/python/scripts/filter_groupworks.py b/python/scripts/filter_groupworks.py
index 494da71..fda9098 100755
--- a/python/scripts/filter_groupworks.py
+++ b/python/scripts/filter_groupworks.py
@@ -28,6 +28,7 @@ MAX_SLUG_LINES = 50
REQUIRE_AUTHORS = False
+
def tokenize(s, remove_whitespace=False):
s.replace('&apos;', "'")
@@ -40,6 +41,7 @@ def tokenize(s, remove_whitespace=False):
# Encode as dumb ASCII (TODO: this is horrible)
return s.encode('ascii', 'replace').replace(b'?', b'')
+
def check_authors(left, right):
"""
Intended to check GROBID extracted authors (right) against "known good"
@@ -63,6 +65,7 @@ def check_authors(left, right):
return False
return True
+
def test_check_authors():
assert check_authors([], []) == bool(not REQUIRE_AUTHORS)
assert not check_authors([], ['one'])
@@ -74,6 +77,7 @@ def test_check_authors():
assert check_authors(['Mr. Magoo'], ['Mr Magoo'])
assert check_authors(['one', 'tw', 'thr'], ['one', 'two', 'three'])
+
# Rows are (score, left, right)
def process_group(rows):
@@ -119,6 +123,7 @@ def process_group(rows):
print(json.dumps([releases[ident] for ident in group_ids]))
+
def run():
last_slug = None
@@ -140,5 +145,6 @@ def run():
if lines:
process_group(lines)
-if __name__=='__main__':
+
+if __name__ == '__main__':
run()
diff --git a/python/scripts/filter_scored_matches.py b/python/scripts/filter_scored_matches.py
index abf81bd..3251852 100755
--- a/python/scripts/filter_scored_matches.py
+++ b/python/scripts/filter_scored_matches.py
@@ -33,6 +33,7 @@ def tokenize(s, remove_whitespace=False):
# Encode as dumb ASCII (TODO: this is horrible)
return s.encode('ascii', 'replace').replace(b'?', b'')
+
def check_authors(left, right):
"""
Intended to check GROBID extracted authors (right) against "known good"
@@ -56,6 +57,7 @@ def check_authors(left, right):
return False
return True
+
def test_check_authors():
assert not check_authors([], [])
assert not check_authors([], ['one'])
@@ -67,6 +69,7 @@ def test_check_authors():
assert check_authors(['Mr. Magoo'], ['Mr Magoo'])
assert check_authors(['one', 'tw', 'thr'], ['one', 'two', 'three'])
+
# Rows are (score, grobid, crossref)
def process_group(rows):
if len(rows) > max_slug_lines:
@@ -92,6 +95,7 @@ def process_group(rows):
for sha1, doi_list in keepers.items():
print("{}\t{}".format(sha1, json.dumps(doi_list)))
+
def run():
last_slug = None
@@ -112,5 +116,6 @@ def run():
if lines:
process_group(lines)
-if __name__=='__main__':
+
+if __name__ == '__main__':
run()
diff --git a/python/scripts/grobid_affiliations.py b/python/scripts/grobid_affiliations.py
index d391f60..b42153c 100755
--- a/python/scripts/grobid_affiliations.py
+++ b/python/scripts/grobid_affiliations.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
Takes old (HBase) or new (pg) style JSON wrappers of GROBID XML extraction
output, converts the XML to JSON, filters out raw affiliation strings, and
@@ -24,10 +23,12 @@ def parse_hbase(line):
tei_xml = obj['tei_xml']
return sha1hex, tei_xml
+
def parse_pg(line):
obj = json.loads(line)
return obj['sha1hex'], obj['tei_xml']
+
def run(mode='hbase'):
for line in sys.stdin:
if mode == 'hbase':
@@ -49,5 +50,6 @@ def run(mode='hbase'):
affiliations = [json.loads(a) for a in affiliations]
print('\t'.join([sha1hex, json.dumps(affiliations)]))
-if __name__=='__main__':
+
+if __name__ == '__main__':
run()
diff --git a/python/scripts/import_grobid_metadata.py b/python/scripts/import_grobid_metadata.py
index 8aee0be..c9bc134 100755
--- a/python/scripts/import_grobid_metadata.py
+++ b/python/scripts/import_grobid_metadata.py
@@ -4,7 +4,8 @@ import datetime
import json
import sys
-MAX_ABSTRACT_BYTES=4096
+MAX_ABSTRACT_BYTES = 4096
+
def parse_grobid_json(obj):
@@ -14,10 +15,7 @@ def parse_grobid_json(obj):
extra = dict()
if obj.get('abstract') and len(obj.get('abstract')) < MAX_ABSTRACT_BYTES:
- abobj = dict(
- mimetype="text/plain",
- language=None,
- content=obj.get('abstract').strip())
+ abobj = dict(mimetype="text/plain", language=None, content=obj.get('abstract').strip())
abstracts = [abobj]
else:
abstracts = None
@@ -72,16 +70,16 @@ def parse_grobid_json(obj):
else:
extra = None
- return dict(
- title=obj['title'].strip(),
- contribs=contribs,
- publisher=obj['journal'].get('publisher'),
- volume=obj['journal'].get('volume'),
- issue=obj['journal'].get('issue'),
- abstracts=abstracts,
- release_type=release_type,
- release_date=release_date,
- extra=extra)
+ return dict(title=obj['title'].strip(),
+ contribs=contribs,
+ publisher=obj['journal'].get('publisher'),
+ volume=obj['journal'].get('volume'),
+ issue=obj['journal'].get('issue'),
+ abstracts=abstracts,
+ release_type=release_type,
+ release_date=release_date,
+ extra=extra)
+
def run():
for line in sys.stdin:
@@ -90,5 +88,6 @@ def run():
if out:
print(out)
-if __name__=="__main__":
+
+if __name__ == "__main__":
run()
diff --git a/python/scripts/ingestrequest_row2json.py b/python/scripts/ingestrequest_row2json.py
index acba2a8..70731d5 100755
--- a/python/scripts/ingestrequest_row2json.py
+++ b/python/scripts/ingestrequest_row2json.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
This script is used to turn ingest request postgres rows (in JSON export
format) back in to regular ingest request JSON.
@@ -25,6 +24,7 @@ def transform(row):
row['fatcat'] = dict(release_ident=extra['release_ident'])
return row
+
def run(args):
for l in args.json_file:
if not l.strip():
@@ -35,17 +35,18 @@ def run(args):
print(l, file=sys.stderr)
print(json.dumps(req, sort_keys=True))
+
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('json_file',
- help="arabesque output file to use",
- type=argparse.FileType('r'))
+ help="arabesque output file to use",
+ type=argparse.FileType('r'))
subparsers = parser.add_subparsers()
args = parser.parse_args()
run(args)
+
if __name__ == '__main__':
main()
diff --git a/python/scripts/manifest_converter.py b/python/scripts/manifest_converter.py
index 8267003..24e22fd 100755
--- a/python/scripts/manifest_converter.py
+++ b/python/scripts/manifest_converter.py
@@ -20,6 +20,7 @@ import sys
# 2. select all file metadata
# 3. output object
+
def or_none(s):
if s is None:
return None
@@ -27,6 +28,7 @@ def or_none(s):
return None
return s
+
def process_db(db_path):
db = sqlite3.connect(db_path)
@@ -52,5 +54,6 @@ def process_db(db_path):
dois = db.execute("SELECT doi FROM files_id_doi WHERE sha1=?", [sha1])
print(json.dumps(obj))
-if __name__=="__main__":
+
+if __name__ == "__main__":
process_db(sys.argv[1])
diff --git a/python/scripts/oai2ingestrequest.py b/python/scripts/oai2ingestrequest.py
index 315b8d2..1f4a19f 100755
--- a/python/scripts/oai2ingestrequest.py
+++ b/python/scripts/oai2ingestrequest.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
Transform an OAI-PMH bulk dump (JSON) into ingest requests.
@@ -33,17 +32,19 @@ DOMAIN_BLOCKLIST = [
]
RELEASE_STAGE_MAP = {
- 'info:eu-repo/semantics/draftVersion': 'draft',
+ 'info:eu-repo/semantics/draftVersion': 'draft',
'info:eu-repo/semantics/submittedVersion': 'submitted',
- 'info:eu-repo/semantics/acceptedVersion': 'accepted',
+ 'info:eu-repo/semantics/acceptedVersion': 'accepted',
'info:eu-repo/semantics/publishedVersion': 'published',
- 'info:eu-repo/semantics/updatedVersion': 'updated',
+ 'info:eu-repo/semantics/updatedVersion': 'updated',
}
+
def canon(s):
parsed = urlcanon.parse_url(s)
return str(urlcanon.whatwg(parsed))
+
def transform(obj):
"""
Transforms from a single OAI-PMH object to zero or more ingest requests.
@@ -112,6 +113,7 @@ def transform(obj):
return requests
+
def run(args):
for l in args.json_file:
if not l.strip():
@@ -122,17 +124,18 @@ def run(args):
for r in requests:
print("{}".format(json.dumps(r, sort_keys=True)))
+
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('json_file',
- help="OAI-PMH dump file to use (usually stdin)",
- type=argparse.FileType('r'))
+ help="OAI-PMH dump file to use (usually stdin)",
+ type=argparse.FileType('r'))
subparsers = parser.add_subparsers()
args = parser.parse_args()
run(args)
+
if __name__ == '__main__':
main()
diff --git a/python/scripts/pdf_thumbnail.py b/python/scripts/pdf_thumbnail.py
index 71fbe54..3f81b3b 100755
--- a/python/scripts/pdf_thumbnail.py
+++ b/python/scripts/pdf_thumbnail.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
Quick CLI script to convert a PDF to thumbnail (.png, jpeg, etc).
@@ -23,12 +22,14 @@ def run(inpath, outpath):
renderer = poppler.PageRenderer()
full_page = renderer.render_page(page)
- img = Image.frombuffer("RGBA", (full_page.width, full_page.height), full_page.data, 'raw', "BGRA", 0, 1)
- img.thumbnail((180,300), Image.BICUBIC)
+ img = Image.frombuffer("RGBA", (full_page.width, full_page.height), full_page.data, 'raw',
+ "BGRA", 0, 1)
+ img.thumbnail((180, 300), Image.BICUBIC)
#img.thumbnail((360,600), Image.BICUBIC)
img.save(outpath)
#img.save(outpath, quality=95)
+
if __name__ == '__main__':
if len(sys.argv) != 3:
print("expect two parameters: INPUT.png OUTPUT.png", file=sys.stderr)
diff --git a/python/scripts/unpaywall2ingestrequest.py b/python/scripts/unpaywall2ingestrequest.py
index 590b429..b79f316 100755
--- a/python/scripts/unpaywall2ingestrequest.py
+++ b/python/scripts/unpaywall2ingestrequest.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
Transform an unpaywall dump (JSON) into ingest requests.
"""
@@ -26,17 +25,19 @@ DOMAIN_BLOCKLIST = [
]
RELEASE_STAGE_MAP = {
- 'draftVersion': 'draft',
+ 'draftVersion': 'draft',
'submittedVersion': 'submitted',
- 'acceptedVersion': 'accepted',
+ 'acceptedVersion': 'accepted',
'publishedVersion': 'published',
- 'updatedVersion': 'updated',
+ 'updatedVersion': 'updated',
}
+
def canon(s):
parsed = urlcanon.parse_url(s)
return str(urlcanon.whatwg(parsed))
+
def transform(obj):
"""
Transforms from a single unpaywall object to zero or more ingest requests.
@@ -86,6 +87,7 @@ def transform(obj):
return requests
+
def run(args):
for l in args.json_file:
if not l.strip():
@@ -96,17 +98,18 @@ def run(args):
for r in requests:
print("{}".format(json.dumps(r, sort_keys=True)))
+
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('json_file',
- help="unpaywall dump file to use",
- type=argparse.FileType('r'))
+ help="unpaywall dump file to use",
+ type=argparse.FileType('r'))
subparsers = parser.add_subparsers()
args = parser.parse_args()
run(args)
+
if __name__ == '__main__':
main()
diff --git a/python/tests/test_grobid.py b/python/tests/test_grobid.py
index 7d950df..55636dc 100644
--- a/python/tests/test_grobid.py
+++ b/python/tests/test_grobid.py
@@ -1,4 +1,3 @@
-
import struct
import pytest
@@ -12,20 +11,21 @@ FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)
with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'rb') as f:
REAL_TEI_XML = f.read()
+
@pytest.fixture
def grobid_client():
- client = GrobidClient(
- host_url="http://dummy-grobid",
- )
+ client = GrobidClient(host_url="http://dummy-grobid", )
return client
+
@responses.activate
def test_grobid_503(grobid_client):
status = b'{"status": "done broke due to 503"}'
responses.add(responses.POST,
- 'http://dummy-grobid/api/processFulltextDocument', status=503,
- body=status)
+ 'http://dummy-grobid/api/processFulltextDocument',
+ status=503,
+ body=status)
resp = grobid_client.process_fulltext(FAKE_PDF_BYTES)
@@ -35,12 +35,15 @@ def test_grobid_503(grobid_client):
assert resp['status_code'] == 503
assert resp['status'] == "error"
+
@responses.activate
def test_grobid_success(grobid_client):
responses.add(responses.POST,
- 'http://dummy-grobid/api/processFulltextDocument', status=200,
- body=REAL_TEI_XML, content_type='text/xml')
+ 'http://dummy-grobid/api/processFulltextDocument',
+ status=200,
+ body=REAL_TEI_XML,
+ content_type='text/xml')
resp = grobid_client.process_fulltext(FAKE_PDF_BYTES)
@@ -53,6 +56,7 @@ def test_grobid_success(grobid_client):
#print(type(REAL_TEI_XML))
assert resp['tei_xml'] == REAL_TEI_XML.decode('ISO-8859-1')
+
@responses.activate
def test_grobid_worker_cdx(grobid_client, wayback_client):
@@ -60,8 +64,10 @@ def test_grobid_worker_cdx(grobid_client, wayback_client):
worker = GrobidWorker(grobid_client, wayback_client, sink=sink)
responses.add(responses.POST,
- 'http://dummy-grobid/api/processFulltextDocument', status=200,
- body=REAL_TEI_XML, content_type='text/xml')
+ 'http://dummy-grobid/api/processFulltextDocument',
+ status=200,
+ body=REAL_TEI_XML,
+ content_type='text/xml')
with open('tests/files/example.cdx', 'r') as cdx_file:
pusher = CdxLinePusher(
@@ -76,4 +82,3 @@ def test_grobid_worker_cdx(grobid_client, wayback_client):
assert pusher_counts['pushed'] == worker.counts['total']
assert len(responses.calls) == worker.counts['total']
-
diff --git a/python/tests/test_grobid2json.py b/python/tests/test_grobid2json.py
index b8999b1..7637871 100644
--- a/python/tests/test_grobid2json.py
+++ b/python/tests/test_grobid2json.py
@@ -1,4 +1,3 @@
-
import json
import xml
@@ -8,14 +7,15 @@ from grobid2json import *
def test_small_xml():
-
+
with open('tests/files/small.xml', 'r') as f:
tei_xml = f.read()
with open('tests/files/small.json', 'r') as f:
- json_form = json.loads(f.read())
+ json_form = json.loads(f.read())
assert teixml2json(tei_xml) == json_form
+
def test_invalid_xml():
with pytest.raises(xml.etree.ElementTree.ParseError):
diff --git a/python/tests/test_html.py b/python/tests/test_html.py
index d4bffc1..c5f422e 100644
--- a/python/tests/test_html.py
+++ b/python/tests/test_html.py
@@ -1,4 +1,3 @@
-
import json
import pytest
@@ -13,8 +12,7 @@ def test_extract_fulltext_url():
assert resp == {}
resp = extract_fulltext_url(
- "http://dummy-site/",
- b"""<html>
+ "http://dummy-site/", b"""<html>
<head>
<meta name="citation_pdf_url" content="http://www.example.com/content/271/20/11761.full.pdf">
</head>
@@ -22,8 +20,7 @@ def test_extract_fulltext_url():
<h1>my big article here</h1>
blah
</body>
- </html>"""
- )
+ </html>""")
assert resp['pdf_url'] == "http://www.example.com/content/271/20/11761.full.pdf"
assert resp['technique'] == "citation_pdf_url"
@@ -32,4 +29,5 @@ def test_extract_fulltext_url():
"https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0213978",
f.read(),
)
- assert resp['pdf_url'] == "https://journals.plos.org/plosone/article/file?id=10.1371/journal.pone.0213978&type=printable"
+ assert resp[
+ 'pdf_url'] == "https://journals.plos.org/plosone/article/file?id=10.1371/journal.pone.0213978&type=printable"
diff --git a/python/tests/test_html_ingest.py b/python/tests/test_html_ingest.py
index 943e5da..3bf94e2 100644
--- a/python/tests/test_html_ingest.py
+++ b/python/tests/test_html_ingest.py
@@ -1,4 +1,3 @@
-
import datetime
import pytest
diff --git a/python/tests/test_html_metadata.py b/python/tests/test_html_metadata.py
index 7f35d55..a4c1e41 100644
--- a/python/tests/test_html_metadata.py
+++ b/python/tests/test_html_metadata.py
@@ -1,4 +1,3 @@
-
import datetime
import pytest
@@ -44,11 +43,12 @@ def test_html_metadata_plos() -> None:
def test_html_metadata_elife() -> None:
-
+
with open('tests/files/elife_article.html', 'r') as f:
elife_html = f.read()
- meta = html_extract_biblio("https://elifesciences.org/articles/44753", HTMLParser(elife_html))
+ meta = html_extract_biblio("https://elifesciences.org/articles/44753",
+ HTMLParser(elife_html))
assert meta is not None
assert meta.title == "Parallel visual circuitry in a basal chordate"
assert meta.doi == "10.7554/eLife.44753"
@@ -69,7 +69,7 @@ def test_html_metadata_elife() -> None:
def test_html_metadata_peerj() -> None:
-
+
with open('tests/files/peerj_oa_article.html', 'r') as f:
peerj_html = f.read()
@@ -78,15 +78,15 @@ def test_html_metadata_peerj() -> None:
assert meta.title == "The state of OA: a large-scale analysis of the prevalence and impact of Open Access articles"
assert meta.doi == "10.7717/peerj.4375"
assert meta.contrib_names == [
- "Heather Piwowar",
- "Jason Priem",
- "Vincent Larivière",
- "Juan Pablo Alperin",
- "Lisa Matthias",
- "Bree Norlander",
- "Ashley Farley",
- "Jevin West",
- "Stefanie Haustein",
+ "Heather Piwowar",
+ "Jason Priem",
+ "Vincent Larivière",
+ "Juan Pablo Alperin",
+ "Lisa Matthias",
+ "Bree Norlander",
+ "Ashley Farley",
+ "Jevin West",
+ "Stefanie Haustein",
]
assert meta.container_name == "PeerJ"
# "2018-02-13"
@@ -129,7 +129,7 @@ def test_html_metadata_ojs3() -> None:
"Os Keyes",
]
assert meta.container_name == "First Monday"
- assert meta.container_abbrev == "1" # NOTE: bad source metadata
+ assert meta.container_abbrev == "1" # NOTE: bad source metadata
assert meta.container_issn == "1396-0466"
# "2020/09/10"
assert meta.release_date == datetime.date(year=2020, month=9, day=10)
@@ -150,6 +150,7 @@ def test_html_metadata_dlib() -> None:
# "2017-05-15"
assert meta.release_date == datetime.date(year=2017, month=5, day=15)
+
def test_html_metadata_dc_case() -> None:
"""
This tests that CSS selector <meta name=""> attribute lookups are not case-sensitive.
@@ -167,10 +168,12 @@ def test_html_metadata_dc_case() -> None:
assert meta is not None
assert meta.issue == "123"
+
@pytest.fixture
def adblock() -> Any:
return load_adblock_rules()
+
def test_html_resources(adblock) -> None:
with open('tests/files/dlib_05vanhyning.html', 'r') as f:
@@ -227,4 +230,3 @@ def test_html_resources(adblock) -> None:
HTMLParser(nature_html),
adblock,
)
-
diff --git a/python/tests/test_ingest.py b/python/tests/test_ingest.py
index 0965fcb..79f50f4 100644
--- a/python/tests/test_ingest.py
+++ b/python/tests/test_ingest.py
@@ -1,4 +1,3 @@
-
import json
import pytest
@@ -12,9 +11,7 @@ from sandcrawler import *
@pytest.fixture
def ingest_worker(wayback_client, spn_client):
- grobid_client = GrobidClient(
- host_url="http://dummy-grobid",
- )
+ grobid_client = GrobidClient(host_url="http://dummy-grobid", )
worker = IngestFileWorker(
wayback_client=wayback_client,
spn_client=spn_client,
@@ -22,14 +19,11 @@ def ingest_worker(wayback_client, spn_client):
)
return worker
+
@pytest.fixture
def ingest_worker_pdf(wayback_client_pdf, spn_client):
- grobid_client = GrobidClient(
- host_url="http://dummy-grobid",
- )
- pgrest_client = SandcrawlerPostgrestClient(
- api_url="http://dummy-postgrest",
- )
+ grobid_client = GrobidClient(host_url="http://dummy-grobid", )
+ pgrest_client = SandcrawlerPostgrestClient(api_url="http://dummy-postgrest", )
worker = IngestFileWorker(
wayback_client=wayback_client_pdf,
spn_client=spn_client,
@@ -50,37 +44,45 @@ def test_ingest_success(ingest_worker_pdf):
'base_url': "http://dummy-host/",
}
responses.add(responses.POST,
- 'http://dummy-spnv2/save',
- status=200,
- body=json.dumps({"url": TARGET, "job_id": JOB_ID}))
+ 'http://dummy-spnv2/save',
+ status=200,
+ body=json.dumps({
+ "url": TARGET,
+ "job_id": JOB_ID
+ }))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=200,
- body=json.dumps(PENDING_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(PENDING_BODY))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=200,
- body=json.dumps(SUCCESS_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(SUCCESS_BODY))
responses.add(responses.GET,
- 'http://dummy-cdx/cdx',
- status=200,
- body=json.dumps(CDX_SPN_HIT))
+ 'http://dummy-cdx/cdx',
+ status=200,
+ body=json.dumps(CDX_SPN_HIT))
responses.add(responses.GET,
- 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"),
- status=200,
- headers={"X-Archive-Src": "liveweb-whatever.warc.gz"},
- body=pdf_bytes)
+ 'https://web.archive.org/web/{}id_/{}'.format("20180326070330",
+ TARGET + "/redirect"),
+ status=200,
+ headers={"X-Archive-Src": "liveweb-whatever.warc.gz"},
+ body=pdf_bytes)
responses.add(responses.GET,
- 'http://dummy-postgrest/grobid?sha1hex=eq.{}'.format("90ffd2359008d82298821d16b21778c5c39aec36"),
- status=200,
- body=json.dumps([]))
+ 'http://dummy-postgrest/grobid?sha1hex=eq.{}'.format(
+ "90ffd2359008d82298821d16b21778c5c39aec36"),
+ status=200,
+ body=json.dumps([]))
responses.add(responses.GET,
- 'http://dummy-postgrest/pdf_meta?sha1hex=eq.{}'.format("90ffd2359008d82298821d16b21778c5c39aec36"),
- status=200,
- body=json.dumps([]))
+ 'http://dummy-postgrest/pdf_meta?sha1hex=eq.{}'.format(
+ "90ffd2359008d82298821d16b21778c5c39aec36"),
+ status=200,
+ body=json.dumps([]))
responses.add(responses.POST,
- 'http://dummy-grobid/api/processFulltextDocument', status=200,
- body=REAL_TEI_XML, content_type='text/xml')
+ 'http://dummy-grobid/api/processFulltextDocument',
+ status=200,
+ body=REAL_TEI_XML,
+ content_type='text/xml')
resp = ingest_worker_pdf.process(request)
@@ -108,6 +110,7 @@ def test_ingest_success(ingest_worker_pdf):
assert resp['pdf_meta']['pdf_extra']['page_count'] == 1
assert resp['pdf_meta'].get('text') is None
+
@responses.activate
def test_ingest_landing(ingest_worker):
@@ -116,34 +119,39 @@ def test_ingest_landing(ingest_worker):
'base_url': "http://dummy-host/",
}
responses.add(responses.POST,
- 'http://dummy-spnv2/save',
- status=200,
- body=json.dumps({"url": TARGET, "job_id": JOB_ID}))
+ 'http://dummy-spnv2/save',
+ status=200,
+ body=json.dumps({
+ "url": TARGET,
+ "job_id": JOB_ID
+ }))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=200,
- body=json.dumps(PENDING_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(PENDING_BODY))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=200,
- body=json.dumps(SUCCESS_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(SUCCESS_BODY))
responses.add(responses.GET,
- 'http://dummy-cdx/cdx',
- status=200,
- body=json.dumps(CDX_SPN_HIT))
+ 'http://dummy-cdx/cdx',
+ status=200,
+ body=json.dumps(CDX_SPN_HIT))
responses.add(responses.GET,
- 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"),
- status=200,
- headers={"X-Archive-Src": "liveweb-whatever.warc.gz"},
- body=WARC_BODY)
+ 'https://web.archive.org/web/{}id_/{}'.format("20180326070330",
+ TARGET + "/redirect"),
+ status=200,
+ headers={"X-Archive-Src": "liveweb-whatever.warc.gz"},
+ body=WARC_BODY)
# this is for second time around; don't want to fetch same landing page
# HTML again and result in a loop
responses.add(responses.GET,
- 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"),
- status=200,
- headers={"X-Archive-Src": "liveweb-whatever.warc.gz"},
- body="<html></html>")
+ 'https://web.archive.org/web/{}id_/{}'.format("20180326070330",
+ TARGET + "/redirect"),
+ status=200,
+ headers={"X-Archive-Src": "liveweb-whatever.warc.gz"},
+ body="<html></html>")
resp = ingest_worker.process(request)
@@ -157,6 +165,7 @@ def test_ingest_landing(ingest_worker):
assert 'revisit_cdx' not in resp
assert 'grobid' not in resp
+
@responses.activate
def test_ingest_blocklist(ingest_worker):
@@ -192,6 +201,7 @@ def test_ingest_wall_blocklist(ingest_worker):
assert resp['status'] == "skip-wall"
assert resp['request'] == request
+
@responses.activate
def test_ingest_cookie_blocklist(ingest_worker):
@@ -205,4 +215,3 @@ def test_ingest_cookie_blocklist(ingest_worker):
assert resp['hit'] == False
assert resp['status'] == "blocked-cookie"
assert resp['request'] == request
-
diff --git a/python/tests/test_live_wayback.py b/python/tests/test_live_wayback.py
index b501dc3..0ff4902 100644
--- a/python/tests/test_live_wayback.py
+++ b/python/tests/test_live_wayback.py
@@ -1,4 +1,3 @@
-
"""
This file contains tests to run against "live" wayback services. They default
to "skip" because you need authentication, and we shouldn't hit these services
@@ -11,8 +10,8 @@ import json
import pytest
-from sandcrawler import (CdxApiClient, CdxApiError, CdxPartial, PetaboxError, SavePageNowClient, SavePageNowError,
- WaybackClient, WaybackError, gen_file_metadata)
+from sandcrawler import (CdxApiClient, CdxApiError, CdxPartial, PetaboxError, SavePageNowClient,
+ SavePageNowError, WaybackClient, WaybackError, gen_file_metadata)
@pytest.fixture
@@ -20,16 +19,19 @@ def cdx_client():
client = CdxApiClient()
return client
+
@pytest.fixture
def wayback_client():
client = WaybackClient()
return client
+
@pytest.fixture
def spn_client():
client = SavePageNowClient()
return client
+
@pytest.mark.skip(reason="hits prod services, requires auth")
def test_cdx_fetch(cdx_client):
@@ -50,6 +52,7 @@ def test_cdx_fetch(cdx_client):
with pytest.raises(KeyError):
resp = cdx_client.fetch(url, "12345678123456")
+
@pytest.mark.skip(reason="hits prod services, requires auth")
def test_cdx_lookup_best(cdx_client):
@@ -68,13 +71,18 @@ def test_cdx_lookup_best(cdx_client):
assert resp.mimetype == "text/html"
assert resp.status_code == 200
+
@pytest.mark.skip(reason="hits prod services, requires auth")
def test_wayback_fetch(wayback_client):
- resp = wayback_client.fetch_petabox(25683, 2676464871, "archiveteam_archivebot_go_20171205210002/arstechnica.co.uk-inf-20171201-061309-bb65j-00021.warc.gz")
+ resp = wayback_client.fetch_petabox(
+ 25683, 2676464871,
+ "archiveteam_archivebot_go_20171205210002/arstechnica.co.uk-inf-20171201-061309-bb65j-00021.warc.gz"
+ )
assert resp.body
+
@pytest.mark.skip(reason="hits prod services, requires auth")
def test_lookup_resource_success(wayback_client):
@@ -86,6 +94,7 @@ def test_lookup_resource_success(wayback_client):
assert resp.terminal_url in (url, url.replace("https://", "http://"))
assert resp.cdx.url in (url, url.replace("https://", "http://"))
+
@pytest.mark.skip(reason="hits prod services, requires auth")
def test_cdx_fetch_spn2(cdx_client):
@@ -107,8 +116,8 @@ def test_cdx_fetch_spn2(cdx_client):
# https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 20200110222410
#com,wiley,onlinelibrary)/doi/pdf/10.1002/lrh2.10209 20200110222410 https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 text/html 200 VYW7JXFK6EC2KC537N5B7PHYZC4B6MZL - - 9006 815069841 liveweb-20200110214015-wwwb-spn18.us.archive.org-8002.warc.gz
-#com,wiley,onlinelibrary)/doi/pdf/10.1002/lrh2.10209 20200110222410 https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 text/html 302 AFI55BZE23HDTTEERUFKRP6WQVO3LOLS - - 1096 815066572 liveweb-20200110214015-wwwb-spn18.us.archive.org-8002.warc.gz
-#com,wiley,onlinelibrary)/doi/pdf/10.1002/lrh2.10209 20200110222422 https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 text/html 302 AFI55BZE23HDTTEERUFKRP6WQVO3LOLS - - 1094 307563475 liveweb-20200110214449-wwwb-spn18.us.archive.org-8003.warc.gz
+ #com,wiley,onlinelibrary)/doi/pdf/10.1002/lrh2.10209 20200110222410 https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 text/html 302 AFI55BZE23HDTTEERUFKRP6WQVO3LOLS - - 1096 815066572 liveweb-20200110214015-wwwb-spn18.us.archive.org-8002.warc.gz
+ #com,wiley,onlinelibrary)/doi/pdf/10.1002/lrh2.10209 20200110222422 https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209 text/html 302 AFI55BZE23HDTTEERUFKRP6WQVO3LOLS - - 1094 307563475 liveweb-20200110214449-wwwb-spn18.us.archive.org-8003.warc.gz
url = "https://onlinelibrary.wiley.com/doi/pdf/10.1002/lrh2.10209"
datetime = "20200110222410"
@@ -119,6 +128,7 @@ def test_cdx_fetch_spn2(cdx_client):
assert resp.sha1b32 == "VYW7JXFK6EC2KC537N5B7PHYZC4B6MZL"
assert resp.status_code == 200
+
@pytest.mark.skip(reason="hits prod services, requires auth")
def test_lookup_ftp(wayback_client):
# ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_pdf/80/23/10.1177_1559827617708562.PMC6236633.pdf
@@ -153,6 +163,7 @@ def test_lookup_ftp(wayback_client):
file_meta = gen_file_metadata(resp.body)
assert file_meta['sha1hex'] == resp.cdx.sha1hex
+
@pytest.mark.skip(reason="hits prod services, requires auth")
def test_crawl_ftp(spn_client, wayback_client):
diff --git a/python/tests/test_misc.py b/python/tests/test_misc.py
index 0788c38..dcc1202 100644
--- a/python/tests/test_misc.py
+++ b/python/tests/test_misc.py
@@ -1,11 +1,11 @@
-
import pytest
-from sandcrawler import b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path, parse_cdx_line
+from sandcrawler import (b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path,
+ parse_cdx_line)
def test_gen_file_metadata():
-
+
# valid (but very small) PDF file
with open('tests/files/dummy.pdf', 'rb') as f:
file_meta = gen_file_metadata(f.read())
@@ -27,8 +27,9 @@ def test_gen_file_metadata():
assert fm['mimetype'] == 'text/plain'
assert fm['size_bytes'] == 8
+
def test_gen_file_metadata_path():
-
+
# valid (but very small) PDF file
file_meta = gen_file_metadata_path('tests/files/dummy.pdf')
assert file_meta == {
@@ -39,11 +40,14 @@ def test_gen_file_metadata_path():
'size_bytes': 13264,
}
+
def test_b32_hex():
# valid b32
- assert b32_hex('sha1:TZCYZ2ULEHYGESS4L3RNH75I23KKFSMC') == '9e458cea8b21f0624a5c5ee2d3ffa8d6d4a2c982'
- assert b32_hex('TZCYZ2ULEHYGESS4L3RNH75I23KKFSMC') == '9e458cea8b21f0624a5c5ee2d3ffa8d6d4a2c982'
+ assert b32_hex(
+ 'sha1:TZCYZ2ULEHYGESS4L3RNH75I23KKFSMC') == '9e458cea8b21f0624a5c5ee2d3ffa8d6d4a2c982'
+ assert b32_hex(
+ 'TZCYZ2ULEHYGESS4L3RNH75I23KKFSMC') == '9e458cea8b21f0624a5c5ee2d3ffa8d6d4a2c982'
# sha1hex pass-through
s = 'bda3c1017d52e826bbd1da51efad877272d300f9'
@@ -53,6 +57,7 @@ def test_b32_hex():
with pytest.raises(ValueError):
assert b32_hex('blah') == 'blah'
+
def test_parse_cdx_line():
raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
@@ -73,6 +78,7 @@ def test_parse_cdx_line():
assert parse_cdx_line(raw + "\n") == correct
assert parse_cdx_line(raw + " extra_field") == correct
+
def test_invalid_cdx():
print("missing warc")
@@ -80,11 +86,11 @@ def test_invalid_cdx():
assert parse_cdx_line(raw) == None
print("bad datetime")
- raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 2070828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233i SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
+ raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 2070828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233i SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
assert parse_cdx_line(raw) == None
+
def test_clean_url():
assert clean_url("http://BLAH.COM/file.pdf") == "http://blah.com/file.pdf"
assert clean_url("https://opensky.ucar.edu:/islandora/object/articles%3A10809/datastream/PDF/view") == \
"https://opensky.ucar.edu/islandora/object/articles%3A10809/datastream/PDF/view"
-
diff --git a/python/tests/test_pdfextract.py b/python/tests/test_pdfextract.py
index 1d334d6..146b138 100644
--- a/python/tests/test_pdfextract.py
+++ b/python/tests/test_pdfextract.py
@@ -1,4 +1,3 @@
-
import struct
import poppler
@@ -6,11 +5,13 @@ import pytest
import responses
from test_wayback import cdx_client, wayback_client
-from sandcrawler import BlackholeSink, CdxLinePusher, PdfExtractBlobWorker, PdfExtractWorker, WaybackClient
+from sandcrawler import (BlackholeSink, CdxLinePusher, PdfExtractBlobWorker, PdfExtractWorker,
+ WaybackClient)
from sandcrawler.pdfextract import process_pdf
FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)
+
def test_process_fake_pdf():
resp = process_pdf(FAKE_PDF_BYTES)
print(resp)
@@ -21,7 +22,9 @@ def test_process_fake_pdf():
resp = process_pdf(pdf_bytes)
assert resp.status == 'not-pdf'
-@pytest.mark.skipif(poppler.version_string() == '0.71.0', reason="unsupported version of poppler")
+
+@pytest.mark.skipif(poppler.version_string() == '0.71.0',
+ reason="unsupported version of poppler")
def test_process_dummy_pdf():
with open('tests/files/dummy.pdf', 'rb') as f:
pdf_bytes = f.read()
@@ -39,6 +42,7 @@ def test_process_dummy_pdf():
assert resp.pdf_extra['page0_width'] == 595
assert resp.pdf_extra['page_count'] == 1
+
def test_pdfextract_worker_cdx(wayback_client):
sink = BlackholeSink()
@@ -56,6 +60,7 @@ def test_pdfextract_worker_cdx(wayback_client):
assert pusher_counts['pushed'] == 7
assert pusher_counts['pushed'] == worker.counts['total']
+
def test_pdfextract_blob_worker():
sink = BlackholeSink()
@@ -65,4 +70,3 @@ def test_pdfextract_blob_worker():
pdf_bytes = f.read()
worker.process(pdf_bytes)
-
diff --git a/python/tests/test_pushers.py b/python/tests/test_pushers.py
index 62fa515..63f90d3 100644
--- a/python/tests/test_pushers.py
+++ b/python/tests/test_pushers.py
@@ -1,4 +1,3 @@
-
import pytest
from sandcrawler.workers import BlackholeSink, CdxLinePusher
@@ -18,8 +17,10 @@ def test_cdx_line_pusher():
# HTTP 200 and application/pdf
with open('tests/files/example.cdx', 'r') as cdx_file:
- pusher = CdxLinePusher(sink, cdx_file,
- filter_mimetypes=['application/pdf'], filter_http_statuses=[200, 226])
+ pusher = CdxLinePusher(sink,
+ cdx_file,
+ filter_mimetypes=['application/pdf'],
+ filter_http_statuses=[200, 226])
counts = pusher.run()
assert counts['total'] == 20
assert counts['skip-parse'] == 1
diff --git a/python/tests/test_savepagenow.py b/python/tests/test_savepagenow.py
index f3fbfda..80334d9 100644
--- a/python/tests/test_savepagenow.py
+++ b/python/tests/test_savepagenow.py
@@ -1,4 +1,3 @@
-
import json
import pytest
@@ -26,9 +25,7 @@ SUCCESS_BODY = {
"timestamp": "20180326070330",
"duration_sec": 6.203,
"resources": [
- TARGET,
- TARGET + "/redirect",
- "http://brewster.kahle.org/",
+ TARGET, TARGET + "/redirect", "http://brewster.kahle.org/",
"http://brewster.kahle.org/favicon.ico",
"http://brewster.kahle.org/files/2011/07/bkheader-follow.jpg",
"http://brewster.kahle.org/files/2016/12/amazon-unhappy.jpg",
@@ -43,8 +40,7 @@ SUCCESS_BODY = {
"http://brewster.kahle.org/wp-content/themes/twentyten/style.css",
"http://brewster.kahle.org/wp-includes/js/wp-embed.min.js?ver=4.9.4",
"http://brewster.kahle.org/wp-includes/js/wp-emoji-release.min.js?ver=4.9.4",
- "http://platform.twitter.com/widgets.js",
- "https://archive-it.org/piwik.js",
+ "http://platform.twitter.com/widgets.js", "https://archive-it.org/piwik.js",
"https://platform.twitter.com/jot.html",
"https://platform.twitter.com/js/button.556f0ea0e4da4e66cfdc182016dbd6db.js",
"https://platform.twitter.com/widgets/follow_button.f47a2e0b4471326b6fa0f163bda46011.en.html",
@@ -60,7 +56,7 @@ SUCCESS_BODY = {
"https://www.syndikat.org/wp-includes/js/jquery/jquery.js?ver=1.12.4",
"https://www.syndikat.org/wp-includes/js/wp-emoji-release.min.js?ver=4.9.4"
],
- "outlinks":{
+ "outlinks": {
"https://archive.org/": "xxxxxx89b-f3ca-48d0-9ea6-1d1225e98695",
"https://other.com": "yyyy89b-f3ca-48d0-9ea6-1d1225e98695"
}
@@ -74,10 +70,18 @@ ERROR_BODY = {
"resources": []
}
CDX_SPN_HIT = [
- ["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","robotflags","length","offset","filename"],
- ["wiki,fatcat)/", "20180326070330", TARGET + "/redirect", "application/pdf", "200", CDX_BEST_SHA1B32, "-", "-", "8445", "108062304", "liveweb-20200108215212-wwwb-spn04.us.archive.org-kols1pud.warc.gz"],
+ [
+ "urlkey", "timestamp", "original", "mimetype", "statuscode", "digest", "redirect",
+ "robotflags", "length", "offset", "filename"
+ ],
+ [
+ "wiki,fatcat)/", "20180326070330", TARGET + "/redirect", "application/pdf", "200",
+ CDX_BEST_SHA1B32, "-", "-", "8445", "108062304",
+ "liveweb-20200108215212-wwwb-spn04.us.archive.org-kols1pud.warc.gz"
+ ],
]
+
@pytest.fixture
def spn_client():
client = SavePageNowClient(
@@ -88,25 +92,29 @@ def spn_client():
client.poll_seconds = 0.0
return client
+
@responses.activate
def test_savepagenow_success(spn_client):
responses.add(responses.POST,
- 'http://dummy-spnv2/save',
- status=200,
- body=json.dumps({"url": TARGET, "job_id": JOB_ID}))
+ 'http://dummy-spnv2/save',
+ status=200,
+ body=json.dumps({
+ "url": TARGET,
+ "job_id": JOB_ID
+ }))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=200,
- body=json.dumps(PENDING_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(PENDING_BODY))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=200,
- body=json.dumps(PENDING_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(PENDING_BODY))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=200,
- body=json.dumps(SUCCESS_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(SUCCESS_BODY))
resp = spn_client.save_url_now_v2(TARGET)
@@ -119,21 +127,25 @@ def test_savepagenow_success(spn_client):
assert resp.terminal_dt == SUCCESS_BODY['timestamp']
assert resp.resources == SUCCESS_BODY['resources']
+
@responses.activate
def test_savepagenow_remote_error(spn_client):
responses.add(responses.POST,
- 'http://dummy-spnv2/save',
- status=200,
- body=json.dumps({"url": TARGET, "job_id": JOB_ID}))
+ 'http://dummy-spnv2/save',
+ status=200,
+ body=json.dumps({
+ "url": TARGET,
+ "job_id": JOB_ID
+ }))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=200,
- body=json.dumps(PENDING_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(PENDING_BODY))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=200,
- body=json.dumps(ERROR_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(ERROR_BODY))
resp = spn_client.save_url_now_v2(TARGET)
@@ -146,47 +158,56 @@ def test_savepagenow_remote_error(spn_client):
assert resp.terminal_dt == None
assert resp.resources == None
+
@responses.activate
def test_savepagenow_500(spn_client):
responses.add(responses.POST,
- 'http://dummy-spnv2/save',
- status=200,
- body=json.dumps({"url": TARGET, "job_id": JOB_ID}))
+ 'http://dummy-spnv2/save',
+ status=200,
+ body=json.dumps({
+ "url": TARGET,
+ "job_id": JOB_ID
+ }))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=500,
- body=json.dumps(ERROR_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=500,
+ body=json.dumps(ERROR_BODY))
with pytest.raises(SavePageNowError):
resp = spn_client.save_url_now_v2(TARGET)
assert len(responses.calls) == 2
+
@responses.activate
def test_crawl_resource(spn_client, wayback_client):
responses.add(responses.POST,
- 'http://dummy-spnv2/save',
- status=200,
- body=json.dumps({"url": TARGET, "job_id": JOB_ID}))
+ 'http://dummy-spnv2/save',
+ status=200,
+ body=json.dumps({
+ "url": TARGET,
+ "job_id": JOB_ID
+ }))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=200,
- body=json.dumps(PENDING_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(PENDING_BODY))
responses.add(responses.GET,
- 'http://dummy-spnv2/save/status/' + JOB_ID,
- status=200,
- body=json.dumps(SUCCESS_BODY))
+ 'http://dummy-spnv2/save/status/' + JOB_ID,
+ status=200,
+ body=json.dumps(SUCCESS_BODY))
responses.add(responses.GET,
- 'http://dummy-cdx/cdx',
- status=200,
- body=json.dumps(CDX_SPN_HIT))
+ 'http://dummy-cdx/cdx',
+ status=200,
+ body=json.dumps(CDX_SPN_HIT))
responses.add(responses.GET,
- 'https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"),
- status=200,
- headers={"X-Archive-Src": "liveweb-whatever.warc.gz"},
- body=WARC_BODY)
+ 'https://web.archive.org/web/{}id_/{}'.format("20180326070330",
+ TARGET + "/redirect"),
+ status=200,
+ headers={"X-Archive-Src": "liveweb-whatever.warc.gz"},
+ body=WARC_BODY)
print('https://web.archive.org/web/{}id_/{}'.format("20180326070330", TARGET + "/redirect"))
resp = spn_client.crawl_resource(TARGET, wayback_client)
@@ -201,4 +222,3 @@ def test_crawl_resource(spn_client, wayback_client):
assert type(resp.cdx) == CdxPartial
with pytest.raises(AttributeError):
print(resp.cdx.warc_path)
-
diff --git a/python/tests/test_wayback.py b/python/tests/test_wayback.py
index 83311b9..6ccf775 100644
--- a/python/tests/test_wayback.py
+++ b/python/tests/test_wayback.py
@@ -1,4 +1,3 @@
-
import json
import pytest
@@ -10,27 +9,66 @@ CDX_TARGET = "http://fatcat.wiki/"
CDX_DT = "20180812220054"
# cdx -m exact -p output=json -p from=20180812220054 -p to=20180812220054 http://fatcat.wiki/
CDX_SINGLE_HIT = [
- ["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","robotflags","length","offset","filename"],
- ["wiki,fatcat)/", CDX_DT, CDX_TARGET, "text/html", "200", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"],
+ [
+ "urlkey", "timestamp", "original", "mimetype", "statuscode", "digest", "redirect",
+ "robotflags", "length", "offset", "filename"
+ ],
+ [
+ "wiki,fatcat)/", CDX_DT, CDX_TARGET, "text/html", "200",
+ "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304",
+ "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"
+ ],
]
CDX_BEST_SHA1B32 = "AAAAAAAAASIHDJIEP7ZW53DLRX5NFIJR"
# cdx -m exact -p output=json -p from=20180812220054 -p to=20180812220054 http://fatcat.wiki/
CDX_MULTI_HIT = [
- ["urlkey","timestamp","original","mimetype","statuscode","digest","redirect","robotflags","length","offset","filename"],
- ["wiki,fatcat)/", CDX_DT, CDX_TARGET, "text/html", "200", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"],
- # sooner, but not right mimetype
- ["wiki,fatcat)/", "20180912220054", CDX_TARGET, "text/html", "200", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"],
- # sooner and mimetype, but wrong status code
- ["wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "400", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"],
- ["wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "500", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"],
- ["wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "150", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"],
- # "best"
- ["wiki,fatcat)/", CDX_DT, CDX_TARGET, "application/pdf", "200", CDX_BEST_SHA1B32, "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"],
- # older
- ["wiki,fatcat)/", "20180712220054", CDX_TARGET, "application/pdf", "200", "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304", "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"],
+ [
+ "urlkey", "timestamp", "original", "mimetype", "statuscode", "digest", "redirect",
+ "robotflags", "length", "offset", "filename"
+ ],
+ [
+ "wiki,fatcat)/", CDX_DT, CDX_TARGET, "text/html", "200",
+ "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304",
+ "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"
+ ],
+ # sooner, but not right mimetype
+ [
+ "wiki,fatcat)/", "20180912220054", CDX_TARGET, "text/html", "200",
+ "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304",
+ "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"
+ ],
+ # sooner and mimetype, but wrong status code
+ [
+ "wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "400",
+ "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304",
+ "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"
+ ],
+ [
+ "wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "500",
+ "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304",
+ "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"
+ ],
+ [
+ "wiki,fatcat)/", "20180912220054", CDX_TARGET, "application/pdf", "150",
+ "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304",
+ "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"
+ ],
+ # "best"
+ [
+ "wiki,fatcat)/", CDX_DT, CDX_TARGET, "application/pdf", "200", CDX_BEST_SHA1B32, "-",
+ "-", "8445", "108062304",
+ "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"
+ ],
+ # older
+ [
+ "wiki,fatcat)/", "20180712220054", CDX_TARGET, "application/pdf", "200",
+ "O5RHV6OQ7SIHDJIEP7ZW53DLRX5NFIJR", "-", "-", "8445", "108062304",
+ "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"
+ ],
]
+
@pytest.fixture
def cdx_client():
client = CdxApiClient(
@@ -39,13 +77,14 @@ def cdx_client():
)
return client
+
@responses.activate
def test_cdx_fetch(cdx_client):
responses.add(responses.GET,
- 'http://dummy-cdx/cdx',
- status=200,
- body=json.dumps(CDX_SINGLE_HIT))
+ 'http://dummy-cdx/cdx',
+ status=200,
+ body=json.dumps(CDX_SINGLE_HIT))
resp = cdx_client.fetch(CDX_TARGET, CDX_DT)
@@ -58,6 +97,7 @@ def test_cdx_fetch(cdx_client):
assert resp.warc_offset == 108062304
assert resp.warc_path == "WIDE-20180810142205-crawl802/WIDE-20180812131623-00059.warc.gz"
+
@responses.activate
def test_cdx_fetch_errors(cdx_client):
@@ -65,9 +105,9 @@ def test_cdx_fetch_errors(cdx_client):
resp = cdx_client.fetch(CDX_TARGET, "2019")
responses.add(responses.GET,
- 'http://dummy-cdx/cdx',
- status=200,
- body=json.dumps(CDX_SINGLE_HIT))
+ 'http://dummy-cdx/cdx',
+ status=200,
+ body=json.dumps(CDX_SINGLE_HIT))
with pytest.raises(KeyError):
resp = cdx_client.fetch(CDX_TARGET, "20180812220055")
@@ -78,13 +118,14 @@ def test_cdx_fetch_errors(cdx_client):
resp = cdx_client.fetch(CDX_TARGET, CDX_DT)
assert len(responses.calls) == 3
+
@responses.activate
def test_cdx_lookup_best(cdx_client):
responses.add(responses.GET,
- 'http://dummy-cdx/cdx',
- status=200,
- body=json.dumps(CDX_MULTI_HIT))
+ 'http://dummy-cdx/cdx',
+ status=200,
+ body=json.dumps(CDX_MULTI_HIT))
resp = cdx_client.lookup_best(CDX_TARGET, best_mimetype="application/pdf")
@@ -95,6 +136,7 @@ def test_cdx_lookup_best(cdx_client):
assert resp.sha1b32 == CDX_BEST_SHA1B32
assert resp.warc_path == CDX_SINGLE_HIT[1][-1]
+
WARC_TARGET = "http://fatcat.wiki/"
WARC_BODY = b"""
<html>
@@ -108,6 +150,7 @@ WARC_BODY = b"""
</html>
"""
+
@pytest.fixture
def wayback_client(cdx_client, mocker):
client = WaybackClient(
@@ -127,6 +170,7 @@ def wayback_client(cdx_client, mocker):
return client
+
@pytest.fixture
def wayback_client_pdf(cdx_client, mocker):
@@ -150,6 +194,7 @@ def wayback_client_pdf(cdx_client, mocker):
return client
+
@responses.activate
def test_wayback_fetch(wayback_client):
resp = wayback_client.fetch_petabox(123, 456789, "here/there.warc.gz")
@@ -159,13 +204,14 @@ def test_wayback_fetch(wayback_client):
resp = wayback_client.fetch_petabox_body(123, 456789, "here/there.warc.gz")
assert resp == WARC_BODY
+
@responses.activate
def test_lookup_resource_success(wayback_client):
responses.add(responses.GET,
- 'http://dummy-cdx/cdx',
- status=200,
- body=json.dumps(CDX_MULTI_HIT))
+ 'http://dummy-cdx/cdx',
+ status=200,
+ body=json.dumps(CDX_MULTI_HIT))
resp = wayback_client.lookup_resource(CDX_TARGET)
diff --git a/python/tests/test_xml.py b/python/tests/test_xml.py
index a996c56..1742f3a 100644
--- a/python/tests/test_xml.py
+++ b/python/tests/test_xml.py
@@ -1,11 +1,10 @@
-
import pytest
from sandcrawler.xml import xml_reserialize
def test_xml_reserialize() -> None:
-
+
with open('tests/files/scielo_article.jats.xml', 'rb') as f:
raw_xml = f.read()