From 6571c1d98aa1e13f28e1d2557408a369d4e3220f Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 9 Jan 2019 11:11:39 -0800 Subject: start refactor of moving fatcat_client into python_client --- python/fatcat_client/api_client.py | 621 ------------------------------------- 1 file changed, 621 deletions(-) delete mode 100644 python/fatcat_client/api_client.py (limited to 'python/fatcat_client/api_client.py') diff --git a/python/fatcat_client/api_client.py b/python/fatcat_client/api_client.py deleted file mode 100644 index 6e562014..00000000 --- a/python/fatcat_client/api_client.py +++ /dev/null @@ -1,621 +0,0 @@ -# coding: utf-8 -""" - fatcat - - A scalable, versioned, API-oriented catalog of bibliographic entities and file metadata # noqa: E501 - - OpenAPI spec version: 0.1.0 - - Generated by: https://github.com/swagger-api/swagger-codegen.git -""" - -from __future__ import absolute_import - -import datetime -import json -import mimetypes -from multiprocessing.pool import ThreadPool -import os -import re -import tempfile - -# python 2 and python 3 compatibility library -import six -from six.moves.urllib.parse import quote - -from fatcat_client.configuration import Configuration -import fatcat_client.models -from fatcat_client import rest - - -class ApiClient(object): - """Generic API client for Swagger client library builds. - - Swagger generic API client. This client handles the client- - server communication, and is invariant across implementations. Specifics of - the methods and models for each application are generated from the Swagger - templates. - - NOTE: This class is auto generated by the swagger code generator program. - Ref: https://github.com/swagger-api/swagger-codegen - Do not edit the class manually. - - :param configuration: .Configuration object for this client - :param header_name: a header to pass when making calls to the API. - :param header_value: a header value to pass when making calls to - the API. - :param cookie: a cookie to include in the header when making calls - to the API - """ - - PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types - NATIVE_TYPES_MAPPING = { - 'int': int, - 'long': int if six.PY3 else long, # noqa: F821 - 'float': float, - 'str': str, - 'bool': bool, - 'date': datetime.date, - 'datetime': datetime.datetime, - 'object': object, - } - - def __init__(self, configuration=None, header_name=None, header_value=None, - cookie=None): - if configuration is None: - configuration = Configuration() - self.configuration = configuration - - self.pool = ThreadPool() - self.rest_client = rest.RESTClientObject(configuration) - self.default_headers = {} - if header_name is not None: - self.default_headers[header_name] = header_value - self.cookie = cookie - # Set default User-Agent. - self.user_agent = 'Swagger-Codegen/1.0.0/python' - - def __del__(self): - self.pool.close() - self.pool.join() - - @property - def user_agent(self): - """User agent for this API client""" - return self.default_headers['User-Agent'] - - @user_agent.setter - def user_agent(self, value): - self.default_headers['User-Agent'] = value - - def set_default_header(self, header_name, header_value): - self.default_headers[header_name] = header_value - - def __call_api( - self, resource_path, method, path_params=None, - query_params=None, header_params=None, body=None, post_params=None, - files=None, response_type=None, auth_settings=None, - _return_http_data_only=None, collection_formats=None, - _preload_content=True, _request_timeout=None): - - config = self.configuration - - # header parameters - header_params = header_params or {} - header_params.update(self.default_headers) - if self.cookie: - header_params['Cookie'] = self.cookie - if header_params: - header_params = self.sanitize_for_serialization(header_params) - header_params = dict(self.parameters_to_tuples(header_params, - collection_formats)) - - # path parameters - if path_params: - path_params = self.sanitize_for_serialization(path_params) - path_params = self.parameters_to_tuples(path_params, - collection_formats) - for k, v in path_params: - # specified safe chars, encode everything - resource_path = resource_path.replace( - '{%s}' % k, - quote(str(v), safe=config.safe_chars_for_path_param) - ) - - # query parameters - if query_params: - query_params = self.sanitize_for_serialization(query_params) - query_params = self.parameters_to_tuples(query_params, - collection_formats) - - # post parameters - if post_params or files: - post_params = self.prepare_post_parameters(post_params, files) - post_params = self.sanitize_for_serialization(post_params) - post_params = self.parameters_to_tuples(post_params, - collection_formats) - - # auth setting - self.update_params_for_auth(header_params, query_params, auth_settings) - - # body - if body: - body = self.sanitize_for_serialization(body) - - # request url - url = self.configuration.host + resource_path - - # perform request and return response - response_data = self.request( - method, url, query_params=query_params, headers=header_params, - post_params=post_params, body=body, - _preload_content=_preload_content, - _request_timeout=_request_timeout) - - self.last_response = response_data - - return_data = response_data - if _preload_content: - # deserialize response data - if response_type: - return_data = self.deserialize(response_data, response_type) - else: - return_data = None - - if _return_http_data_only: - return (return_data) - else: - return (return_data, response_data.status, - response_data.getheaders()) - - def sanitize_for_serialization(self, obj): - """Builds a JSON POST object. - - If obj is None, return None. - If obj is str, int, long, float, bool, return directly. - If obj is datetime.datetime, datetime.date - convert to string in iso8601 format. - If obj is list, sanitize each element in the list. - If obj is dict, return the dict. - If obj is swagger model, return the properties dict. - - :param obj: The data to serialize. - :return: The serialized form of data. - """ - if obj is None: - return None - elif isinstance(obj, self.PRIMITIVE_TYPES): - return obj - elif isinstance(obj, list): - return [self.sanitize_for_serialization(sub_obj) - for sub_obj in obj] - elif isinstance(obj, tuple): - return tuple(self.sanitize_for_serialization(sub_obj) - for sub_obj in obj) - elif isinstance(obj, (datetime.datetime, datetime.date)): - return obj.isoformat() - - if isinstance(obj, dict): - obj_dict = obj - else: - # Convert model obj to dict except - # attributes `swagger_types`, `attribute_map` - # and attributes which value is not None. - # Convert attribute name to json key in - # model definition for request. - obj_dict = {obj.attribute_map[attr]: getattr(obj, attr) - for attr, _ in six.iteritems(obj.swagger_types) - if getattr(obj, attr) is not None} - - return {key: self.sanitize_for_serialization(val) - for key, val in six.iteritems(obj_dict)} - - def deserialize(self, response, response_type): - """Deserializes response into an object. - - :param response: RESTResponse object to be deserialized. - :param response_type: class literal for - deserialized object, or string of class name. - - :return: deserialized object. - """ - # handle file downloading - # save response body into a tmp file and return the instance - if response_type == "file": - return self.__deserialize_file(response) - - # fetch data from response object - try: - data = json.loads(response.data) - except ValueError: - data = response.data - - return self.__deserialize(data, response_type) - - def __deserialize(self, data, klass): - """Deserializes dict, list, str into an object. - - :param data: dict, list or str. - :param klass: class literal, or string of class name. - - :return: object. - """ - if data is None: - return None - - if type(klass) == str: - if klass.startswith('list['): - sub_kls = re.match('list\[(.*)\]', klass).group(1) - return [self.__deserialize(sub_data, sub_kls) - for sub_data in data] - - if klass.startswith('dict('): - sub_kls = re.match('dict\(([^,]*), (.*)\)', klass).group(2) - return {k: self.__deserialize(v, sub_kls) - for k, v in six.iteritems(data)} - - # convert str to class - if klass in self.NATIVE_TYPES_MAPPING: - klass = self.NATIVE_TYPES_MAPPING[klass] - else: - klass = getattr(fatcat_client.models, klass) - - if klass in self.PRIMITIVE_TYPES: - return self.__deserialize_primitive(data, klass) - elif klass == object: - return self.__deserialize_object(data) - elif klass == datetime.date: - return self.__deserialize_date(data) - elif klass == datetime.datetime: - return self.__deserialize_datatime(data) - else: - return self.__deserialize_model(data, klass) - - def call_api(self, resource_path, method, - path_params=None, query_params=None, header_params=None, - body=None, post_params=None, files=None, - response_type=None, auth_settings=None, async=None, - _return_http_data_only=None, collection_formats=None, - _preload_content=True, _request_timeout=None): - """Makes the HTTP request (synchronous) and returns deserialized data. - - To make an async request, set the async parameter. - - :param resource_path: Path to method endpoint. - :param method: Method to call. - :param path_params: Path parameters in the url. - :param query_params: Query parameters in the url. - :param header_params: Header parameters to be - placed in the request header. - :param body: Request body. - :param post_params dict: Request post form parameters, - for `application/x-www-form-urlencoded`, `multipart/form-data`. - :param auth_settings list: Auth Settings names for the request. - :param response: Response data type. - :param files dict: key -> filename, value -> filepath, - for `multipart/form-data`. - :param async bool: execute request asynchronously - :param _return_http_data_only: response data without head status code - and headers - :param collection_formats: dict of collection formats for path, query, - header, and post parameters. - :param _preload_content: if False, the urllib3.HTTPResponse object will - be returned without reading/decoding response - data. Default is True. - :param _request_timeout: timeout setting for this request. If one - number provided, it will be total request - timeout. It can also be a pair (tuple) of - (connection, read) timeouts. - :return: - If async parameter is True, - the request will be called asynchronously. - The method will return the request thread. - If parameter async is False or missing, - then the method will return the response directly. - """ - if not async: - return self.__call_api(resource_path, method, - path_params, query_params, header_params, - body, post_params, files, - response_type, auth_settings, - _return_http_data_only, collection_formats, - _preload_content, _request_timeout) - else: - thread = self.pool.apply_async(self.__call_api, (resource_path, - method, path_params, query_params, - header_params, body, - post_params, files, - response_type, auth_settings, - _return_http_data_only, - collection_formats, - _preload_content, _request_timeout)) - return thread - - def request(self, method, url, query_params=None, headers=None, - post_params=None, body=None, _preload_content=True, - _request_timeout=None): - """Makes the HTTP request using RESTClient.""" - if method == "GET": - return self.rest_client.GET(url, - query_params=query_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - headers=headers) - elif method == "HEAD": - return self.rest_client.HEAD(url, - query_params=query_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - headers=headers) - elif method == "OPTIONS": - return self.rest_client.OPTIONS(url, - query_params=query_params, - headers=headers, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body) - elif method == "POST": - return self.rest_client.POST(url, - query_params=query_params, - headers=headers, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body) - elif method == "PUT": - return self.rest_client.PUT(url, - query_params=query_params, - headers=headers, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body) - elif method == "PATCH": - return self.rest_client.PATCH(url, - query_params=query_params, - headers=headers, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body) - elif method == "DELETE": - return self.rest_client.DELETE(url, - query_params=query_params, - headers=headers, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body) - else: - raise ValueError( - "http method must be `GET`, `HEAD`, `OPTIONS`," - " `POST`, `PATCH`, `PUT` or `DELETE`." - ) - - def parameters_to_tuples(self, params, collection_formats): - """Get parameters as list of tuples, formatting collections. - - :param params: Parameters as dict or list of two-tuples - :param dict collection_formats: Parameter collection formats - :return: Parameters as list of tuples, collections formatted - """ - new_params = [] - if collection_formats is None: - collection_formats = {} - for k, v in six.iteritems(params) if isinstance(params, dict) else params: # noqa: E501 - if k in collection_formats: - collection_format = collection_formats[k] - if collection_format == 'multi': - new_params.extend((k, value) for value in v) - else: - if collection_format == 'ssv': - delimiter = ' ' - elif collection_format == 'tsv': - delimiter = '\t' - elif collection_format == 'pipes': - delimiter = '|' - else: # csv is the default - delimiter = ',' - new_params.append( - (k, delimiter.join(str(value) for value in v))) - else: - new_params.append((k, v)) - return new_params - - def prepare_post_parameters(self, post_params=None, files=None): - """Builds form parameters. - - :param post_params: Normal form parameters. - :param files: File parameters. - :return: Form parameters with files. - """ - params = [] - - if post_params: - params = post_params - - if files: - for k, v in six.iteritems(files): - if not v: - continue - file_names = v if type(v) is list else [v] - for n in file_names: - with open(n, 'rb') as f: - filename = os.path.basename(f.name) - filedata = f.read() - mimetype = (mimetypes.guess_type(filename)[0] or - 'application/octet-stream') - params.append( - tuple([k, tuple([filename, filedata, mimetype])])) - - return params - - def select_header_accept(self, accepts): - """Returns `Accept` based on an array of accepts provided. - - :param accepts: List of headers. - :return: Accept (e.g. application/json). - """ - if not accepts: - return - - accepts = [x.lower() for x in accepts] - - if 'application/json' in accepts: - return 'application/json' - else: - return ', '.join(accepts) - - def select_header_content_type(self, content_types): - """Returns `Content-Type` based on an array of content_types provided. - - :param content_types: List of content-types. - :return: Content-Type (e.g. application/json). - """ - if not content_types: - return 'application/json' - - content_types = [x.lower() for x in content_types] - - if 'application/json' in content_types or '*/*' in content_types: - return 'application/json' - else: - return content_types[0] - - def update_params_for_auth(self, headers, querys, auth_settings): - """Updates header and query params based on authentication setting. - - :param headers: Header parameters dict to be updated. - :param querys: Query parameters tuple list to be updated. - :param auth_settings: Authentication setting identifiers list. - """ - if not auth_settings: - return - - for auth in auth_settings: - auth_setting = self.configuration.auth_settings().get(auth) - if auth_setting: - if not auth_setting['value']: - continue - elif auth_setting['in'] == 'header': - headers[auth_setting['key']] = auth_setting['value'] - elif auth_setting['in'] == 'query': - querys.append((auth_setting['key'], auth_setting['value'])) - else: - raise ValueError( - 'Authentication token must be in `query` or `header`' - ) - - def __deserialize_file(self, response): - """Deserializes body to file - - Saves response body into a file in a temporary folder, - using the filename from the `Content-Disposition` header if provided. - - :param response: RESTResponse. - :return: file path. - """ - fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path) - os.close(fd) - os.remove(path) - - content_disposition = response.getheader("Content-Disposition") - if content_disposition: - filename = re.search(r'filename=[\'"]?([^\'"\s]+)[\'"]?', - content_disposition).group(1) - path = os.path.join(os.path.dirname(path), filename) - - with open(path, "wb") as f: - f.write(response.data) - - return path - - def __deserialize_primitive(self, data, klass): - """Deserializes string to primitive type. - - :param data: str. - :param klass: class literal. - - :return: int, long, float, str, bool. - """ - try: - return klass(data) - except UnicodeEncodeError: - return six.u(data) - except TypeError: - return data - - def __deserialize_object(self, value): - """Return a original value. - - :return: object. - """ - return value - - def __deserialize_date(self, string): - """Deserializes string to date. - - :param string: str. - :return: date. - """ - try: - from dateutil.parser import parse - return parse(string).date() - except ImportError: - return string - except ValueError: - raise rest.ApiException( - status=0, - reason="Failed to parse `{0}` as date object".format(string) - ) - - def __deserialize_datatime(self, string): - """Deserializes string to datetime. - - The string should be in iso8601 datetime format. - - :param string: str. - :return: datetime. - """ - try: - from dateutil.parser import parse - return parse(string) - except ImportError: - return string - except ValueError: - raise rest.ApiException( - status=0, - reason=( - "Failed to parse `{0}` as datetime object" - .format(string) - ) - ) - - def __deserialize_model(self, data, klass): - """Deserializes list or dict to model. - - :param data: dict, list. - :param klass: class literal. - :return: model object. - """ - - if not klass.swagger_types and not hasattr(klass, - 'get_real_child_model'): - return data - - kwargs = {} - if klass.swagger_types is not None: - for attr, attr_type in six.iteritems(klass.swagger_types): - if (data is not None and - klass.attribute_map[attr] in data and - isinstance(data, (list, dict))): - value = data[klass.attribute_map[attr]] - kwargs[attr] = self.__deserialize(value, attr_type) - - instance = klass(**kwargs) - - if hasattr(instance, 'get_real_child_model'): - klass_name = instance.get_real_child_model(data) - if klass_name: - instance = self.__deserialize(data, klass_name) - return instance -- cgit v1.2.3