cvat/cvat-sdk/gen/templates/openapi-generator/api_client.mustache

1081 lines
45 KiB
Plaintext
Raw Permalink Normal View History

2025-09-16 01:19:40 +00:00
{{>partial_header}}
from __future__ import annotations
import json
import atexit
import mimetypes
import importlib
from multiprocessing.pool import ThreadPool
import io
import os
import re
import typing
from http.cookies import SimpleCookie
from urllib.parse import quote
from urllib3 import HTTPResponse
from urllib3.fields import RequestField
from urllib3.util import parse_url
{{#tornado}}
import tornado.gen
{{/tornado}}
from {{packageName}} import rest
from {{packageName}}.configuration import Configuration
from {{packageName}}.exceptions import ApiTypeError, ApiValueError, ApiException
from {{packageName}}.model_utils import (
ModelNormal,
ModelSimple,
ModelComposed,
check_allowed_values,
check_validations,
date,
datetime,
deserialize_file,
file_type,
model_to_dict,
none_type,
validate_and_convert_types,
to_json,
get_file_data_and_close_file,
)
from typing import TYPE_CHECKING
if TYPE_CHECKING:
# Enable introspection. Can't work normally due to cyclic imports
from {{packageName}}.apis import *
from {{packageName}}.models import *
class ApiClient(object):
"""Generic API client for OpenAPI client library builds.
OpenAPI generic API client. This client handles the client-
server communication, and is invariant across implementations. Specifics of
the methods and models for each application are generated from the OpenAPI
templates.
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Class members:
{{#apiInfo}}{{#apis}}
{{>api_name}}: {{classname}}{{/apis}}{{/apiInfo}}
"""
_pool = None
def __init__(self,
configuration: typing.Optional[Configuration] = None,
headers: typing.Optional[dict[str, str]] = None,
cookies: typing.Optional[dict[str, str]] = None,
pool_threads: int = 1):
"""
:param configuration: configuration object for this client
:param headers: header to include when making calls to the API
:param cookies: cookies to include when making calls to the API
:param pool_threads: The number of threads to use for async requests
to the API. More threads means more concurrent API requests.
"""
if configuration is None:
configuration = Configuration.get_default_copy()
self.configuration = configuration
self.pool_threads = pool_threads
self.rest_client = rest.RESTClientObject(configuration)
self.default_headers: dict[str, str] = headers or {}
self.cookies = SimpleCookie()
if cookies:
self.cookies.update(cookies)
# Set default User-Agent.
self.user_agent = '{{{httpUserAgent}}}{{^httpUserAgent}}OpenAPI-Generator/{{{packageVersion}}}/python{{/httpUserAgent}}'
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def close(self):
if self._pool:
self._pool.close()
self._pool.join()
self._pool = None
if hasattr(atexit, 'unregister'):
atexit.unregister(self.close)
@property
def pool(self):
"""Create thread pool on first request
avoids instantiating unused threadpool for blocking clients.
"""
if self._pool is None:
atexit.register(self.close)
self._pool = ThreadPool(self.pool_threads)
return self._pool
@property
def user_agent(self):
"""User agent for this API client"""
return self.default_headers['User-Agent']
@user_agent.setter
def user_agent(self, value):
self.default_headers['User-Agent'] = value
def _serialize_post_parameter(self, obj):
if isinstance(obj, (str, int, float, bool)):
return str(obj)
elif isinstance(obj, io.IOBase):
return self._serialize_file(obj)
raise ApiValueError(
'Unable to prepare type {} for serialization'.format(
obj.__class__.__name__))
def _convert_body_to_post_params(self, body):
# body must be a flat structure, lists of primitives is possible
body = self.sanitize_for_serialization(body, read_files=False)
assert isinstance(body, dict), type(body)
post_params = []
for k, v in body.items():
if isinstance(v, (tuple, list)):
for i, entry in enumerate(v):
post_params.append((
f'{k}[{i}]',
self._serialize_post_parameter(entry)
))
else:
post_params.append((k, self._serialize_post_parameter(v)))
return post_params
def set_default_header(self, header_name, header_value):
self.default_headers[header_name] = header_value
{{#tornado}}
@tornado.gen.coroutine
{{/tornado}}
{{#asyncio}}async {{/asyncio}}def __call_api(
self,
resource_path: str,
method: str,
path_params: typing.Optional[dict[str, typing.Any]] = None,
query_params: typing.Optional[list[tuple[str, typing.Any]]] = None,
header_params: typing.Optional[dict[str, typing.Any]] = None,
body: typing.Optional[typing.Any] = None,
post_params: typing.Optional[list[tuple[str, typing.Any]]] = None,
files: typing.Optional[dict[str, list[io.IOBase]]] = None,
response_schema: typing.Optional[tuple[typing.Any]] = None,
auth_settings: typing.Optional[list[str]] = None,
collection_formats: typing.Optional[dict[str, str]] = None,
*,
_parse_response: bool = True,
_request_timeout: typing.Optional[typing.Union[int, float, tuple]] = None,
_host: typing.Optional[str] = None,
_check_type: typing.Optional[bool] = None,
_check_status: bool = True,
_request_auths: typing.Optional[list[dict[str, typing.Any]]] = None
):
config = self.configuration
# header parameters
header_params = header_params or {}
header_params.update(self.get_common_headers())
if header_params:
header_params = self.sanitize_for_serialization(header_params)
header_params = dict(self.parameters_to_tuples(header_params,
collection_formats))
# path parameters
if path_params:
path_params = self.sanitize_for_serialization(path_params)
path_params = self.parameters_to_tuples(path_params,
collection_formats)
for k, v in path_params:
# specified safe chars, encode everything
resource_path = resource_path.replace(
'{%s}' % k,
quote(str(v), safe=config.safe_chars_for_path_param)
)
# query parameters
if query_params:
query_params = self.sanitize_for_serialization(query_params)
query_params = self.parameters_to_tuples(query_params,
collection_formats)
# post parameters
post_params = post_params if post_params else []
if post_params or files:
post_params = self.sanitize_for_serialization(post_params)
post_params = self.parameters_to_tuples(post_params,
collection_formats)
post_params.extend(self.files_parameters(files))
if header_params.get('Content-Type', '').startswith("multipart"):
if body:
post_params.extend(self._convert_body_to_post_params(body))
body = None
if post_params:
post_params = self.parameters_to_multipart(post_params, (dict))
else:
# body
if body:
body = self.sanitize_for_serialization(body)
# request url
host = _host or self.configuration.host
if _host is None:
url = host + resource_path
else:
# use server/host defined in path or operation instead
url = host + resource_path
# auth setting
self.update_params_for_auth(header_params, query_params,
auth_settings, resource_path, method, body,
request_host=host, request_auths=_request_auths)
try:
# perform request and return response
response = {{#asyncio}}await {{/asyncio}}{{#tornado}}yield {{/tornado}}self.request(
method, url, query_params=query_params, headers=header_params,
post_params=post_params, body=body,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status)
except ApiException as e:
e.body = e.body.decode('utf-8')
raise e
self.last_response = response
return_data = None
if _parse_response:
self._update_cookies_from_response(response)
if response_schema:
return_data = self.deserialize(
response,
response_schema,
_check_type=_check_type
)
{{#tornado}}
raise tornado.gen.Return((return_data, response))
{{/tornado}}
{{^tornado}}
return (return_data, response)
{{/tornado}}
def get_common_headers(self) -> dict[str, str]:
"""
Returns a headers dict with all the required headers for requests
"""
headers = {}
headers.update(self.default_headers)
if self.cookies:
headers['Cookie'] = self.cookies.output(attrs=[], header="", sep=";").strip()
return headers
def _update_cookies_from_response(self, response: HTTPResponse):
self.cookies.update(SimpleCookie(response.headers.get("Set-Cookie")))
def parameters_to_multipart(self, params, collection_types):
"""Get parameters as list of tuples, formatting as json if value is collection_types
:param params: Parameters as list of two-tuples
:param dict collection_types: Parameter collection types
:return: Parameters as list of tuple or urllib3.fields.RequestField
"""
new_params = []
if collection_types is None:
collection_types = (dict)
for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501
if isinstance(
v, collection_types): # v is instance of collection_type, formatting as application/json
v = json.dumps(v, ensure_ascii=False).encode("utf-8")
field = RequestField(k, v)
field.make_multipart(content_type="application/json; charset=utf-8")
new_params.append(field)
else:
new_params.append((k, v))
return new_params
@classmethod
def sanitize_for_serialization(cls, obj, *,
read_files: bool = True):
"""Prepares data for transmission before it is sent with the rest client
If obj is None, return None.
If obj is str, int, long, float, bool, return directly.
If obj is datetime.datetime, datetime.date
convert to string in iso8601 format.
If obj is list, sanitize each element in the list.
If obj is dict, return the dict.
If obj is OpenAPI model, return the properties dict.
If obj is io.IOBase, return the bytes
:param obj: The data to serialize.
:param read_files: Whether to read file data or leave files as is.
:return: The serialized form of data.
"""
return to_json(obj, read_files=read_files)
def deserialize(self, response: HTTPResponse, response_schema: tuple, *, _check_type: bool):
"""Deserializes response into an object.
:param response (urllib3.HTTPResponse): object to be deserialized.
:param response_schema: For the response, a tuple containing:
valid classes
a list containing valid classes (for list schemas)
a dict containing a tuple of valid classes as the value
Example values:
(str,)
(Pet,)
(float, none_type)
([int, none_type],)
({str: (bool, str, int, float, date, datetime, str, none_type)},)
:param _check_type (bool): whether to check the types of the data
received from the server
:return: deserialized object
"""
if response_schema == (file_type,):
# TODO: response schema can be "oneOf" with a file option,
# this implementation does not cover this.
# handle file downloading
# save response body into a tmp file and return the instance
content_disposition = response.headers.get("Content-Disposition")
return deserialize_file(response.data, self.configuration,
content_disposition=content_disposition)
encoding = "utf-8"
content_type = response.headers.get('content-type')
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s\;]?", content_type)
if match:
encoding = match.group(1)
response_data = response.data.decode(encoding)
# fetch data from response object
try:
received_data = json.loads(response_data)
except ValueError:
received_data = response_data
# store our data under the key of 'received_data' so users have some
# context if they are deserializing a string and the data type is wrong
deserialized_data = validate_and_convert_types(
received_data,
response_schema,
['received_data'],
True,
_check_type,
configuration=self.configuration
)
return deserialized_data
def call_api(
self,
resource_path: str,
method: str,
path_params: typing.Optional[dict[str, typing.Any]] = None,
query_params: typing.Optional[list[tuple[str, typing.Any]]] = None,
header_params: typing.Optional[dict[str, typing.Any]] = None,
body: typing.Optional[typing.Any] = None,
post_params: typing.Optional[list[tuple[str, typing.Any]]] = None,
files: typing.Optional[dict[str, list[io.IOBase]]] = None,
response_schema: typing.Optional[tuple[typing.Any]] = None,
auth_settings: typing.Optional[list[str]] = None,
collection_formats: typing.Optional[dict[str, str]] = None,
*,
_async_call: typing.Optional[bool] = None,
_parse_response: bool = True,
_request_timeout: typing.Optional[typing.Union[int, float, tuple]] = None,
_host: typing.Optional[str] = None,
_check_type: typing.Optional[bool] = None,
_request_auths: typing.Optional[list[dict[str, typing.Any]]] = None,
_check_status: bool = True,
):
"""Makes the HTTP request (synchronous) and returns deserialized data.
To make an _async_call request, set the _async_call parameter.
:param resource_path: Path to method endpoint.
:param method: Method to call.
:param path_params: Path parameters in the url.
:param query_params: Query parameters in the url.
:param header_params: Header parameters to be
placed in the request header.
:param body: Request body.
:param post_params dict: Request post form parameters,
for `application/x-www-form-urlencoded`, `multipart/form-data`.
:param auth_settings list: Auth Settings names for the request.
:param response_schema: For the response, a tuple containing:
valid classes
a list containing valid classes (for list schemas)
a dict containing a tuple of valid classes as the value
Example values:
(str,)
(Pet,)
(float, none_type)
([int, none_type],)
({str: (bool, str, int, float, date, datetime, str, none_type)},)
:param files: key -> field name, value -> a list of open file
objects for `multipart/form-data`.
:type files: dict
:param _async_call bool: execute request asynchronously
:type _async_call: bool, optional
:param collection_formats: dict of collection formats for path, query,
header, and post parameters.
:type collection_formats: dict, optional
:param _parse_response: if False, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Response headers will not be
processed (cookies as well). Default is True.
:type _parse_response: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _check_type: boolean describing if the data back from the server
should have its type checked.
:type _check_type: bool, optional
:param _request_auths: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auths: list, optional
:return:
If _async_call parameter is True,
the request will be called asynchronously.
The method will return the request thread.
If parameter _async_call is False or missing,
then the method will return the response directly.
"""
params = {
"resource_path": resource_path,
"method": method,
"path_params": path_params,
"query_params": query_params,
"header_params": header_params,
"body": body,
"post_params": post_params,
"files": files,
"response_schema": response_schema,
"auth_settings": auth_settings,
"collection_formats": collection_formats,
"_parse_response": _parse_response,
"_request_timeout": _request_timeout,
"_host": _host,
"_check_type": _check_type,
"_request_auths": _request_auths,
"_check_status": _check_status,
}
if not _async_call:
return self.__call_api(**params)
return self.pool.apply_async(self.__call_api, (), kwds=params)
def request(
self,
method: str,
url: str,
query_params: typing.Optional[list[tuple[str, typing.Any]]] = None,
headers: typing.Optional[dict[str, typing.Any]] = None,
post_params: typing.Optional[typing.Union[list, dict]] = None,
body: typing.Optional[typing.Any] = None,
*,
_parse_response: bool = True,
_request_timeout: typing.Optional[typing.Union[float, tuple[float, float]]] = None,
_check_status: bool = True,
):
"""Makes the HTTP request using RESTClient."""
if method == "GET":
return self.rest_client.GET(url,
query_params=query_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
headers=headers,
_check_status=_check_status)
elif method == "HEAD":
return self.rest_client.HEAD(url,
query_params=query_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
headers=headers,
_check_status=_check_status)
elif method == "OPTIONS":
return self.rest_client.OPTIONS(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status,
body=body)
elif method == "POST":
return self.rest_client.POST(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status,
body=body)
elif method == "PUT":
return self.rest_client.PUT(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status,
body=body)
elif method == "PATCH":
return self.rest_client.PATCH(url,
query_params=query_params,
headers=headers,
post_params=post_params,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status,
body=body)
elif method == "DELETE":
return self.rest_client.DELETE(url,
query_params=query_params,
headers=headers,
_parse_response=_parse_response,
_request_timeout=_request_timeout,
_check_status=_check_status,
body=body)
else:
raise ApiValueError(
"http method must be `GET`, `HEAD`, `OPTIONS`,"
" `POST`, `PATCH`, `PUT` or `DELETE`."
)
def parameters_to_tuples(self, params, collection_formats):
"""Get parameters as list of tuples, formatting collections.
:param params: Parameters as dict or list of two-tuples
:param dict collection_formats: Parameter collection formats
:return: Parameters as list of tuples, collections formatted
"""
new_params = []
if collection_formats is None:
collection_formats = {}
for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501
if k in collection_formats:
collection_format = collection_formats[k]
if collection_format == 'multi':
new_params.extend((k, value) for value in v)
else:
if collection_format == 'ssv':
delimiter = ' '
elif collection_format == 'tsv':
delimiter = '\t'
elif collection_format == 'pipes':
delimiter = '|'
else: # csv is the default
delimiter = ','
new_params.append(
(k, delimiter.join(str(value) for value in v)))
else:
new_params.append((k, v))
return new_params
def _serialize_file(self, file_instance: io.IOBase) -> tuple[str, typing.Union[str, bytes], str]:
if file_instance.closed is True:
raise ApiValueError("Cannot read a closed file.")
filename = os.path.basename(file_instance.name)
filedata = get_file_data_and_close_file(file_instance)
mimetype = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
return filename, filedata, mimetype
def files_parameters(self,
files: typing.Optional[dict[str, list[io.IOBase]]] = None):
"""Builds form parameters.
:param files: None or a dict with key=param_name and
value is a list of open file objects
:return: List of tuples of form parameters with file data
"""
if files is None:
return []
params = []
for param_name, file_instances in files.items():
if file_instances is None:
# if the file field is nullable, skip None values
continue
for file_instance in file_instances:
if file_instance is None:
# if the file field is nullable, skip None values
continue
try:
params.append((param_name, self._serialize_file(file_instance)))
except ApiValueError as e:
raise ApiValueError("The passed in file_type "
"for %s must be open." % param_name) from e
return params
def select_header_accept(self, accepts):
"""Returns `Accept` based on an array of accepts provided.
:param accepts: List of headers.
:return: Accept (e.g. application/json).
"""
if not accepts:
return
accepts = [x.lower() for x in accepts]
if 'application/json' in accepts:
return 'application/json'
else:
return ', '.join(accepts)
def select_header_content_type(self, content_types, method=None, body=None):
"""Returns `Content-Type` based on an array of content_types provided.
:param content_types: List of content-types.
:param method: http method (e.g. POST, PATCH).
:param body: http body to send.
:return: Content-Type (e.g. application/json).
"""
if not content_types:
return None
content_types = [x.lower() for x in content_types]
if (method == 'PATCH' and
'application/json-patch+json' in content_types and
isinstance(body, list)):
return 'application/json-patch+json'
if 'application/json' in content_types or '*/*' in content_types:
return 'application/json'
else:
return content_types[0]
def build_origin_header(self, request_host: typing.Optional[str] = None) -> str:
if request_host is None:
request_host = self.configuration.host
parsed_url = parse_url(request_host)
origin = f"{parsed_url.scheme}://{parsed_url.host}"
if parsed_url.port is not None and not (
parsed_url.scheme == "http" and parsed_url.port == 80 or
parsed_url.scheme == "https" and parsed_url.port == 443
):
origin = f"{origin}:{parsed_url.port}"
return origin
def update_params_for_auth(
self,
headers: dict[str, str],
queries: list[tuple[str, typing.Any]],
auth_settings: typing.Optional[typing.Iterable[str]] = None,
resource_path: typing.Optional[str] = None,
method: str = "GET",
body: typing.Optional[typing.Any] = None,
request_host: typing.Optional[str] = None,
request_auths: typing.Optional[list[dict[str, typing.Any]]] = None,
):
"""Updates header and query params based on authentication settings.
:param headers: Headers dict to be updated.
:param queries: Query parameters tuple list to be updated.
:param auth_settings: Names of the authentication settings from the configuration
to use for the request. Can be overridden by passing "request_auths".
Different resources can have different authentication requirements
(e.g. allow unauthenticated requests or require a specific authentication token).
Passing "None" means use any configured authentication from the configuration.
:param resource_path: A string representation of the HTTP request resource path.
:param method: A string representation of the HTTP request method.
:param body: A object representing the body of the HTTP request.
The object type is the return value of _encoder.default().
:param request_host: request host[:port]
:param request_auths: The authentication settings to use instead of the configured ones.
"""
if request_auths:
for auth_setting in request_auths:
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
return
if auth_settings is None:
auth_settings = self.configuration.auth_settings()
for auth_key in auth_settings:
auth_setting = self.configuration.auth_settings().get(auth_key)
if auth_setting:
self._apply_auth_params(headers, queries, resource_path, method, body, auth_setting)
# A custom extension for CSRF HTTPS headers - add related headers automatically
if auth_key in ('csrfAuth', 'csrfHeaderAuth') and 'Origin' not in headers:
headers['Origin'] = self.build_origin_header(request_host)
def _apply_auth_params(self, headers, queries, resource_path, method, body, auth_setting):
if auth_setting['in'] == 'cookie':
if headers.setdefault('Cookie', ''):
headers['Cookie'] += '; '
headers['Cookie'] += auth_setting['key'] + "=" + auth_setting['value']
elif auth_setting['in'] == 'header':
if auth_setting['type'] != 'http-signature':
headers[auth_setting['key']] = auth_setting['value']
{{#hasHttpSignatureMethods}}
else:
# The HTTP signature scheme requires multiple HTTP headers
# that are calculated dynamically.
signing_info = self.configuration.signing_info
auth_headers = signing_info.get_http_signature_headers(
resource_path, method, headers, body, queries)
headers.update(auth_headers)
{{/hasHttpSignatureMethods}}
elif auth_setting['in'] == 'query':
queries.append((auth_setting['key'], auth_setting['value']))
else:
raise ApiValueError(
'Authentication token must be in `query` or `header`'
)
{{#apiInfo}}{{#apis}}
{{>api_name}}: '{{classname}}'{{/apis}}{{/apiInfo}}
_apis: dict[str, object] = { {{#apiInfo}}{{#apis}}
'{{>api_name}}': [None, '{{classname}}'],{{/apis}}{{/apiInfo}}
}
def _make_api_instance(self, klass_name):
package = __name__.rsplit('.', maxsplit=1)[0]
module = importlib.import_module(package + '.apis')
api_klass = getattr(module, klass_name)
return api_klass(self)
def __getattr__(self, key):
notfound = object()
api_instance, api_klassname = self._apis.get(key, notfound)
if api_instance is notfound:
raise AttributeError(f"Can't find the '{key}' attribute")
if api_instance is None:
api_instance = self._make_api_instance(api_klassname)
setattr(self, key, api_instance)
return api_instance
class Endpoint(object):
def __init__(self,
settings: typing.Optional[dict[str, typing.Any]] = None,
params_map: typing.Optional[dict[str, typing.Any]] = None,
root_map: typing.Optional[dict[str, typing.Any]] = None,
headers_map: typing.Optional[dict[str, typing.Any]] = None,
api_client: typing.Optional[ApiClient] = None
):
"""Creates an endpoint
Args:
settings (dict): see below key value pairs
'response_schema' (tuple/None): response type
'auth' (list): a list of auth type keys
'endpoint_path' (str): the endpoint path
'operation_id' (str): endpoint string identifier
'http_method' (str): POST/PUT/PATCH/GET etc
'servers' (list): list of str servers that this endpoint is at
params_map (dict): see below key value pairs
'all' (list): list of str endpoint parameter names
'required' (list): list of required parameter names
'nullable' (list): list of nullable parameter names
'enum' (list): list of parameters with enum values
'validation' (list): list of parameters with validations
root_map (dict):
'validations' (dict): the dict mapping endpoint parameter tuple
paths to their validation dictionaries
'allowed_values' (dict): the dict mapping endpoint parameter
tuple paths to their allowed_values (enum) dictionaries
'openapi_types' (dict): param_name to openapi type
'attribute_map' (dict): param_name to camelCase name
'location_map' (dict): param_name to 'body', 'file', 'form',
'header', 'path', 'query'
collection_format_map (dict): param_name to `csv` etc.
headers_map (dict): see below key value pairs
'accept' (list): list of Accept header strings
'content_type' (list): list of Content-Type header strings
api_client (ApiClient) api client instance
"""
self.settings = settings
self.params_map = params_map
self.params_map['all'].extend([
'_async_call',
'_host_index',
'_parse_response',
'_request_timeout',
'_validate_inputs',
'_validate_outputs',
'_check_status',
'_content_type',
'_spec_property_naming',
'_request_auths'
])
self.params_map['nullable'].extend(['_request_timeout'])
self.validations = root_map['validations']
self.allowed_values = root_map['allowed_values']
self.openapi_types = root_map['openapi_types']
extra_types = {
'_async_call': (bool,),
'_host_index': (none_type, int),
'_parse_response': (bool,),
'_request_timeout': (none_type, float, (float,), [float], int, (int,), [int]),
'_validate_inputs': (bool,),
'_validate_outputs': (bool,),
'_check_status': (bool,),
'_spec_property_naming': (bool,),
'_content_type': (none_type, str),
'_request_auths': (none_type, list)
}
self.openapi_types.update(extra_types)
self.attribute_map = root_map['attribute_map']
self.location_map = root_map['location_map']
self.collection_format_map = root_map['collection_format_map']
self.headers_map = headers_map
self.api_client = api_client
@property
def path(self) -> str:
return self.settings['endpoint_path']
def __validate_inputs(self, kwargs):
for param in self.params_map['enum']:
if param in kwargs:
check_allowed_values(
self.allowed_values,
(param,),
kwargs[param]
)
for param in self.params_map['validation']:
if param in kwargs:
check_validations(
self.validations,
(param,),
kwargs[param],
configuration=self.api_client.configuration
)
if kwargs['_validate_inputs'] is False:
return
for key, value in kwargs.items():
fixed_val = validate_and_convert_types(
value,
self.openapi_types[key],
[key],
kwargs['_spec_property_naming'],
kwargs['_validate_inputs'],
configuration=self.api_client.configuration
)
kwargs[key] = fixed_val
def __gather_params(self, kwargs):
params = {
'body': None,
'collection_format': {},
'file': {},
'form': [],
'header': {},
'path': {},
'query': []
}
for param_name, param_value in kwargs.items():
param_location = self.location_map.get(param_name)
if param_location is None:
continue
if param_location:
if param_location == 'body':
params['body'] = param_value
continue
base_name = self.attribute_map[param_name]
if (param_location == 'form' and
self.openapi_types[param_name] == (file_type,)):
params['file'][base_name] = [param_value]
elif (param_location == 'form' and
self.openapi_types[param_name] == ([file_type],)):
# param_value is already a list
params['file'][base_name] = param_value
elif param_location in {'form', 'query'}:
param_value_full = (base_name, param_value)
params[param_location].append(param_value_full)
if param_location not in {'form', 'query'}:
params[param_location][base_name] = param_value
collection_format = self.collection_format_map.get(param_name)
if collection_format:
params['collection_format'][base_name] = collection_format
return params
def call_with_http_info(self,
_parse_response: bool = True,
_request_timeout: typing.Union[int, float, tuple] = None,
_validate_inputs: bool = True,
_validate_outputs: bool = True,
_check_status: bool = True,
_spec_property_naming: bool = False,
_content_type: typing.Optional[str] = None,
_host_index: typing.Optional[int] = None,
_request_auths: typing.Optional[list] = None,
_async_call: bool = False,
**kwargs) -> tuple[typing.Optional[typing.Any], HTTPResponse]:
"""
Keyword Args:
endpoint args
_parse_response (bool): if False, the response data will not be parsed,
None is returned for data.
Default is True.
_request_timeout (int/float/tuple): timeout setting for this request. If
one number provided, it will be total request timeout. It can also
be a pair (tuple) of (connection, read) timeouts.
Default is None.
_validate_inputs (bool): specifies if type checking
should be done one the data sent to the server.
Default is True.
_validate_outputs (bool): specifies if type checking
should be done one the data received from the server.
Default is True.
_check_status (bool): whether to check response status
for being positive or not.
Default is True
_spec_property_naming (bool): True if the variable names in the input data
are serialized names, as specified in the OpenAPI document.
False if the variable names in the input data
are pythonic names, e.g. snake case (default)
_content_type (str/None): force body content-type.
Default is None and content-type will be predicted by allowed
content-types and body.
_host_index (int/None): specifies the index of the server
that we want to use.
Default is read from the configuration.
_request_auths (list): set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
Default is None
_async_call (bool): execute request asynchronously
Returns:
(parsed_model, response)
If the method is called asynchronously, returns the request
thread.
"""
kwargs['_parse_response'] = _parse_response
kwargs['_request_timeout'] = _request_timeout
kwargs['_validate_inputs'] = _validate_inputs
kwargs['_validate_outputs'] = _validate_outputs
kwargs['_check_status'] = _check_status
kwargs['_spec_property_naming'] = _spec_property_naming
kwargs['_content_type'] = _content_type
kwargs['_host_index'] = _host_index
kwargs['_request_auths'] = _request_auths
kwargs['_async_call'] = _async_call
try:
index = self.api_client.configuration.server_operation_index.get(
self.settings['operation_id'], self.api_client.configuration.server_index
) if kwargs['_host_index'] is None else kwargs['_host_index']
server_variables = self.api_client.configuration.server_operation_variables.get(
self.settings['operation_id'], self.api_client.configuration.server_variables
)
_host = self.api_client.configuration.get_host_from_settings(
index, variables=server_variables, servers=self.settings['servers']
)
except IndexError:
if self.settings['servers']:
raise ApiValueError(
"Invalid host index. Must be 0 <= index < %s" %
len(self.settings['servers'])
)
_host = None
for key, value in kwargs.items():
if key not in self.params_map['all']:
raise ApiTypeError(
"Got an unexpected parameter '%s'"
" to method `%s`" %
(key, self.settings['operation_id'])
)
# only throw this nullable ApiValueError if _validate_inputs
# is False, if _validate_inputs==True we catch this case
# in self.__validate_inputs
if (key not in self.params_map['nullable'] and value is None
and kwargs['_validate_inputs'] is False):
raise ApiValueError(
"Value may not be None for non-nullable parameter `%s`"
" when calling `%s`" %
(key, self.settings['operation_id'])
)
for key in self.params_map['required']:
if key not in kwargs.keys():
raise ApiValueError(
"Missing the required parameter `%s` when calling "
"`%s`" % (key, self.settings['operation_id'])
)
self.__validate_inputs(kwargs)
params = self.__gather_params(kwargs)
accept_headers_list = self.headers_map['accept']
if accept_headers_list:
params['header']['Accept'] = self.api_client.select_header_accept(
accept_headers_list)
if kwargs.get('_content_type'):
params['header']['Content-Type'] = kwargs['_content_type']
else:
content_type_headers_list = self.headers_map['content_type']
if content_type_headers_list:
if params['body'] != "":
content_types_list = self.api_client.select_header_content_type(
content_type_headers_list, self.settings['http_method'],
params['body'])
if content_types_list:
params['header']['Content-Type'] = content_types_list
return self.api_client.call_api(
self.settings['endpoint_path'], self.settings['http_method'],
params['path'],
params['query'],
params['header'],
body=params['body'],
post_params=params['form'],
files=params['file'],
response_schema=self.settings['response_schema'],
auth_settings=self.settings['auth'],
_async_call=kwargs['_async_call'],
_check_type=kwargs['_validate_outputs'],
_check_status=kwargs['_check_status'],
_parse_response=kwargs['_parse_response'],
_request_timeout=kwargs['_request_timeout'],
_host=_host,
_request_auths=kwargs['_request_auths'],
collection_formats=params['collection_format'])