# -*- coding: utf-8 -*-
"""
Secure connectivity management for NiPyApi
"""
from __future__ import absolute_import
import logging
import ssl
from copy import copy
import six
import urllib3
from future.utils import raise_from as _raise
import nipyapi
# Module-level logger, named after this module via __name__
log = logging.getLogger(__name__)
# Names exported as the public API of this module (used by ``import *``)
__all__ = ['create_service_user', 'create_service_user_group',
'set_service_auth_token', 'service_logout',
'get_service_access_status', 'add_user_to_access_policy',
'update_access_policy', 'get_access_policy_for_resource',
'create_access_policy', 'list_service_users', 'get_service_user',
'set_service_ssl_context', 'add_user_group_to_access_policy',
'bootstrap_security_policies', 'service_login',
'remove_service_user', 'list_service_user_groups',
'get_service_user_group', 'remove_service_user_group'
]
# These are the known-valid policy actions; functions taking an ``action``
# argument assert membership in this list
_valid_actions = ['read', 'write', 'delete']
# These are the services that these functions know how to configure; each
# name is also the attribute looked up on the nipyapi package via getattr
_valid_services = ['nifi', 'registry']
def list_service_users(service='nifi'):
    """
    Lists all users of a given service.

    Args:
        service (str): 'nifi' or 'registry'

    Returns:
        For NiFi, the ``users`` attribute of the TenantsApi response; for
        Registry, the TenantsApi response itself
    """
    assert service in _valid_services
    with nipyapi.utils.rest_exceptions():
        tenants_api = getattr(nipyapi, service).TenantsApi()
        response = tenants_api.get_users()
    # Only the NiFi response wraps the listing in a ``users`` attribute
    return response.users if service == 'nifi' else response
def get_service_user(identifier, identifier_type='identity', service='nifi'):
    """
    Looks up a single user of the service by the given identifier.

    The full user listing is fetched and then narrowed with
    nipyapi.utils.filter_obj using a non-greedy match.

    Args:
        identifier (str): the string to search for
        identifier_type (str): the field to search in
        service (str): the name of the service, 'nifi' or 'registry'

    Returns:
        None if no match, else single object
    """
    assert service in _valid_services
    for text_arg in (identifier, identifier_type):
        assert isinstance(text_arg, six.string_types)
    return nipyapi.utils.filter_obj(
        list_service_users(service),
        identifier,
        identifier_type,
        greedy=False
    )
def remove_service_user(user, service='nifi', strict=True):
    """
    Deletes the given User from the given Service.

    Args:
        user: (nifi.UserEntity) when service is 'nifi', (registry.User) when
            service is 'registry'; the user to delete
        service (str): 'nifi' or 'registry'
        strict (bool): If False, a user the service reports as missing
            results in None instead of an error

    Returns:
        The result of the TenantsApi remove_user call, or None when the user
        was not found and strict is False

    Raises:
        ValueError: wrapping the API error body for any other API failure,
            or for a missing user when strict is True
    """
    assert service in _valid_services
    if service == 'registry':
        assert isinstance(user, nipyapi.registry.User)
        user_id = user.identifier
    else:
        assert isinstance(user, nipyapi.nifi.UserEntity)
        user_id = user.id
    # The current revision version is submitted alongside the ID
    revision_version = user.revision.version
    assert isinstance(strict, bool)
    service_module = getattr(nipyapi, service)
    try:
        return service_module.TenantsApi().remove_user(
            id=user_id, version=revision_version
        )
    except service_module.rest.ApiException as e:
        not_found = (
            'Unable to find user' in e.body or 'does not exist' in e.body
        )
        if not_found and not strict:
            return None
        _raise(ValueError(e.body), e)
def create_service_user(identity, service='nifi', strict=True):
    """
    Creates a user with the provided identity in the given service.

    Args:
        identity (str): Identity string for the user
        service (str): 'nifi' or 'registry'
        strict (bool): If True, an 'already exists' response from the
            service is raised; if False the existing user is looked up and
            returned instead

    Returns:
        The new (User) or (UserEntity) object

    Raises:
        ValueError: wrapping the API error body when creation fails
    """
    assert service in _valid_services
    assert isinstance(identity, six.string_types)
    assert isinstance(strict, bool)
    if service == 'nifi':
        # NiFi wraps the user in an entity carrying an initial revision
        new_user = nipyapi.nifi.UserEntity(
            revision=nipyapi.nifi.RevisionDTO(version=0),
            component=nipyapi.nifi.UserDTO(identity=identity)
        )
    else:
        # must be registry
        new_user = nipyapi.registry.User(identity=identity)
    api_errors = (
        nipyapi.nifi.rest.ApiException,
        nipyapi.registry.rest.ApiException
    )
    try:
        return getattr(nipyapi, service).TenantsApi().create_user(new_user)
    except api_errors as e:
        if 'already exists' not in e.body or strict:
            _raise(ValueError(e.body), e)
        return get_service_user(identity, service=service)
def create_service_user_group(identity, service='nifi',
                              users=None, strict=True):
    """
    Creates a user group with the provided identity and member users in the
    given service.

    Args:
        identity (str): Identity string for the user group
        service (str): 'nifi' or 'registry'
        users (list): A list of nifi.UserEntity or registry.User
            belonging to the group
        strict (bool): If True, an 'already exists' response from the
            service is raised; if False the existing group is looked up and
            returned instead

    Returns:
        The new (UserGroup) or (UserGroupEntity) object

    Raises:
        ValueError: wrapping the API error body when creation fails
    """
    assert service in _valid_services
    assert isinstance(identity, six.string_types)
    # Stays None when no (or an empty) user list is supplied
    member_refs = None
    if service == 'nifi':
        if users:
            assert all(
                isinstance(member, nipyapi.nifi.UserEntity)
                for member in users
            )
            member_refs = [{'id': member.id} for member in users]
        group_payload = nipyapi.nifi.UserGroupEntity(
            revision=nipyapi.nifi.RevisionDTO(version=0),
            component=nipyapi.nifi.UserGroupDTO(
                identity=identity,
                users=member_refs
            )
        )
    else:
        if users:
            assert all(
                isinstance(member, nipyapi.registry.User)
                for member in users
            )
            member_refs = [
                {'identifier': member.identifier} for member in users
            ]
        group_payload = nipyapi.registry.UserGroup(
            identity=identity,
            users=member_refs
        )
    api_errors = (
        nipyapi.nifi.rest.ApiException,
        nipyapi.registry.rest.ApiException
    )
    try:
        tenants_api = getattr(nipyapi, service).TenantsApi()
        return tenants_api.create_user_group(group_payload)
    except api_errors as e:
        if 'already exists' in e.body and not strict:
            return get_service_user_group(identity, service=service)
        _raise(ValueError(e.body), e)
def list_service_user_groups(service='nifi'):
    """
    Lists all user groups of a given service.

    Args:
        service (str): 'nifi' or 'registry'

    Returns:
        [(nifi.UserGroupEntity, registry.UserGroup)]: for NiFi, the
        ``user_groups`` attribute of the TenantsApi response; for Registry,
        the TenantsApi response itself
    """
    assert service in _valid_services
    with nipyapi.utils.rest_exceptions():
        tenants_api = getattr(nipyapi, service).TenantsApi()
        response = tenants_api.get_user_groups()
    # Only the NiFi response wraps the listing in a ``user_groups`` attribute
    return response.user_groups if service == 'nifi' else response
def get_service_user_group(identifier, identifier_type='identity',
                           service='nifi'):
    """
    Looks up a single user group of the service by the given identifier.

    The full group listing is fetched and then narrowed with
    nipyapi.utils.filter_obj using a non-greedy match.

    Args:
        identifier (str): the string to search for
        identifier_type (str): the field to search in, identity or id
        service (str): the name of the service, 'nifi' or 'registry'

    Returns:
        None if no match, else single object
    """
    assert service in _valid_services
    for text_arg in (identifier, identifier_type):
        assert isinstance(text_arg, six.string_types)
    return nipyapi.utils.filter_obj(
        list_service_user_groups(service),
        identifier,
        identifier_type,
        greedy=False
    )
def remove_service_user_group(group, service='nifi', strict=True):
    """
    Deletes the given User Group from the given Service.

    Args:
        group: (nifi.UserGroupEntity) when service is 'nifi',
            (registry.UserGroup) when service is 'registry'; the group to
            delete
        service (str): 'nifi' or 'registry'
        strict (bool): If False, a group the service reports as missing
            results in None instead of an error

    Returns:
        The result of the TenantsApi remove_user_group call, or None when
        the group was not found and strict is False

    Raises:
        ValueError: wrapping the API error body for any other API failure,
            or for a missing group when strict is True
    """
    assert service in _valid_services
    if service == 'registry':
        assert isinstance(group, nipyapi.registry.UserGroup)
        group_id = group.identifier
    else:
        assert isinstance(group, nipyapi.nifi.UserGroupEntity)
        group_id = group.id
    # The current revision version is submitted alongside the ID
    revision_version = group.revision.version
    assert isinstance(strict, bool)
    service_module = getattr(nipyapi, service)
    try:
        return service_module.TenantsApi().remove_user_group(
            id=group_id, version=revision_version
        )
    except service_module.rest.ApiException as e:
        not_found = (
            'Unable to find user' in e.body or 'does not exist' in e.body
        )
        if not_found and not strict:
            return None
        _raise(ValueError(e.body), e)
def service_login(service='nifi', username=None, password=None,
                  bool_response=False):
    """
    Login to the currently configured NiFi or NiFi-Registry server.

    Login requires a secure connection over https.
    Prior to calling this method, the host must be specified
    and the SSLContext should be configured (if necessary).

    Successful login will result in a generated token (JWT) being
    cached in the api_client config that will be passed in all future
    REST API calls. To clear that token, call service_logout.

    The token is temporary and will expire after a duration set by
    the server. After a token expires, you must call
    this method again to generate a new token.

    Args:
        service (str): 'nifi' or 'registry'; the service to login to
        username (str): The username to submit; falls back to the default
            username in nipyapi.config when None
        password (str): The password to use; falls back to the default
            password in nipyapi.config when None
        bool_response (bool): If True, the function will return False instead
            of an error. Useful for connection testing.

    Returns:
        (bool): True if successful, False or an Error if not. See bool_response

    Raises:
        ValueError: wrapping the API error body when login fails and
            bool_response is False
    """
    # Build the logged arguments explicitly instead of mutating the mapping
    # returned by locals(): that mapping is documented as not to be modified,
    # and the password must never reach the log in clear text.
    log_args = {
        'service': service,
        'username': username,
        'password': 'REDACTED',
        'bool_response': bool_response
    }
    log.info("Called service_login with args %s", log_args)
    assert service in _valid_services
    assert username is None or isinstance(username, six.string_types)
    assert password is None or isinstance(password, six.string_types)
    assert isinstance(bool_response, bool)

    configuration = getattr(nipyapi, service).configuration
    assert configuration.host, "Host must be set prior to logging in."
    assert configuration.host.startswith("https"), \
        "Login is only available when connecting over HTTPS."
    default_pword = getattr(nipyapi.config, 'default_' + service + '_password')
    default_uname = getattr(nipyapi.config, 'default_' + service + '_username')
    # We use copy so we don't clobber the default by mistake
    pword = password if password is not None else copy(default_pword)
    uname = username if username is not None else copy(default_uname)
    assert pword, "Password must be set or in default config"
    assert uname, "Username must be set or in default config"

    # set username/password in configuration for initial login
    # Registry pulls from config, NiFi allows submission
    configuration.username = uname
    configuration.password = pword
    log.info("Attempting tokenAuth login with user identity [%s]",
             configuration.username)
    try:
        if service == 'nifi':
            token = nipyapi.nifi.AccessApi().create_access_token(
                username=uname, password=pword)
        else:
            token = nipyapi.registry.AccessApi() \
                .create_access_token_using_basic_auth_credentials()
        # Cache the token so it is sent with subsequent REST calls
        set_service_auth_token(token=token, service=service)
        return True
    except getattr(nipyapi, service).rest.ApiException as e:
        if bool_response:
            return False
        _raise(ValueError(e.body), e)
def set_service_auth_token(token=None, token_name='tokenAuth', service='nifi'):
    """
    Stores or clears the auth token in the configuration of the given service.

    A truthy token is saved under ``token_name`` in the configuration's
    api_key mapping with a 'Bearer' prefix. A falsy token is treated as a
    logout and any api_key entry under ``token_name`` is removed.

    Args:
        token (Optional[str]): The token to set. Defaults to None.
        token_name (str): the api_key field name to set the token to. Defaults
            to 'tokenAuth'
        service (str): 'nifi' or 'registry', the service to set

    Returns:
        (bool): True if a token is stored under token_name afterwards,
        False if not
    """
    assert service in _valid_services
    assert isinstance(token_name, six.string_types)
    assert token is None or isinstance(token, six.string_types)
    configuration = (
        nipyapi.config.registry_config if service == 'registry'
        else nipyapi.config.nifi_config
    )
    if token:
        configuration.api_key[token_name] = token
        configuration.api_key_prefix[token_name] = 'Bearer'
    elif token_name in configuration.api_key:
        # No token supplied, so assume we are doing logout and cleanup
        configuration.api_key.pop(token_name)
    return token_name in configuration.api_key
def service_logout(service='nifi'):
    """
    Logs out from the service by clearing the cached token, then checks the
    access status to confirm.

    Args:
        service (str): 'nifi' or 'registry'; the target service

    Returns:
        (bool): True if access was removed, False if still set

    Raises:
        ValueError: any ValueError from the status check other than the
            'Cannot set verify_mode to CERT_NONE' SSL setup error
    """
    assert service in _valid_services
    set_service_auth_token(token=None, service=service)
    try:
        remaining_access = get_service_access_status(
            service, bool_response=True
        )
    except ValueError as e:
        if 'Cannot set verify_mode to CERT_NONE' not in str(e):
            raise e
        # Logout throws error with incorrect ssl setup
        remaining_access = None
    return not remaining_access
def get_service_access_status(service='nifi', bool_response=False):
    """
    Fetches the access status for the current session.

    Note that when bool_response is True the 'urllib3' logger level is set
    to ERROR and is not restored afterwards.

    Args:
        service (str): A String of 'nifi' or 'registry' to indicate which
            service to check status for
        bool_response (bool): If True, the function will return False on
            hitting an Error instead of raising it. Useful for connection
            testing.

    Returns:
        The Service Access Status from the AccessApi on success; False when
        bool_response is True and the connection failed or the service
        returned one of the expected error messages

    Raises:
        urllib3.exceptions.MaxRetryError: if the connection fails and
            bool_response is False
        ApiException: of the target service, for API errors not converted
            to False
    """
    log.info("Called get_service_access_status with args %s", locals())
    assert service in _valid_services
    assert isinstance(bool_response, bool)
    if bool_response:
        # Assume we are using this as a connection test and therefore disable
        # the Warnings urllib3 will shower us with
        log.debug("- bool_response is True, disabling urllib3 warnings")
        logging.getLogger('urllib3').setLevel(logging.ERROR)
    service_module = getattr(nipyapi, service)
    try:
        status = service_module.AccessApi().get_access_status()
    except urllib3.exceptions.MaxRetryError as e:
        log.debug("- Caught exception %s", type(e))
        if not bool_response:
            log.debug("- bool_response is False, raising Exception")
            raise e
        log.debug("Connection failed with error %s and bool_response is "
                  "True, returning False", e)
        return False
    except service_module.rest.ApiException as e:
        known_messages = (
            'Authentication object was not found',
            'only supported when running over HTTPS'
        )
        if any(msg in e.body for msg in known_messages) and bool_response:
            return False
        raise e
    log.info("Got server response, returning")
    return status
def add_user_to_access_policy(user, policy, service='nifi', refresh=True,
                              strict=True):
    """
    Adds the given user object to the given access policy if not yet present.

    Args:
        user (User) or (UserEntity): User object to add
        policy (AccessPolicyEntity) or (AccessPolicy): Access Policy object
        service (str): 'nifi' or 'registry' to identify the target service
        refresh (bool): Whether to re-fetch the policy from the service
            before modifying and submitting it
        strict (bool): If True, will raise an error if user already present,
            if False will ignore the already exists

    Returns:
        Updated Policy object, or None when the user is already in the policy
        and strict is False

    Raises:
        ValueError: if the user is already in the policy and strict is True
    """
    assert service in _valid_services
    is_registry = service == 'registry'
    if is_registry:
        policy_cls = nipyapi.registry.AccessPolicy
        user_cls = nipyapi.registry.User
    else:
        policy_cls = nipyapi.nifi.AccessPolicyEntity
        user_cls = nipyapi.nifi.UserEntity
    assert isinstance(policy, policy_cls)
    assert isinstance(user, user_cls)
    user_id = user.identifier if is_registry else user.id

    if refresh:
        policy_id = policy.identifier if is_registry else policy.id
        policies_api = getattr(nipyapi, service).PoliciesApi()
        policy_tgt = policies_api.get_access_policy(policy_id)
    else:
        policy_tgt = policy
    assert isinstance(policy_tgt, policy_cls)

    # Collect the IDs of the users already attached to the policy
    if is_registry:
        existing_ids = [member.identifier for member in policy_tgt.users]
    else:
        existing_ids = [member.id for member in policy_tgt.component.users]

    if user_id in existing_ids:
        if strict:
            raise ValueError("Strict is True and User ID already in Policy")
        return None

    if is_registry:
        policy_tgt.users.append(user)
    else:
        policy_tgt.component.users.append({'id': user_id})
    return nipyapi.security.update_access_policy(policy_tgt, service)
def add_user_group_to_access_policy(user_group, policy, service='nifi',
                                    refresh=True):
    """
    Attempts to add the given user group object to the given access policy

    Args:
        user_group (UserGroup) or (UserGroupEntity): User group object to add
        policy (AccessPolicyEntity) or (AccessPolicy): Access Policy object
        service (str): 'nifi' or 'registry' to identify the target service
        refresh (bool): Whether to refresh the policy object before submission

    Returns:
        Updated Policy object

    Raises:
        AssertionError: if an argument has the wrong type for the service, or
            if the user group is already listed in the policy
    """
    assert service in _valid_services
    assert isinstance(
        policy,
        nipyapi.registry.AccessPolicy if service == 'registry'
        else nipyapi.nifi.AccessPolicyEntity
    )
    assert isinstance(
        user_group,
        nipyapi.registry.UserGroup if service == 'registry'
        else nipyapi.nifi.UserGroupEntity
    )
    user_group_id = user_group.id if service == 'nifi' else \
        user_group.identifier

    if refresh:
        policy_tgt = getattr(nipyapi, service).PoliciesApi().get_access_policy(
            policy.id if service == 'nifi' else policy.identifier
        )
    else:
        policy_tgt = policy

    assert isinstance(
        policy_tgt,
        nipyapi.registry.AccessPolicy if service == 'registry' else
        nipyapi.nifi.AccessPolicyEntity
    )

    # Compare against the groups already attached to the policy. The registry
    # branch must read user_groups rather than users, otherwise the duplicate
    # check would compare a group ID against user IDs.
    policy_user_groups = policy_tgt.user_groups if service == 'registry' \
        else policy_tgt.component.user_groups
    policy_user_group_ids = [
        i.identifier if service == 'registry' else i.id
        for i in policy_user_groups
    ]

    assert user_group_id not in policy_user_group_ids

    if service == 'registry':
        policy_tgt.user_groups.append(user_group)
    elif service == 'nifi':
        policy_tgt.component.user_groups.append({'id': user_group_id})

    return nipyapi.security.update_access_policy(policy_tgt, service)
def update_access_policy(policy, service='nifi'):
    """
    Submits an updated access policy to the service indicated.

    Args:
        policy (PolicyEntity): The policy object to submit;
            nifi.AccessPolicyEntity for 'nifi', registry.AccessPolicy for
            'registry'
        service (str): 'nifi' or 'registry' to indicate the target service

    Returns:
        (PolicyEntity): The updated policy if successful
    """
    assert service in _valid_services
    if service == 'registry':
        expected_cls = nipyapi.registry.AccessPolicy
    else:
        expected_cls = nipyapi.nifi.AccessPolicyEntity
    assert isinstance(policy, expected_cls), \
        "Policy type {0} not valid.".format(type(policy))
    with nipyapi.utils.rest_exceptions():
        policies_api = getattr(nipyapi, service).PoliciesApi()
        policy_id = policy.id if service == 'nifi' else policy.identifier
        return policies_api.update_access_policy(id=policy_id, body=policy)
def get_access_policy_for_resource(resource,
                                   action,
                                   r_id=None,
                                   service='nifi',
                                   auto_create=False):
    """
    Attempts to retrieve the access policy for a given resource and action,
    and optionally resource_id if targeting NiFi. Optionally creates the policy
    if it doesn't already exist

    Args:
        resource (str): A valid resource in the target service
        action (str): A valid action, typically 'read', 'write' or 'delete'
        r_id (Optional[str]): The UUID of the resource, valid only if targeting
            NiFi resources
        service (str): Which service to target, typically 'nifi' or 'registry'
        auto_create (bool): Whether to create the targeted policy if it doesn't
            already exist

    Returns:
        The relevant AccessPolicy object, or None if the policy was not found
        and auto_create is False

    Raises:
        ValueError: wrapping the API error body for any API error other than
            'policy not found'
    """
    assert service in _valid_services
    assert action in _valid_actions
    assert r_id is None or isinstance(r_id, six.string_types)
    assert isinstance(resource, six.string_types)
    assert isinstance(auto_create, bool)
    log.info("Called get_access_policy_for_resource with Args %s", locals())

    # Strip leading '/' from resource as lookup endpoint prepends a '/'
    resource = resource[1:] if resource.startswith('/') else resource
    log.info("Getting %s Policy for %s:%s:%s", service, action,
             resource, str(r_id))
    if service == 'nifi':
        pol_api = nipyapi.nifi.PoliciesApi()
    else:
        pol_api = nipyapi.registry.PoliciesApi()
    try:
        # The resource path contains '/' so slash encoding is bypassed for
        # the duration of the lookup; the finally clause always restores it
        nipyapi.utils.bypass_slash_encoding(service, True)
        return pol_api.get_access_policy_for_resource(
            action=action,
            resource='/'.join([resource, r_id]) if r_id else resource
        )
    # Each service client raises its own ApiException class, so both must be
    # caught for the not-found handling to work with the registry as well
    except (nipyapi.nifi.rest.ApiException,
            nipyapi.registry.rest.ApiException) as e:
        if any(
                pol_string in e.body for pol_string in
                ['Unable to find access policy', 'No policy found']
        ):
            log.info("Access policy not found")
            if not auto_create:
                return None
            return nipyapi.security.create_access_policy(
                resource, action, r_id, service
            )
        log.info("Unexpected Error, raising...")
        _raise(ValueError(e.body), e)
    finally:
        nipyapi.utils.bypass_slash_encoding(service, False)
def create_access_policy(resource, action, r_id=None, service='nifi'):
    """
    Creates an access policy for the given resource and action, and for NiFi
    optionally a specific resource ID.

    Args:
        resource (str): a valid resource type for this service, e.g. 'bucket';
            a leading '/' is added if missing
        action (str): a valid action type for this service, typically 'read',
            'write' or 'delete'
        r_id (optional[str]): if NiFi, the resource ID of the resource; not
            used when the service is 'registry'
        service (str): the service to target

    Returns:
        An access policy object for that service
    """
    assert isinstance(resource, six.string_types)
    assert action in _valid_actions
    assert r_id is None or isinstance(r_id, six.string_types)
    assert service in _valid_services
    # The policy resource is always submitted with a leading slash
    rooted = resource if resource[0] == '/' else '/' + resource
    with nipyapi.utils.rest_exceptions():
        if service != 'nifi':
            # must be registry
            return nipyapi.registry.PoliciesApi().create_access_policy(
                body=nipyapi.registry.AccessPolicy(
                    action=action,
                    resource=rooted
                )
            )
        nifi_api = nipyapi.nifi.PoliciesApi()
        nifi_resource = '/'.join([rooted, r_id]) if r_id else rooted
        payload = nipyapi.nifi.AccessPolicyEntity(
            revision=nipyapi.nifi.RevisionDTO(version=0),
            component=nipyapi.nifi.AccessPolicyDTO(
                action=action,
                resource=nifi_resource
            )
        )
        return nifi_api.create_access_policy(body=payload)
def set_service_ssl_context(
        service='nifi',
        ca_file=None,
        client_cert_file=None,
        client_key_file=None,
        client_key_password=None,
        check_hostname=None):
    """
    Builds an SSLContext for connecting over https to a secured NiFi or
    NiFi-Registry instance and stores it on that service's configuration.

    This method can be used to create an SSLContext for
    two-way TLS in which a client cert is used by the service to authenticate
    the client.

    This method can also be used for one-way TLS in which the client
    verifies the server's certificate, but authenticates using a
    different form of credentials, such as LDAP username/password.

    If you are using one-way TLS with a certificate signed by a
    root CA trusted by your system/platform, this step is not
    necessary as the default TLS-handshake should "just work."

    Args:
        service (str): 'nifi' or 'registry' to indicate which service
            config to set the ssl context to
        ca_file (str): A PEM file containing certs for the root CA(s)
            for the NiFi Registry server
        client_cert_file (str): A PEM file containing the public
            certificates for the user/client identity
        client_key_file (str): An encrypted (password-protected) PEM file
            containing the client's secret key; the client cert chain is
            only loaded when this is provided
        client_key_password (str): The password to decrypt the client_key_file
        check_hostname (bool): Enable or Disable hostname checking; when None
            the value of nipyapi.config.global_ssl_host_check is used

    Returns:
        (None)

    Raises:
        FileNotFoundError: if loading the client cert chain fails because a
            file cannot be found
    """
    assert service in ['nifi', 'registry']
    if client_key_file is not None:
        ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        try:
            ssl_context.load_cert_chain(
                certfile=client_cert_file,
                keyfile=client_key_file,
                password=client_key_password
            )
        except FileNotFoundError as e:
            message = "Unable to read keyfile {0} or certfile {1}".format(
                client_key_file, client_cert_file
            )
            _raise(FileNotFoundError(message), e)
    else:
        ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)

    if ca_file is not None:
        ssl_context.load_verify_locations(cafile=ca_file)

    # An explicit argument wins over the package-wide default
    ssl_context.check_hostname = (
        check_hostname if check_hostname is not None
        else nipyapi.config.global_ssl_host_check
    )

    if service == 'nifi':
        nipyapi.config.nifi_config.ssl_context = ssl_context
    elif service == 'registry':
        nipyapi.config.registry_config.ssl_context = ssl_context
# pylint: disable=W0702,R0912
def bootstrap_security_policies(service, user_identity=None,
                                group_identity=None):
    """
    Creates a default security context within NiFi or Nifi-Registry

    For NiFi, read/write policies on the root process group and its data,
    plus read on 'system', are fetched (created if missing) and granted.
    For Registry, read/write/delete on '/buckets' are granted, and the
    default proxy user is created if needed and granted read/write/delete
    on '/proxy'.

    The grantee is user_identity when given. When neither user_identity nor
    group_identity is given, the service's default user from nipyapi.config
    is looked up. When no user is resolved, group_identity is granted the
    policies instead.

    Args:
        service (str): 'nifi' or 'registry' to indicate which service
        user_identity: a service user to establish in the security context
        group_identity: a service group to establish in the security context

    Returns:
        None
    """
    assert service in _valid_services, "service not in %s" % _valid_services
    valid_ident_obj = (nipyapi.nifi.UserEntity, nipyapi.registry.User)
    if user_identity is not None:
        assert isinstance(user_identity, valid_ident_obj)

    if 'nifi' in service:
        rpg_id = nipyapi.canvas.get_root_pg_id()
        if user_identity is None and group_identity is None:
            nifi_user_identity = nipyapi.security.get_service_user(
                nipyapi.config.default_nifi_username,
                service='nifi'
            )
        else:
            nifi_user_identity = user_identity

        # (action, resource, resource ID) for each policy to grant
        access_policies = [
            ('write', 'process-groups', rpg_id),
            ('read', 'process-groups', rpg_id),
            ('write', 'data/process-groups', rpg_id),
            ('read', 'data/process-groups', rpg_id),
            ('read', 'system', None),
        ]
        for pol in access_policies:
            ap = nipyapi.security.get_access_policy_for_resource(
                action=pol[0],
                resource=pol[1],
                r_id=pol[2],
                service='nifi',
                auto_create=True
            )
            if nifi_user_identity is None:
                # Adding the group is deliberately best-effort: it fails
                # when, for example, the group is already in the policy.
                # Only ordinary exceptions are swallowed, so that
                # KeyboardInterrupt and SystemExit still propagate, and the
                # failure is logged rather than silently discarded.
                try:
                    nipyapi.security.add_user_group_to_access_policy(
                        user_group=group_identity,
                        policy=ap,
                        service='nifi'
                    )
                except Exception as e:  # pylint: disable=broad-except
                    log.info(
                        "Skipped adding user group to %s policy for %s: %s",
                        pol[0], pol[1], e
                    )
            else:
                nipyapi.security.add_user_to_access_policy(
                    user=nifi_user_identity,
                    policy=ap,
                    service='nifi',
                    strict=False
                )
    else:
        if user_identity is None and group_identity is None:
            reg_user_identity = nipyapi.security.get_service_user(
                nipyapi.config.default_registry_username,
                service='registry'
            )
        else:
            reg_user_identity = user_identity

        all_buckets_access_policies = [
            ("read", "/buckets"),
            ("write", "/buckets"),
            ("delete", "/buckets")
        ]
        for action, resource in all_buckets_access_policies:
            pol = nipyapi.security.get_access_policy_for_resource(
                resource=resource,
                action=action,
                service='registry',
                auto_create=True
            )
            if reg_user_identity is None:
                nipyapi.security.add_user_group_to_access_policy(
                    user_group=group_identity,
                    policy=pol,
                    service='registry'
                )
            else:
                nipyapi.security.add_user_to_access_policy(
                    user=reg_user_identity,
                    policy=pol,
                    service='registry',
                    strict=False
                )
        # Setup Proxy Access
        nifi_proxy = nipyapi.security.create_service_user(
            identity=nipyapi.config.default_proxy_user,
            service='registry',
            strict=False
        )
        proxy_access_policies = [
            ("read", "/proxy"),
            ("write", "/proxy"),
            ("delete", "/proxy"),
        ]
        for action, resource in proxy_access_policies:
            pol = nipyapi.security.get_access_policy_for_resource(
                resource=resource,
                action=action,
                service='registry',
                auto_create=True
            )
            nipyapi.security.add_user_to_access_policy(
                user=nifi_proxy,
                policy=pol,
                service='registry',
                strict=False
            )