aboutsummaryrefslogtreecommitdiffstats
path: root/python_client/fatcat_client/api_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'python_client/fatcat_client/api_client.py')
-rw-r--r--python_client/fatcat_client/api_client.py624
1 files changed, 0 insertions, 624 deletions
diff --git a/python_client/fatcat_client/api_client.py b/python_client/fatcat_client/api_client.py
deleted file mode 100644
index 98256e65..00000000
--- a/python_client/fatcat_client/api_client.py
+++ /dev/null
@@ -1,624 +0,0 @@
-# coding: utf-8
-"""
- fatcat
-
- A scalable, versioned, API-oriented catalog of bibliographic entities and file metadata # noqa: E501
-
- OpenAPI spec version: 0.3.0
-
- Generated by: https://github.com/swagger-api/swagger-codegen.git
-"""
-
-from __future__ import absolute_import
-
-import datetime
-import json
-import mimetypes
-from multiprocessing.pool import ThreadPool
-import os
-import re
-import tempfile
-
-# python 2 and python 3 compatibility library
-import six
-from six.moves.urllib.parse import quote
-
-from fatcat_client.configuration import Configuration
-import fatcat_client.models
-from fatcat_client import rest
-
-
-class ApiClient(object):
- """Generic API client for Swagger client library builds.
-
- Swagger generic API client. This client handles the client-
- server communication, and is invariant across implementations. Specifics of
- the methods and models for each application are generated from the Swagger
- templates.
-
- NOTE: This class is auto generated by the swagger code generator program.
- Ref: https://github.com/swagger-api/swagger-codegen
- Do not edit the class manually.
-
- :param configuration: .Configuration object for this client
- :param header_name: a header to pass when making calls to the API.
- :param header_value: a header value to pass when making calls to
- the API.
- :param cookie: a cookie to include in the header when making calls
- to the API
- """
-
- PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types
- NATIVE_TYPES_MAPPING = {
- 'int': int,
- 'long': int if six.PY3 else long, # noqa: F821
- 'float': float,
- 'str': str,
- 'bool': bool,
- 'date': datetime.date,
- 'datetime': datetime.datetime,
- 'object': object,
- }
-
- def __init__(self, configuration=None, header_name=None, header_value=None,
- cookie=None):
- if configuration is None:
- configuration = Configuration()
- self.configuration = configuration
-
- self.pool = ThreadPool()
- self.rest_client = rest.RESTClientObject(configuration)
- self.default_headers = {}
- if header_name is not None:
- self.default_headers[header_name] = header_value
- self.cookie = cookie
- # Set default User-Agent.
- self.user_agent = 'Swagger-Codegen/1.0.0/python'
-
- def __del__(self):
- try:
- self.pool.close()
- self.pool.join()
- except:
- pass
-
- @property
- def user_agent(self):
- """User agent for this API client"""
- return self.default_headers['User-Agent']
-
- @user_agent.setter
- def user_agent(self, value):
- self.default_headers['User-Agent'] = value
-
- def set_default_header(self, header_name, header_value):
- self.default_headers[header_name] = header_value
-
- def __call_api(
- self, resource_path, method, path_params=None,
- query_params=None, header_params=None, body=None, post_params=None,
- files=None, response_type=None, auth_settings=None,
- _return_http_data_only=None, collection_formats=None,
- _preload_content=True, _request_timeout=None):
-
- config = self.configuration
-
- # header parameters
- header_params = header_params or {}
- header_params.update(self.default_headers)
- if self.cookie:
- header_params['Cookie'] = self.cookie
- if header_params:
- header_params = self.sanitize_for_serialization(header_params)
- header_params = dict(self.parameters_to_tuples(header_params,
- collection_formats))
-
- # path parameters
- if path_params:
- path_params = self.sanitize_for_serialization(path_params)
- path_params = self.parameters_to_tuples(path_params,
- collection_formats)
- for k, v in path_params:
- # specified safe chars, encode everything
- resource_path = resource_path.replace(
- '{%s}' % k,
- quote(str(v), safe=config.safe_chars_for_path_param)
- )
-
- # query parameters
- if query_params:
- query_params = self.sanitize_for_serialization(query_params)
- query_params = self.parameters_to_tuples(query_params,
- collection_formats)
-
- # post parameters
- if post_params or files:
- post_params = self.prepare_post_parameters(post_params, files)
- post_params = self.sanitize_for_serialization(post_params)
- post_params = self.parameters_to_tuples(post_params,
- collection_formats)
-
- # auth setting
- self.update_params_for_auth(header_params, query_params, auth_settings)
-
- # body
- if body:
- body = self.sanitize_for_serialization(body)
-
- # request url
- url = self.configuration.host + resource_path
-
- # perform request and return response
- response_data = self.request(
- method, url, query_params=query_params, headers=header_params,
- post_params=post_params, body=body,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout)
-
- self.last_response = response_data
-
- return_data = response_data
- if _preload_content:
- # deserialize response data
- if response_type:
- return_data = self.deserialize(response_data, response_type)
- else:
- return_data = None
-
- if _return_http_data_only:
- return (return_data)
- else:
- return (return_data, response_data.status,
- response_data.getheaders())
-
- def sanitize_for_serialization(self, obj):
- """Builds a JSON POST object.
-
- If obj is None, return None.
- If obj is str, int, long, float, bool, return directly.
- If obj is datetime.datetime, datetime.date
- convert to string in iso8601 format.
- If obj is list, sanitize each element in the list.
- If obj is dict, return the dict.
- If obj is swagger model, return the properties dict.
-
- :param obj: The data to serialize.
- :return: The serialized form of data.
- """
- if obj is None:
- return None
- elif isinstance(obj, self.PRIMITIVE_TYPES):
- return obj
- elif isinstance(obj, list):
- return [self.sanitize_for_serialization(sub_obj)
- for sub_obj in obj]
- elif isinstance(obj, tuple):
- return tuple(self.sanitize_for_serialization(sub_obj)
- for sub_obj in obj)
- elif isinstance(obj, (datetime.datetime, datetime.date)):
- return obj.isoformat()
-
- if isinstance(obj, dict):
- obj_dict = obj
- else:
- # Convert model obj to dict except
- # attributes `swagger_types`, `attribute_map`
- # and attributes which value is not None.
- # Convert attribute name to json key in
- # model definition for request.
- obj_dict = {obj.attribute_map[attr]: getattr(obj, attr)
- for attr, _ in six.iteritems(obj.swagger_types)
- if getattr(obj, attr) is not None}
-
- return {key: self.sanitize_for_serialization(val)
- for key, val in six.iteritems(obj_dict)}
-
- def deserialize(self, response, response_type):
- """Deserializes response into an object.
-
- :param response: RESTResponse object to be deserialized.
- :param response_type: class literal for
- deserialized object, or string of class name.
-
- :return: deserialized object.
- """
- # handle file downloading
- # save response body into a tmp file and return the instance
- if response_type == "file":
- return self.__deserialize_file(response)
-
- # fetch data from response object
- try:
- data = json.loads(response.data)
- except ValueError:
- data = response.data
-
- return self.__deserialize(data, response_type)
-
- def __deserialize(self, data, klass):
- """Deserializes dict, list, str into an object.
-
- :param data: dict, list or str.
- :param klass: class literal, or string of class name.
-
- :return: object.
- """
- if data is None:
- return None
-
- if type(klass) == str:
- if klass.startswith('list['):
- sub_kls = re.match('list\[(.*)\]', klass).group(1)
- return [self.__deserialize(sub_data, sub_kls)
- for sub_data in data]
-
- if klass.startswith('dict('):
- sub_kls = re.match('dict\(([^,]*), (.*)\)', klass).group(2)
- return {k: self.__deserialize(v, sub_kls)
- for k, v in six.iteritems(data)}
-
- # convert str to class
- if klass in self.NATIVE_TYPES_MAPPING:
- klass = self.NATIVE_TYPES_MAPPING[klass]
- else:
- klass = getattr(fatcat_client.models, klass)
-
- if klass in self.PRIMITIVE_TYPES:
- return self.__deserialize_primitive(data, klass)
- elif klass == object:
- return self.__deserialize_object(data)
- elif klass == datetime.date:
- return self.__deserialize_date(data)
- elif klass == datetime.datetime:
- return self.__deserialize_datatime(data)
- else:
- return self.__deserialize_model(data, klass)
-
- def call_api(self, resource_path, method,
- path_params=None, query_params=None, header_params=None,
- body=None, post_params=None, files=None,
- response_type=None, auth_settings=None, async=None,
- _return_http_data_only=None, collection_formats=None,
- _preload_content=True, _request_timeout=None):
- """Makes the HTTP request (synchronous) and returns deserialized data.
-
- To make an async request, set the async parameter.
-
- :param resource_path: Path to method endpoint.
- :param method: Method to call.
- :param path_params: Path parameters in the url.
- :param query_params: Query parameters in the url.
- :param header_params: Header parameters to be
- placed in the request header.
- :param body: Request body.
- :param post_params dict: Request post form parameters,
- for `application/x-www-form-urlencoded`, `multipart/form-data`.
- :param auth_settings list: Auth Settings names for the request.
- :param response: Response data type.
- :param files dict: key -> filename, value -> filepath,
- for `multipart/form-data`.
- :param async bool: execute request asynchronously
- :param _return_http_data_only: response data without head status code
- and headers
- :param collection_formats: dict of collection formats for path, query,
- header, and post parameters.
- :param _preload_content: if False, the urllib3.HTTPResponse object will
- be returned without reading/decoding response
- data. Default is True.
- :param _request_timeout: timeout setting for this request. If one
- number provided, it will be total request
- timeout. It can also be a pair (tuple) of
- (connection, read) timeouts.
- :return:
- If async parameter is True,
- the request will be called asynchronously.
- The method will return the request thread.
- If parameter async is False or missing,
- then the method will return the response directly.
- """
- if not async:
- return self.__call_api(resource_path, method,
- path_params, query_params, header_params,
- body, post_params, files,
- response_type, auth_settings,
- _return_http_data_only, collection_formats,
- _preload_content, _request_timeout)
- else:
- thread = self.pool.apply_async(self.__call_api, (resource_path,
- method, path_params, query_params,
- header_params, body,
- post_params, files,
- response_type, auth_settings,
- _return_http_data_only,
- collection_formats,
- _preload_content, _request_timeout))
- return thread
-
- def request(self, method, url, query_params=None, headers=None,
- post_params=None, body=None, _preload_content=True,
- _request_timeout=None):
- """Makes the HTTP request using RESTClient."""
- if method == "GET":
- return self.rest_client.GET(url,
- query_params=query_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- headers=headers)
- elif method == "HEAD":
- return self.rest_client.HEAD(url,
- query_params=query_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- headers=headers)
- elif method == "OPTIONS":
- return self.rest_client.OPTIONS(url,
- query_params=query_params,
- headers=headers,
- post_params=post_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body)
- elif method == "POST":
- return self.rest_client.POST(url,
- query_params=query_params,
- headers=headers,
- post_params=post_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body)
- elif method == "PUT":
- return self.rest_client.PUT(url,
- query_params=query_params,
- headers=headers,
- post_params=post_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body)
- elif method == "PATCH":
- return self.rest_client.PATCH(url,
- query_params=query_params,
- headers=headers,
- post_params=post_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body)
- elif method == "DELETE":
- return self.rest_client.DELETE(url,
- query_params=query_params,
- headers=headers,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body)
- else:
- raise ValueError(
- "http method must be `GET`, `HEAD`, `OPTIONS`,"
- " `POST`, `PATCH`, `PUT` or `DELETE`."
- )
-
- def parameters_to_tuples(self, params, collection_formats):
- """Get parameters as list of tuples, formatting collections.
-
- :param params: Parameters as dict or list of two-tuples
- :param dict collection_formats: Parameter collection formats
- :return: Parameters as list of tuples, collections formatted
- """
- new_params = []
- if collection_formats is None:
- collection_formats = {}
- for k, v in six.iteritems(params) if isinstance(params, dict) else params: # noqa: E501
- if k in collection_formats:
- collection_format = collection_formats[k]
- if collection_format == 'multi':
- new_params.extend((k, value) for value in v)
- else:
- if collection_format == 'ssv':
- delimiter = ' '
- elif collection_format == 'tsv':
- delimiter = '\t'
- elif collection_format == 'pipes':
- delimiter = '|'
- else: # csv is the default
- delimiter = ','
- new_params.append(
- (k, delimiter.join(str(value) for value in v)))
- else:
- new_params.append((k, v))
- return new_params
-
- def prepare_post_parameters(self, post_params=None, files=None):
- """Builds form parameters.
-
- :param post_params: Normal form parameters.
- :param files: File parameters.
- :return: Form parameters with files.
- """
- params = []
-
- if post_params:
- params = post_params
-
- if files:
- for k, v in six.iteritems(files):
- if not v:
- continue
- file_names = v if type(v) is list else [v]
- for n in file_names:
- with open(n, 'rb') as f:
- filename = os.path.basename(f.name)
- filedata = f.read()
- mimetype = (mimetypes.guess_type(filename)[0] or
- 'application/octet-stream')
- params.append(
- tuple([k, tuple([filename, filedata, mimetype])]))
-
- return params
-
- def select_header_accept(self, accepts):
- """Returns `Accept` based on an array of accepts provided.
-
- :param accepts: List of headers.
- :return: Accept (e.g. application/json).
- """
- if not accepts:
- return
-
- accepts = [x.lower() for x in accepts]
-
- if 'application/json' in accepts:
- return 'application/json'
- else:
- return ', '.join(accepts)
-
- def select_header_content_type(self, content_types):
- """Returns `Content-Type` based on an array of content_types provided.
-
- :param content_types: List of content-types.
- :return: Content-Type (e.g. application/json).
- """
- if not content_types:
- return 'application/json'
-
- content_types = [x.lower() for x in content_types]
-
- if 'application/json' in content_types or '*/*' in content_types:
- return 'application/json'
- else:
- return content_types[0]
-
- def update_params_for_auth(self, headers, querys, auth_settings):
- """Updates header and query params based on authentication setting.
-
- :param headers: Header parameters dict to be updated.
- :param querys: Query parameters tuple list to be updated.
- :param auth_settings: Authentication setting identifiers list.
- """
- if not auth_settings:
- return
-
- for auth in auth_settings:
- auth_setting = self.configuration.auth_settings().get(auth)
- if auth_setting:
- if not auth_setting['value']:
- continue
- elif auth_setting['in'] == 'header':
- headers[auth_setting['key']] = auth_setting['value']
- elif auth_setting['in'] == 'query':
- querys.append((auth_setting['key'], auth_setting['value']))
- else:
- raise ValueError(
- 'Authentication token must be in `query` or `header`'
- )
-
- def __deserialize_file(self, response):
- """Deserializes body to file
-
- Saves response body into a file in a temporary folder,
- using the filename from the `Content-Disposition` header if provided.
-
- :param response: RESTResponse.
- :return: file path.
- """
- fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path)
- os.close(fd)
- os.remove(path)
-
- content_disposition = response.getheader("Content-Disposition")
- if content_disposition:
- filename = re.search(r'filename=[\'"]?([^\'"\s]+)[\'"]?',
- content_disposition).group(1)
- path = os.path.join(os.path.dirname(path), filename)
-
- with open(path, "wb") as f:
- f.write(response.data)
-
- return path
-
- def __deserialize_primitive(self, data, klass):
- """Deserializes string to primitive type.
-
- :param data: str.
- :param klass: class literal.
-
- :return: int, long, float, str, bool.
- """
- try:
- return klass(data)
- except UnicodeEncodeError:
- return six.u(data)
- except TypeError:
- return data
-
- def __deserialize_object(self, value):
- """Return a original value.
-
- :return: object.
- """
- return value
-
- def __deserialize_date(self, string):
- """Deserializes string to date.
-
- :param string: str.
- :return: date.
- """
- try:
- from dateutil.parser import parse
- return parse(string).date()
- except ImportError:
- return string
- except ValueError:
- raise rest.ApiException(
- status=0,
- reason="Failed to parse `{0}` as date object".format(string)
- )
-
- def __deserialize_datatime(self, string):
- """Deserializes string to datetime.
-
- The string should be in iso8601 datetime format.
-
- :param string: str.
- :return: datetime.
- """
- try:
- from dateutil.parser import parse
- return parse(string)
- except ImportError:
- return string
- except ValueError:
- raise rest.ApiException(
- status=0,
- reason=(
- "Failed to parse `{0}` as datetime object"
- .format(string)
- )
- )
-
- def __deserialize_model(self, data, klass):
- """Deserializes list or dict to model.
-
- :param data: dict, list.
- :param klass: class literal.
- :return: model object.
- """
-
- if not klass.swagger_types and not hasattr(klass,
- 'get_real_child_model'):
- return data
-
- kwargs = {}
- if klass.swagger_types is not None:
- for attr, attr_type in six.iteritems(klass.swagger_types):
- if (data is not None and
- klass.attribute_map[attr] in data and
- isinstance(data, (list, dict))):
- value = data[klass.attribute_map[attr]]
- kwargs[attr] = self.__deserialize(value, attr_type)
-
- instance = klass(**kwargs)
-
- if hasattr(instance, 'get_real_child_model'):
- klass_name = instance.get_real_child_model(data)
- if klass_name:
- instance = self.__deserialize(data, klass_name)
- return instance