aboutsummaryrefslogtreecommitdiffstats
path: root/python_openapi_client/fatcat_openapi_client/rest.py
diff options
context:
space:
mode:
Diffstat (limited to 'python_openapi_client/fatcat_openapi_client/rest.py')
-rw-r--r--python_openapi_client/fatcat_openapi_client/rest.py118
1 files changed, 87 insertions, 31 deletions
diff --git a/python_openapi_client/fatcat_openapi_client/rest.py b/python_openapi_client/fatcat_openapi_client/rest.py
index d049b537..5a599e36 100644
--- a/python_openapi_client/fatcat_openapi_client/rest.py
+++ b/python_openapi_client/fatcat_openapi_client/rest.py
@@ -1,31 +1,26 @@
-# coding: utf-8
-
"""
fatcat
Fatcat is a scalable, versioned, API-oriented catalog of bibliographic entities and file metadata. # noqa: E501
- The version of the OpenAPI document: 0.3.1
+ The version of the OpenAPI document: 0.5.0
Contact: webservices@archive.org
Generated by: https://openapi-generator.tech
"""
-from __future__ import absolute_import
-
import io
import json
import logging
import re
import ssl
-
-import certifi
-# python 2 and python 3 compatibility library
-import six
-from six.moves.urllib.parse import urlencode
+from urllib.parse import urlencode
+from urllib.parse import urlparse
+from urllib.request import proxy_bypass_environment
import urllib3
+import ipaddress
-from fatcat_openapi_client.exceptions import ApiException, ApiValueError
+from fatcat_openapi_client.exceptions import ApiException, UnauthorizedException, ForbiddenException, NotFoundException, ServiceException, ApiValueError
logger = logging.getLogger(__name__)
@@ -63,13 +58,6 @@ class RESTClientObject(object):
else:
cert_reqs = ssl.CERT_NONE
- # ca_certs
- if configuration.ssl_ca_cert:
- ca_certs = configuration.ssl_ca_cert
- else:
- # if not set certificate file, use Mozilla's root certificates.
- ca_certs = certifi.where()
-
addition_pool_args = {}
if configuration.assert_hostname is not None:
addition_pool_args['assert_hostname'] = configuration.assert_hostname # noqa: E501
@@ -77,6 +65,9 @@ class RESTClientObject(object):
if configuration.retries is not None:
addition_pool_args['retries'] = configuration.retries
+ if configuration.socket_options is not None:
+ addition_pool_args['socket_options'] = configuration.socket_options
+
if maxsize is None:
if configuration.connection_pool_maxsize is not None:
maxsize = configuration.connection_pool_maxsize
@@ -84,12 +75,13 @@ class RESTClientObject(object):
maxsize = 4
# https pool manager
- if configuration.proxy:
+ if configuration.proxy and not should_bypass_proxies(
+ configuration.host, no_proxy=configuration.no_proxy or ''):
self.pool_manager = urllib3.ProxyManager(
num_pools=pools_size,
maxsize=maxsize,
cert_reqs=cert_reqs,
- ca_certs=ca_certs,
+ ca_certs=configuration.ssl_ca_cert,
cert_file=configuration.cert_file,
key_file=configuration.key_file,
proxy_url=configuration.proxy,
@@ -101,7 +93,7 @@ class RESTClientObject(object):
num_pools=pools_size,
maxsize=maxsize,
cert_reqs=cert_reqs,
- ca_certs=ca_certs,
+ ca_certs=configuration.ssl_ca_cert,
cert_file=configuration.cert_file,
key_file=configuration.key_file,
**addition_pool_args
@@ -142,22 +134,23 @@ class RESTClientObject(object):
timeout = None
if _request_timeout:
- if isinstance(_request_timeout, (int, ) if six.PY3 else (int, long)): # noqa: E501,F821
+ if isinstance(_request_timeout, (int, float)): # noqa: E501,F821
timeout = urllib3.Timeout(total=_request_timeout)
elif (isinstance(_request_timeout, tuple) and
len(_request_timeout) == 2):
timeout = urllib3.Timeout(
connect=_request_timeout[0], read=_request_timeout[1])
- if 'Content-Type' not in headers:
- headers['Content-Type'] = 'application/json'
-
try:
# For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE`
if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']:
+ # Only set a default Content-Type for POST, PUT, PATCH and OPTIONS requests
+ if (method != 'DELETE') and ('Content-Type' not in headers):
+ headers['Content-Type'] = 'application/json'
if query_params:
url += '?' + urlencode(query_params)
- if re.search('json', headers['Content-Type'], re.IGNORECASE):
+ if ('Content-Type' not in headers) or (re.search('json',
+ headers['Content-Type'], re.IGNORECASE)):
request_body = None
if body is not None:
request_body = json.dumps(body)
@@ -218,15 +211,22 @@ class RESTClientObject(object):
if _preload_content:
r = RESTResponse(r)
- # In the python 3, the response.data is bytes.
- # we need to decode it to string.
- if six.PY3:
- r.data = r.data.decode('utf8')
-
# log response body
logger.debug("response body: %s", r.data)
if not 200 <= r.status <= 299:
+ if r.status == 401:
+ raise UnauthorizedException(http_resp=r)
+
+ if r.status == 403:
+ raise ForbiddenException(http_resp=r)
+
+ if r.status == 404:
+ raise NotFoundException(http_resp=r)
+
+ if 500 <= r.status <= 599:
+ raise ServiceException(http_resp=r)
+
raise ApiException(http_resp=r)
return r
@@ -295,3 +295,59 @@ class RESTClientObject(object):
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
+
+# end of class RESTClientObject
+
+
+def is_ipv4(target):
+ """ Test if IPv4 address or not
+ """
+ try:
+ chk = ipaddress.IPv4Address(target)
+ return True
+ except ipaddress.AddressValueError:
+ return False
+
+
+def in_ipv4net(target, net):
+ """ Test if target belongs to given IPv4 network
+ """
+ try:
+ nw = ipaddress.IPv4Network(net)
+ ip = ipaddress.IPv4Address(target)
+ if ip in nw:
+ return True
+ return False
+ except ipaddress.AddressValueError:
+ return False
+ except ipaddress.NetmaskValueError:
+ return False
+
+
+def should_bypass_proxies(url, no_proxy=None):
+ """ Yet another requests.should_bypass_proxies
+ Test if proxies should not be used for a particular url.
+ """
+
+ parsed = urlparse(url)
+
+ # special cases
+ if parsed.hostname in [None, '']:
+ return True
+
+ # special cases
+ if no_proxy in [None, '']:
+ return False
+ if no_proxy == '*':
+ return True
+
+ no_proxy = no_proxy.lower().replace(' ', '');
+ entries = (
+ host for host in no_proxy.split(',') if host
+ )
+
+ if is_ipv4(parsed.hostname):
+ for item in entries:
+ if in_ipv4net(parsed.hostname, item):
+ return True
+ return proxy_bypass_environment(parsed.hostname, {'no': no_proxy})