Source code for cf_api

from __future__ import print_function
from six import string_types
import os
import re
import time
import traceback
from cf_api import exceptions as exc
from requests_factory import Request
from requests_factory import Response
from requests_factory import RequestFactory
from requests_factory import WebSocket

if 'true' == os.getenv('CF_API_DEBUG', ''):
    import sys
    sys.path.insert(0, os.getcwd())

try:
    from urllib.parse import urlparse
    from urllib.parse import urlencode
    from urllib.parse import parse_qs
except ImportError:
    from urlparse import urlparse
    from urllib import urlencode
    from urlparse import parse_qs

try:
    import jwt
    from jwt.contrib.algorithms.pycrypto import RSAAlgorithm
    jwt.register_algorithm('RS256', RSAAlgorithm(RSAAlgorithm.SHA256))
except ValueError as e:
    if str(e) != 'Algorithm already has a handler.':
        print(traceback.format_exc())
        raise e


def _print_deprecated_message(from_name, to_name):
    print('{0} is deprecated. Please migrate to {1}'
          .format(from_name, to_name))


def _get_default_var(val, env):
    return os.getenv(env) if val is None else val


[docs]class CloudControllerRequest(Request): """Encapsulates a request to the Cloud Controller API """ def __init__(self, factory=None): super(CloudControllerRequest, self).__init__(factory) def response_class(r): return CloudControllerResponse(r).set_factory(factory) self.set_response_class(response_class)
[docs] def get_by_name(self, value, name='name'): """Executes the list entities function searching for the given name Returns: CloudControllerResponse """ return self.search(name, value)
[docs] def search(self, *qlist, **qdict): """Sets the ``q`` query parameter used in listing most entities and executes the request Args: *qlist (tuple[str]): every 2 strings will be joined with a ':' as per the search format and each set to a ``q`` param. If only 1 string is passed, it will be assumed to contain a ':' and will be set in the ``q`` param directly. **qdict (dict): sets any additional query parameters Returns: CloudControllerResponse """ if len(qlist) == 0 and len(qdict) == 0: raise exc.InvalidArgsException('No search query was given', 500) if len(qlist) == 1: return self.set_query(q=qlist[0], **qdict).get() else: qlist = [str(s) for s in qlist] qlist = [('q', ':'.join(qlist[i:i+2])) for i in range(0, len(qlist), 2)] return self.set_query(*qlist, **qdict).get()
[docs]class CloudControllerResponse(Response): """Encapsulates a response from the Cloud Controller API """ _factory = None _resource_class = None def set_factory(self, factory): self._factory = factory self._resource_class = Resource return self def set_resource_class(self, resource_class): self._resource_class = resource_class return self @property def error_message(self): """Extracts the error message from this response """ if not self.has_error: return None if isinstance(self._response_parsed, dict) and \ 'error_description' in self._response_parsed: return self._response_parsed['error_description'] return self._response_text @property def error_code(self): """Extracts the error code from this response """ if not self.has_error: return None if isinstance(self._response_parsed, dict) and \ 'error_code' in self._response_parsed: return self._response_parsed['error_code'] return self._response_text @property def resources(self): """Attempts to parse the response as a resource list Returns: list[Resource] """ return [self._resource_class(r).set_factory(self._factory) for r in self.data['resources']] @property def resource(self): """If the response is an array this gets the first item resource, else return the raw response Returns: Resource """ if 'resources' in self.data: if len(self.data['resources']) == 0: return None else: return self._resource_class(self.data['resources'][0]) \ .set_factory(self._factory) else: return self._resource_class(self.data).set_factory(self._factory)
[docs]class Resource(dict): """Provides shortcuts to the most commonly used fields of Cloud Controller resource objects. """ _factory = None def __init__(self, response, factory=None): super(Resource, self).__init__() self._factory = factory for n, v in response.items(): self[n] = v def set_factory(self, factory): self._factory = factory return self def __getattr__(self, item): return self['entity'].get(item, None) @property def guid(self): """Shortcut to ``metadata.guid`` """ return self['metadata']['guid'] @property def name(self): """Shortcut to ``entity.name`` """ return self['entity']['name'] @property def state(self): return self['entity']['state'] @property def status(self): return self['entity']['status'] @property def label(self): return self['entity']['label'] @property def space_guid(self): """Shortcut to ``entity.space_guid`` """ return self['entity']['space_guid'] @property def org_guid(self): """Shortcut to ``entity.organization_guid`` """ return self['entity']['organization_guid'] @property def service_plan_guid(self): return self['entity']['service_plan_guid'] @property def spaces_url(self): """Shortcut to ``entity.spaces_url`` """ return self['entity']['spaces_url'] @property def routes_url(self): """Shortcut to ``entity.routes_url`` """ return self['entity']['routes_url'] @property def stack_url(self): return self['entity']['stack_url'] @property def service_bindings_url(self): return self['entity']['service_bindings_url'] @property def apps_url(self): return self['entity']['apps_url'] @property def service_instances_url(self): return self['entity']['service_instances_url'] @property def organization_url(self): """Shortcut to ``entity.organization_url`` """ return self['entity']['organization_url'] def space(self): return self._factory.request(self.space_url).get() def apps(self): return self._factory.request(self.apps_url).get() def spaces(self): return self._factory.request(self.spaces_url).get() def routes(self): return self._factory.request(self.routes_url).get() def orgs(self): return self._factory.request(self.organization_url).get() def service_instances(self): return self._factory.request(self.service_instances_url).get()
[docs]class CloudController(RequestFactory): """Provides base endpoints for building Cloud Controller requests Attributes: uaa (UAA): UAA instance that was used to authenticate. doppler (Doppler): Doppler instance that may be used to access logs. ssh_proxy (SSHProxy): SSH Proxy instance that may be used to access an app instance shell. version (int): Cloud Controller API version to be used when making requests info (CFInfo): CFInfo instance containing CF service entries for various internal services (i.e. Cloud Controller, UAA, Doppler, etc.) """ uaa = None doppler = None ssh_proxy = None version = 2 info = None _last_refresh_time = 0 def __init__(self, base_url): """Creates a new instance of the CloudController Args: base_url (str): The base URL of the Cloud Controller API """ super(CloudController, self).__init__() self.set_base_url(base_url)\ .application_json()\ .set_request_class(CloudControllerRequest)\ .set_response_class(CloudControllerResponse)
[docs] def new_doppler(self): """Creates a new, authenticated Doppler instance Returns: Doppler """ return new_doppler( base_url=self.info.doppler_url, verify_ssl=self.verify_ssl, access_token=self.uaa.get_access_token().to_string() )
[docs] def set_info(self, info): """Sets the API info object Args: info (CFInfo): API info data """ self.info = info return self
[docs] def set_version(self, version): """Sets the API version to be use when creating requests Args: version (int): API version number """ self.version = version return self
[docs] def set_uaa(self, uaa): """Sets the internal UAA client Args: uaa (UAA): an initialized instance of UAA class Returns: self (CloudController) """ self.uaa = uaa return self
[docs] def set_doppler(self, doppler): """Sets the internal doppler client Args: doppler (Doppler) Returns: self (CloudController) """ self.doppler = doppler return self
[docs] def set_ssh_proxy(self, ssh_proxy): """Sets the internal ssh proxy configuration Args: ssh (SSHProxy) Returns: self (CloudController) """ self.ssh_proxy = ssh_proxy return self
[docs] def update_tokens(self, res): """This method accepts a token response object from the UAA server. Note that the ``res.data`` dict must contain at least ``access_token``. Args: res (CloudControllerResponse) """ if self.uaa: self.uaa.update_tokens(res) if res.has_error: res.raise_error() self.set_bearer_auth(res.data['access_token'])
[docs] def refresh_tokens(self, force=False): """This method will refresh the internal access token based on the expiration time encapsulated in the current token. Args: force (bool): Default is False. If True, then the tokens will be refreshed regardless of whether they've expired Returns: self (CloudController) """ if not self.uaa: raise exc.InvalidStateException( 'UAA is not initialized. UAA is required to refresh tokens!', 500) if force or self.should_refresh(): res = self.uaa.refresh_token() self.update_tokens(res) print('updated tokens') return self
[docs] def set_refresh_tokens_callback(self, callback=None): """This method sets the internal request callback to refresh the tokens when they expire. This is a convenience method and is not set by default. Args: callback (callable): An optional callback that will be invoked on every request to the Cloud Controller and will supply three required arguments: req (instanceof Request): the request to be executed cc (CloudController): the cloud controller object did_refresh (bool): whether the tokens were updated .. note:: Note that this callback is called after the tokens are updated internally. Returns: self (CloudController) """ def _refresh_token(req, cc): did_refresh = cc.should_refresh() cc.refresh_tokens() req.set_bearer_auth(cc.uaa.get_access_token().to_string()) if callable(callback): callback(req, cc, did_refresh) self.set_callback(_refresh_token, self) return self
[docs] def should_refresh(self): """Determines whether the internal refresh_interval has passed Returns: bool """ return self.uaa.get_access_token().is_expired
[docs] def get_all_resources(self, req): """Recursively gets all the resources specified by the request. Do not execute ``req.get()`` before passing it in. This implementation will call ``req.get()`` and continue calling ``next_url`` from the response and collect the results until the response has no ``next_url``. Args: req (Request): a user prepared request that will be used to execute and collect all results Returns: list[Resource]: collected resources in a single list """ res = req.get() results = [] while True: results.extend(res.resources) next_url = res.data.get('next_url', None) if not next_url: break res = self.request(next_url).get() return results
[docs] def request(self, *urls, **kwargs): """Creates a new CloudControllerRequest object with the API version prepended to the url segments specified. Args: *urls: list of URL segments **kwargs: supported params include v (int): version to use on this request (i.e. 2, 3). Defaults to the internal ``self.version`` Returns: CloudControllerRequest """ version = kwargs.get('v', self.version) if version is None or \ (len(urls) > 0 and re.match('^/?v\d+/', urls[0])): return super(CloudController, self).request(*urls) else: version_url = ''.join(['v', str(version)]) return super(CloudController, self).request(version_url, *urls)
[docs] def apps(self, *args): """Convenience function passing the ``apps`` url segment. Args: *args: url segments that will be appended after ``apps`` Returns: CloudControllerRequest """ return self.request('apps', *args)
[docs] def app_usage_events(self, *args): """Convenience function passing the ``app_usage_events`` url segment. Args: *args: url segments that will be appended after ``app_usage_events`` Returns: CloudControllerRequest """ return self.request('app_usage_events', *args)
[docs] def service_instances(self, *args): """Convenience function passing the ``service_instances`` url segment. Args: *args: url segments that will be appended after ``service_instances`` Returns: CloudControllerRequest """ return self.request('service_instances', *args)
[docs] def services(self, *args): """Convenience function passing the ``services`` url segment. Args: *args: url segments that will be appended after ``services`` Returns: CloudControllerRequest """ return self.request('services', *args)
[docs] def blobstores(self, *args): """Convenience function passing the ``blobstores`` url segment. Args: *args: url segments that will be appended after ``blobstores`` Returns: CloudControllerRequest """ return self.request('blobstores', *args)
[docs] def buildpacks(self, *args): """Convenience function passing the ``buildpacks`` url segment. Args: *args: url segments that will be appended after ``buildpacks`` Returns: CloudControllerRequest """ return self.request('buildpacks', *args)
[docs] def events(self, *args): """Convenience function passing the ``events`` url segment. Args: *args: url segments that will be appended after ``events`` Returns: CloudControllerRequest """ return self.request('events', *args)
[docs] def quota_definitions(self, *args): """Convenience function passing the ``quota_definitions`` url segment. Args: *args: url segments that will be appended after ``quota_definitions`` Returns: CloudControllerRequest """ return self.request('quota_definitions', *args)
[docs] def organizations(self, *args): """Convenience function passing the ``organizations`` url segment. Args: *args: url segments that will be appended after ``organizations`` Returns: CloudControllerRequest """ return self.request('organizations', *args)
[docs] def private_domains(self, *args): """Convenience function passing the ``private_domains`` url segment. Args: *args: url segments that will be appended after ``private_domains`` Returns: CloudControllerRequest """ return self.request('private_domains', *args)
[docs] def routes(self, *args): """Convenience function passing the ``routes`` url segment. Args: *args: url segments that will be appended after ``routes`` Returns: CloudControllerRequest """ return self.request('routes', *args)
[docs] def security_groups(self, *args): """Convenience function passing the ``security_groups`` url segment. Args: *args: url segments that will be appended after ``security_groups`` Returns: CloudControllerRequest """ return self.request('security_groups', *args)
[docs] def service_bindings(self, *args): """Convenience function passing the ``service_bindings`` url segment. Args: *args: url segments that will be appended after ``service_bindings`` Returns: CloudControllerRequest """ return self.request('service_bindings', *args)
[docs] def service_brokers(self, *args): """Convenience function passing the ``service_brokers`` url segment. Args: *args: url segments that will be appended after ``service_brokers`` Returns: CloudControllerRequest """ return self.request('service_brokers', *args)
[docs] def service_plan_visibilities(self, *args): """Convenience function passing the ``service_plan_visibilities`` url segment. Args: *args: url segments that will be appended after ``service_plan_visibilities`` Returns: CloudControllerRequest """ return self.request('service_plan_visibilities', *args)
[docs] def service_plans(self, *args): """Convenience function passing the ``service_plans`` url segment. Args: *args: url segments that will be appended after ``service_plans`` Returns: CloudControllerRequest """ return self.request('service_plans', *args)
[docs] def shared_domains(self, *args): """Convenience function passing the ``shared_domains`` url segment. Args: *args: url segments that will be appended after ``shared_domains`` Returns: CloudControllerRequest """ return self.request('shared_domains', *args)
[docs] def space_quota_definitions(self, *args): """Convenience function passing the ``space_quota_definitions`` url segment. Args: *args: url segments that will be appended after ``space_quota_definitions`` Returns: CloudControllerRequest """ return self.request('space_quota_definitions', *args)
[docs] def spaces(self, *args): """Convenience function passing the ``spaces`` url segment. Args: *args: url segments that will be appended after ``spaces`` Returns: CloudControllerRequest """ return self.request('spaces', *args)
[docs] def stacks(self, *args): """Convenience function passing the ``stacks`` url segment. Args: *args: url segments that will be appended after ``stacks`` Returns: CloudControllerRequest """ return self.request('stacks', *args)
[docs] def users(self, *args): """Convenience function passing the ``users`` url segment. Args: *args: url segments that will be appended after ``users`` Returns: CloudControllerRequest """ return self.request('users', *args)
[docs] def resource_match(self, *args): """Convenience function passing the ``resource_match`` url segment. Args: *args: url segments that will be appended after ``resource_match`` Returns: CloudControllerRequest """ return self.request('resource_match', *args)
@classmethod def new_instance(cls, **kwargs): return new_cloud_controller(cloud_controller_class=cls, **kwargs)
[docs]class UAA(RequestFactory): """Provides base functions for building UAA requests """ _access_token = None _refresh_token = None _client_id = None _client_secret = None def __init__(self, base_url, client_id=None, client_secret=None): super(UAA, self).__init__() self.accept_json() self.set_base_url(base_url) self.set_client_credentials(client_id, client_secret) @property def access_token(self): """Deprecated. **DO NOT USE!** """ _print_deprecated_message('UAA.access_token', 'UAA.get_access_token()') return self._access_token @property def client_id(self): """Get the internal client ID Returns: str """ return self._client_id @property def client_secret(self): """Get the internal client secret Returns: str """ return self._client_secret
[docs] def get_access_token(self): """Get the internal access token Returns: JWT """ return self._access_token
[docs] def get_refresh_token(self): """Get the internal refresh token Returns: JWT """ return self._refresh_token
[docs] def set_client_credentials(self, client_id, client_secret, set_basic_auth=False): """Set the internal client ID and secret Args: client_id (str): UAA client id client_secret (str): UAA client secret set_basic_auth (bool): if true, this will set the client ID and secret in the Authorization as basic auth """ self._client_id = client_id self._client_secret = client_secret if self._client_id is not None and \ self._client_secret is not None and \ set_basic_auth: self.set_basic_auth(client_id, client_secret) return self
[docs] def set_access_token(self, access_token): """Set the internal access token Args: access_token (str) Returns: UAA """ if not isinstance(access_token, JWT): access_token = JWT(access_token) self._access_token = access_token return self
[docs] def set_refresh_token(self, refresh_token): """Set the internal refresh token Args: refresh_token (str) Returns: UAA """ if not isinstance(refresh_token, JWT): refresh_token = JWT(refresh_token) self._refresh_token = refresh_token return self
[docs] def update_tokens(self, res): """Accepts a response object from the UAA Token Authorization API and updates this object with access_token and response_token. Args: res: Expects res.data['access_token'] and (optionally) res.data['refresh_token'] to be set. """ if res.has_error: res.raise_error() self.set_access_token(res.data['access_token']) if 'refresh_token' in res.data: self.set_refresh_token(res.data['refresh_token'])
[docs] def with_authorization(self): """Deprecated. **DO NOT USE!** """ _print_deprecated_message('uaa.with_authorization()', 'uaa.client_credentials()') return self.client_credentials()
[docs] def client_credentials(self): """Sends a request for client_credentials grant_type Request parameters:: POST /oauth/token Accept: application/json response_type = 'token', grant_type = 'client_credentials', Returns: Response """ return self.request('oauth/token').accept_json().set_params( response_type='token', grant_type='client_credentials', ).post()
[docs] def password_grant(self, username, password): """Sends a request for password grant_type Request parameters:: POST /oauth/token Accept: application/json response_type = 'token' grant_type = 'password' client_id = self._client_id client_secret = self._client_secret username = username password = password Returns: Response """ return self.request('oauth/token').accept_json().set_params( response_type='token', grant_type='password', client_id=self._client_id, client_secret=self._client_secret, username=username, password=password, ).post()
[docs] def refresh_token(self, refresh_token=None): """Sends a request for refresh_token grant_type. This will use the internally set _refresh_token if the refresh_token arg is not set Request parameters:: POST /oauth/token Accept: application/json grant_type = 'refresh_token', client_id = self._client_id, client_secret = self._client_secret, refresh_token = str(refresh_token or self._refresh_token) Args: refresh_token (str|None): set a specific refresh token to be used. Returns: Response """ return self.request('oauth/token').set_params( grant_type='refresh_token', client_id=self._client_id, client_secret=self._client_secret, refresh_token=str(refresh_token or self._refresh_token) ).post()
[docs] def token_key(self): """Gets the token key for the internally set client_id Returns: Response """ return self.request('token_key').get()
[docs] def authorization_code(self, code, response_type, redirect_uri=None, **kwargs): """Sends a request for the authorization_code grant_type to acquire an access_token Request parameters:: POST /oauth/token Accept: application/json code = code response_type = response_type grant_type = 'authorization_code' redirect_uri = redirect_uri Args: code (str): one time use code passed by UAA to the redirect_uri after successful user login at UAA response_type (str): must be the response_type string used to acquire the code redirect_uri (str|None): must be the redirect_uri used to acquire the code **kwargs (dict): any custom request body parameters Returns: Response """ kwargs.update( code=code, response_type=response_type, grant_type='authorization_code', ) if redirect_uri is not None: kwargs['redirect_uri'] = redirect_uri return self.request('oauth/token').set_params(**kwargs).post()
[docs] def authorization_code_url(self, response_type, scope=None, redirect_uri=None, state=None, **kwargs): """The following list summarizes the various authorization code flows in the UAA docs.:: Browser Flow: response_type='code', scope='...', redirect_uri='http://localhost/auth' API Flow: response_type='code', state='somerandomstr', redirect_uri='http://localhost/auth' Hybrid Flow: response_type='id_token code', redirect_uri='http://localhost/auth' OpenID Connect Flow: response_type='id_token', scope='...', redirect_uri='http://localhost/auth' response_type='token id_token', scope='...', redirect_uri='http://localhost/auth' Args: response_type (str): valid response type strings (code, id_token, token) scope (str|list[str]|None): any scope strings you need redirect_uri (str|None): redirect URI that will receive the auth code state (str|None): used in API flow kwargs (dict): any additional request body params Returns: str: a URI to redirect the user for login with UAA. On successful login, UAA will redirect to redirect_uri with a "code" query parameter containing a one time use code. The code is handled by the ``self.authorization_code()`` token function. """ kwargs.update( response_type=response_type, client_id=self._client_id, ) if isinstance(scope, string_types): scope = [scope] if isinstance(scope, list): kwargs['scope'] = '+'.join(scope) if redirect_uri is not None: kwargs['redirect_uri'] = redirect_uri if state is not None: kwargs['state'] = state return '?'.join([self.get_url('oauth/authorize'), urlencode(kwargs)])
[docs] def verify_token(self, token, **decode_kwargs): """Verifies the OAuth2 Token (or ID Token) using the client's public key. Args: token (str): oauth token to be verified decode_kwargs (dict): keyword args for jwt.decode(**kwargs). This is useful for specifying conditions of verification. Returns: dict: contents of the JWT """ res = self.token_key() key = res.data['value'] return jwt.decode( token, key=key, verify=True, **decode_kwargs )
def one_time_password(self, client_id=None): client_id = client_id or self.client_id req = self.request('oauth/authorize')\ .application_json()\ .accept_json()\ .set_query(client_id=client_id, response_type='code')\ .set_bearer_auth(self.get_access_token().to_string())\ .set_custom_requests_args(allow_redirects=False) res = req.get() if 302 != res.response.status_code: raise exc.CFException( 'Not authorized ({0})'.format(res.response.status_code), 403) loc = res.headers.get('location', None) if not loc: raise exc.CFException('Not authorized (no redirect)', 403) qs = parse_qs(urlparse(loc).query) code = qs.get('code', [None])[0] if not code: raise exc.CFException('Not authorized (no code)', 403) return code
[docs]class JWT(object): """Wrapper around the JWT object """ def __init__(self, token_str, verify=False, **verify_kwargs): self.token = token_str self.attrs = jwt.decode(token_str, verify=verify, **verify_kwargs) def __getattr__(self, item): """This getter extracts keys from the JWT's internal dict. If a key is not found in the JWT, then return None Args: item (str) Returns: int|str|None """ return self.attrs.get(item, None) def __str__(self): return self.token
[docs] def to_string(self): """Get the original string representation of the token """ return self.__str__()
@property def is_expired(self): """Indicates if the token is expired """ return int(time.time()) >= self.attrs['exp']
[docs]class Doppler(RequestFactory): """Provides base functions for building Doppler/Loggregator API endpoints """ websocket_class = None def __init__(self, base_url, websocket_class=WebSocket): super(Doppler, self).__init__() self.set_base_url(base_url).set_websocket_class(websocket_class)
[docs] def set_websocket_class(self, websocket_class): """Sets the internal websocket wrapper class Args: websocket_class (WebSocket|callable): websocket class wrapper Returns: self (Doppler) """ self.websocket_class = websocket_class return self
[docs] def apps(self, first, *url): """Create a new request object using the url segments Args: first (str): required url segment url (tuple[str]): optional url segments Returns: Request """ return self.request('apps', first, *url)
[docs] def ws_request(self, first, *url): """Create a new WebSocket instance using the url segments Args: first (str): required url segment url (tuple[str]): optional url segments Returns: WebSocket """ doppler_url = self.request(first, *url).base_url doppler_url = re.sub('^http(s?):', 'ws\\1:', doppler_url, count=1) return self.websocket_class(doppler_url, verify_ssl=self.verify_ssl, **self.headers)
[docs]class CFInfo(object): def __init__(self, cc): self.data = cc.request('v2/info').get().data @property def uaa_url(self): """The base URL for UAA Returns: str """ return self.data['token_endpoint'] @property def doppler_url(self): """The base URL for Doppler Returns: str """ return self.data['doppler_logging_endpoint'] @property def ssh_url(self): """The base SSH proxy url Returns: str """ return self.data['app_ssh_endpoint'] @property def ssh_client_id(self): """The client ID to be used in authenticating with UAA before using the SSH proxy Returns: str """ return self.data['app_ssh_oauth_client'] @property def ssh_host_key_fingerprint(self): """The SSH proxy host key fingerprint to verify when connecting to an application instance via SSH Returns: str """ return self.data['app_ssh_host_key_fingerprint']
[docs]class SSHProxy(object): def __init__(self, uaa, ssh_proxy, client_id, fingerprint): if '//' not in ssh_proxy: ssh_proxy = ''.join(['//', ssh_proxy]) parts = urlparse(ssh_proxy) self.uaa = uaa self.host = parts.hostname self.port = int(parts.port if parts.port is not None else 22) self.client_id = client_id self.fingerprint = fingerprint
def decode_jwt(access_token): """Decodes a JWT (UAA Access Tokens) without verifying the signature. Returns: dict """ return jwt.decode(access_token, verify=False, algorithms='RS256')
[docs]def new_uaa( cc=None, base_url=None, verify_ssl=None, validate_ssl=None, username=None, password=None, client_id=None, client_secret=None, authorization_code=None, refresh_token=None, access_token=None, no_auth=False, cloud_controller_class=None, uaa_class=None, ): """Creates a new UAA client object. This function requires args base_url OR cc; base_url takes precedence over cc. If base_url not given, then cc must be an instance of str or CloudController. If an instance of str, then it's converted into an instance of CloudController. CloudController.info().get() is called to get the UAA endpoint. If no other authorization method is set, then client_credentials authorization will be attempted with the client_id and secret; however, if no_auth=True is passed, then no authorization will be attempted. This method supports environment variable settings if some required arguments are left blank. See cc, client_id, client_secret, and verify_ssl. Args: cc (str|CloudController|None): optional(if base_url is set), defaults to env var PYTHON_CF_URL. If base_url is passed, then this value is ignored client_id (str|None): required, defaults to env var PYTHON_CF_CLIENT_ID client_secret (str|None): required, defaults to env var PYTHON_CF_CLIENT_SECRET verify_ssl (bool|None): optional, defaults to env var !PYTHON_CF_IGNORE_SSL base_url (str): optional if cc is set, sets the UAA base endpoint url username (str): optional, user's name password (str): optional, user's password authorization_code (dict): optional, authorization_code method arguments. Setting this triggers the authorization_code() authorization method refresh_token (str): optional, refresh token string. Setting this will also trigger the refresh_token() authorization method. But if access_token is set, then the refresh token auth method will not be triggered, and the refresh token will be set on the UAA object. access_token (str): optional, access token string. If set, this set the access_token on the UAA object that is returned. If refresh_token is passed, then that will be set on the UAA object as well. no_auth (bool): optional, indicates to skip authorizing UAA. Neither access_token nor refresh_token will be set if this value is True. validate_ssl (bool|None): **DEPRECATED** use verify_ssl **DEPRECATED** Returns: UAA """ if validate_ssl is not None: _print_deprecated_message('validate_ssl in (cf_api.new_uaa)', 'verify_ssl in (cf_api.new_uaa)') verify_ssl = validate_ssl client_id = _get_default_var(client_id, 'PYTHON_CF_CLIENT_ID') client_secret = _get_default_var(client_secret, 'PYTHON_CF_CLIENT_SECRET') verify_ssl = os.getenv('PYTHON_CF_IGNORE_SSL', '') != 'true' \ if verify_ssl is None else verify_ssl if not isinstance(client_id, string_types) or \ not client_id or \ not isinstance(client_secret, string_types): raise exc.InvalidArgsException('Invalid UAA client credentials', 500) if cloud_controller_class is None: cloud_controller_class = CloudController if uaa_class is None: uaa_class = UAA if not base_url: cc = _get_default_var(cc, 'PYTHON_CF_URL') if isinstance(cc, str): cc = cloud_controller_class(cc).set_verify_ssl(verify_ssl) cc.set_info(CFInfo(cc)) base_url = cc.info.uaa_url uaa = uaa_class(base_url)\ .set_verify_ssl(verify_ssl)\ .set_client_credentials(client_id, client_secret, set_basic_auth=True) if no_auth: res = None elif username and password: res = uaa.password_grant( username, password ) elif authorization_code: if not isinstance(authorization_code, dict): raise exc.InvalidArgsException( 'authorization_code must be an instance of dict', 500) res = uaa.authorization_code(**authorization_code) elif access_token: uaa.set_access_token(access_token) if refresh_token: uaa.set_refresh_token(refresh_token) res = None elif refresh_token: res = uaa.refresh_token(refresh_token) else: res = uaa.client_credentials() if res is not None: uaa.update_tokens(res) return uaa
[docs]def new_doppler(cc=None, base_url=None, verify_ssl=None, access_token=None): """Sets a the doppler base endpoint on this client. If cc is set, then all other args will be ignored. Args: cc (CloudController|None): initialized cloud controller instance base_url (str|None): base doppler url verify_ssl (bool|None): verify SSL certs access_token (str|None): access token string Returns: self (Doppler) """ if not base_url: if not isinstance(cc, CloudController): raise exc.InvalidArgsException( 'cc must be an instance of CloudController', 500) if not isinstance(cc.uaa, UAA): raise exc.InvalidStateException( 'cc UAA client is not set on Cloud Controller', 500) base_url = cc.info.doppler_url access_token = cc.uaa.get_access_token().to_string() verify_ssl = cc.verify_ssl base_url = re.sub('^ws(s?):', 'http\\1:', base_url, count=1) return Doppler(base_url)\ .set_verify_ssl(verify_ssl)\ .set_bearer_auth(access_token)
[docs]def new_cloud_controller( base_url=None, validate_ssl=None, verify_ssl=None, username=None, password=None, client_id=None, client_secret=None, init_doppler=False, authorization_code=None, refresh_token=None, access_token=None, no_auth=False, version=2, cloud_controller_class=None, uaa_class=None, **kwargs ): """Creates a new Cloud Controller client object AND attempts to get an Access Token from UAA using the user and/or client credentials. If username/password is given then the password grant_type will be used, otherwise, the client_credentials grant_type will be used. The following arguments will default to the environment variable settings Args: base_url (str): required, defaults to env var PYTHON_CF_URL client_id (str|None): required, defaults to env var PYTHON_CF_CLIENT_ID client_secret (str|None): required, defaults to env var PYTHON_CF_CLIENT_SECRET verify_ssl (bool|None): optional, defaults to env var !PYTHON_CF_IGNORE_SSL username (str|None): optional, see new_uaa() password (str|None): optional, see new_uaa() authorization_code (dict|None): optional, see new_uaa() refresh_token (str|None): optional, see new_uaa() access_token (str|None): optional, see new_uaa() no_auth (bool|None): optional, see new_uaa() validate_ssl (bool|None): **DEPRECATED** use verify_ssl **DEPRECATED** Returns: CloudController """ if validate_ssl is not None: _print_deprecated_message( 'validate_ssl in (cf_api.new_cloud_controller)', 'verify_ssl in (cf_api.new_cloud_controller)') verify_ssl = validate_ssl base_url = _get_default_var(base_url, 'PYTHON_CF_URL') verify_ssl = os.getenv('PYTHON_CF_IGNORE_SSL', '') != 'true' \ if verify_ssl is None else verify_ssl if cloud_controller_class is None: cloud_controller_class = CloudController cc = cloud_controller_class(base_url)\ .set_verify_ssl(verify_ssl).set_version(version) info = CFInfo(cc) cc.set_info(info) uaa = new_uaa( base_url=cc.info.uaa_url, verify_ssl=verify_ssl, username=username, password=password, client_id=client_id, client_secret=client_secret, authorization_code=authorization_code, refresh_token=refresh_token, access_token=access_token, no_auth=no_auth, cloud_controller_class=cloud_controller_class, uaa_class=uaa_class, ) cc.set_uaa(uaa) if uaa.get_access_token(): cc.set_bearer_auth(uaa.get_access_token().to_string()) doppler = new_doppler( base_url=cc.info.doppler_url, verify_ssl=verify_ssl, access_token=uaa.get_access_token().to_string() ) cc.set_doppler(doppler) elif not no_auth: raise exc.InvalidStateException('Unable to authorize with UAA', 401) if uaa.get_access_token(): ssh_proxy = SSHProxy( uaa, cc.info.ssh_url, cc.info.ssh_client_id, cc.info.ssh_host_key_fingerprint ) cc.set_ssh_proxy(ssh_proxy) return cc