aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/misc.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/misc.py')
-rw-r--r--python/sandcrawler/misc.py64
1 files changed, 45 insertions, 19 deletions
diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py
index cf8c4bd..ddbd95a 100644
--- a/python/sandcrawler/misc.py
+++ b/python/sandcrawler/misc.py
@@ -1,4 +1,3 @@
-
import base64
import datetime
import hashlib
@@ -19,22 +18,28 @@ def clean_url(s: str) -> str:
parsed.colon_before_port = b''
return str(urlcanon.whatwg(parsed))
+
def url_fuzzy_equal(left: str, right: str) -> bool:
"""
TODO: use proper surt library and canonicalization for this check
"""
- fuzzy_left = '://'.join(clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:])
- fuzzy_right = '://'.join(clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:])
+ fuzzy_left = '://'.join(
+ clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:])
+ fuzzy_right = '://'.join(
+ clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:])
if fuzzy_left == fuzzy_right:
return True
elif fuzzy_left == fuzzy_right + "/" or fuzzy_right == fuzzy_left + "/":
return True
return False
+
def test_url_fuzzy_equal() -> None:
assert True == url_fuzzy_equal(
"http://www.annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree",
- "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree")
+ "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree"
+ )
+
def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
"""
@@ -45,10 +50,10 @@ def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
assert blob is not None
if not allow_empty:
assert blob
- if len(blob) < 1024*1024:
+ if len(blob) < 1024 * 1024:
mimetype = magic.Magic(mime=True).from_buffer(blob)
else:
- mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024*1024)])
+ mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024 * 1024)])
if mimetype in ("application/xml", "text/xml"):
# crude checks for XHTML or JATS XML, using only first 1 kB of file
if b"<htm" in blob[:1024] and b'xmlns="http://www.w3.org/1999/xhtml"' in blob[:1024]:
@@ -70,6 +75,7 @@ def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
mimetype=mimetype,
)
+
def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict:
"""
Variant of gen_file_metadata() which works with files on local disk
@@ -92,7 +98,7 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict:
size_bytes = 0
with open(path, 'rb') as f:
while True:
- chunk = f.read(1024*1024)
+ chunk = f.read(1024 * 1024)
if not chunk:
break
size_bytes += len(chunk)
@@ -108,6 +114,7 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict:
mimetype=mimetype,
)
+
def b32_hex(s: str) -> str:
"""
Converts a base32-encoded SHA-1 checksum into hex-encoded
@@ -123,6 +130,7 @@ def b32_hex(s: str) -> str:
raise ValueError("not a base-32 encoded SHA-1 hash: {}".format(s))
return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
+
NORMAL_MIME = (
'application/pdf',
'application/postscript',
@@ -131,6 +139,7 @@ NORMAL_MIME = (
'application/octet-stream',
)
+
def normalize_mime(raw: str) -> Optional[str]:
raw = raw.lower().strip()
for norm in NORMAL_MIME:
@@ -142,9 +151,7 @@ def normalize_mime(raw: str) -> Optional[str]:
return 'text/xml'
if raw.startswith('application/x-pdf'):
return 'application/pdf'
- if raw in (
- '.pdf',
- ):
+ if raw in ('.pdf', ):
return 'application/pdf'
if raw in (
'application/download',
@@ -154,7 +161,7 @@ def normalize_mime(raw: str) -> Optional[str]:
'application/octetstream',
'application/force-download',
'application/unknown',
- ):
+ ):
return 'application/octet-stream'
return None
@@ -193,8 +200,8 @@ def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]:
offset = cdx[9]
warc = cdx[10]
- if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit()
- and len(sha1b32) == 32 and dt.isdigit()):
+ if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit() and len(sha1b32) == 32
+ and dt.isdigit()):
return None
if '-' in (surt, dt, url, http_status, sha1b32, c_size, offset, warc):
@@ -221,6 +228,7 @@ def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]:
warc_path=warc,
)
+
def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]:
if not dt_str:
return None
@@ -229,23 +237,39 @@ def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]:
except Exception:
return None
+
def test_parse_cdx_datetime() -> None:
assert parse_cdx_datetime("") == None
assert parse_cdx_datetime("asdf") == None
assert parse_cdx_datetime("19930203123045") != None
- assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)
+ assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020,
+ month=10,
+ day=28,
+ hour=23,
+ minute=51,
+ second=3)
+
def datetime_to_cdx(dt: datetime.datetime) -> str:
return '%04d%02d%02d%02d%02d%02d' % (
- dt.year, dt.month, dt.day,
- dt.hour, dt.minute, dt.second,
+ dt.year,
+ dt.month,
+ dt.day,
+ dt.hour,
+ dt.minute,
+ dt.second,
)
+
def test_datetime_to_cdx() -> None:
- assert "20201028235103" == datetime_to_cdx(datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3))
+ assert "20201028235103" == datetime_to_cdx(
+ datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3))
+
-def requests_retry_session(retries=10, backoff_factor=3,
- status_forcelist=(500, 502, 504), session=None) -> requests.Session:
+def requests_retry_session(retries=10,
+ backoff_factor=3,
+ status_forcelist=(500, 502, 504),
+ session=None) -> requests.Session:
"""
From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
"""
@@ -262,6 +286,7 @@ def requests_retry_session(retries=10, backoff_factor=3,
session.mount('https://', adapter)
return session
+
def sanitize_fs_path(path: str) -> str:
"""
From: https://stackoverflow.com/questions/13939120/sanitizing-a-file-path-in-python/66950540#66950540
@@ -271,6 +296,7 @@ def sanitize_fs_path(path: str) -> str:
# - making the path relative
return os.path.relpath(os.path.normpath(os.path.join("/", path)), "/")
+
def test_sanitize_fs_path() -> None:
assert sanitize_fs_path("/thing.png") == "thing.png"
assert sanitize_fs_path("../../thing.png") == "thing.png"