# -*- coding: utf-8 -*-
"""
For Managing NiFi Parameter Contexts
"""
from __future__ import absolute_import
import logging
import six
import nipyapi
from nipyapi.utils import exception_handler, enforce_min_ver
from nipyapi.nifi import ParameterContextEntity, ParameterDTO,\
ParameterEntity, ParameterContextDTO
log = logging.getLogger(__name__)
__all__ = [
"list_all_parameter_contexts", "create_parameter_context",
"delete_parameter_context", "get_parameter_context",
"update_parameter_context", "prepare_parameter",
"delete_parameter_from_context", "upsert_parameter_to_context",
"assign_context_to_process_group",
"remove_context_from_process_group"
]
[docs]def list_all_parameter_contexts():
"""
Lists all Parameter Contexts available on the Canvas
Returns:
list(ParameterContextEntity)
"""
enforce_min_ver('1.10.0')
handle = nipyapi.nifi.FlowApi()
return handle.get_parameter_contexts().parameter_contexts
[docs]@exception_handler(404, None)
def get_parameter_context(identifier, identifier_type='name', greedy=True):
"""
Gets one or more Parameter Contexts matching a given identifier
Args:
identifier (str): The Name or ID matching Parameter Context(s)
identifier_type (str): 'name' or 'id'
greedy (bool): False for exact match, True for string match
Returns:
None for no matches, Single Object for unique match,
list(Objects) for multiple matches
"""
enforce_min_ver('1.10.0')
assert isinstance(identifier, six.string_types)
assert identifier_type in ['name', 'id']
if identifier_type == 'id':
handle = nipyapi.nifi.ParameterContextsApi()
out = handle.get_parameter_context(identifier)
else:
obj = list_all_parameter_contexts()
out = nipyapi.utils.filter_obj(
obj, identifier, identifier_type, greedy=greedy
)
return out
[docs]def create_parameter_context(name, description=None, parameters=None,
inherited_contexts=None):
"""
Create a new Parameter Context with optional description and
initial Parameters
Args:
name (str): The Name for the new Context
description (str): An optional description
parameters (list[ParameterEntity]): A list of prepared Parameters
inherited_contexts (list[ParameterContextEntity]): A list of
inherited Parameter Contexts
Returns:
(ParameterContextEntity) The New Parameter Context
"""
enforce_min_ver('1.10.0')
assert isinstance(name, str)
assert description is None or isinstance(description, str)
handle = nipyapi.nifi.ParameterContextsApi()
inherited = inherited_contexts if inherited_contexts else []
out = handle.create_parameter_context(
body=ParameterContextEntity(
revision=nipyapi.nifi.RevisionDTO(version=0),
component=ParameterContextDTO(
name=name,
description=description,
parameters=parameters if parameters else [],
# list() per NiFi Jira 7995
inherited_parameter_contexts=inherited
# requires empty list per NiFi Jira 9470
)
)
)
return out
[docs]def update_parameter_context(context):
"""
Update an already existing Parameter Context
Args:
context (ParameterContextEntity): Parameter Context updated
to be applied
refresh (bool): Whether to refresh the object before Updating
Returns:
(ParameterContextEntity) The updated Parameter Context
"""
enforce_min_ver('1.10.0')
def _update_complete(context_id, request_id):
return nipyapi.nifi.ParameterContextsApi()\
.get_parameter_context_update(
context_id, request_id)\
.request.complete
if not isinstance(context, ParameterContextEntity):
raise ValueError("Supplied Parameter Context update should "
"be an instance of nipyapi.nifi.ParameterContextDTO")
handle = nipyapi.nifi.ParameterContextsApi()
target = get_parameter_context(context.id, identifier_type='id')
update_request = handle.submit_parameter_context_update(
context_id=target.id,
body=ParameterContextEntity(
id=target.id,
revision=target.revision,
component=context.component
)
)
nipyapi.utils.wait_to_complete(
_update_complete, target.id, update_request.request.request_id,
nipyapi_delay=1, nipyapi_max_wait=10
)
_ = handle.delete_update_request(
context_id=target.id,
request_id=update_request.request.request_id
)
return get_parameter_context(context.id, identifier_type='id')
[docs]def delete_parameter_context(context, refresh=True):
"""
Removes a Parameter Context
Args:
context (ParameterContextEntity): Parameter Context to be deleted
refresh (bool): Whether to refresh the Context before Deletion
Returns:
(ParameterContextEntity) The removed Parameter Context
"""
enforce_min_ver('1.10.0')
assert isinstance(context, nipyapi.nifi.ParameterContextEntity)
handle = nipyapi.nifi.ParameterContextsApi()
if refresh:
context = handle.get_parameter_context(context.id)
return handle.delete_parameter_context(
id=context.id,
version=context.revision.version
)
[docs]def prepare_parameter(name, value, description=None, sensitive=False):
"""
Parses basic inputs into a Parameter object ready for submission
Args:
name (str): The Name for the Parameter
value (str, int, float): The Value for the Parameter
description (str): Optional Description for the Parameter
sensitive (bool): Whether to mark the Parameter Value as sensitive
Returns:
(ParameterEntity) The ParameterEntity ready for use
"""
enforce_min_ver('1.10.0')
assert all(x is None or isinstance(x, str) for x in [name, description])
out = ParameterEntity(
parameter=ParameterDTO(
name=name,
value=value,
description=description,
sensitive=sensitive
)
)
return out
[docs]def delete_parameter_from_context(context, parameter_name):
"""
Delete a specific Parameter from a Parameter Context
Args:
context (ParameterContextEntity): The Parameter Context to Update
parameter_name (str): The Parameter to delete
Returns:
(ParameterContextEntity) The updated Parameter Context
"""
enforce_min_ver('1.10.0')
context.component.parameters = [
ParameterEntity(
parameter=ParameterDTO(
name=parameter_name
)
)
]
return update_parameter_context(
context=context
)
[docs]def upsert_parameter_to_context(context, parameter):
"""
Insert or Update Parameter within a Parameter Context
Args:
context (ParameterContextEntity): The Parameter Context to Modify
parameter(ParameterEntity): The ParameterEntity to insert or update
Returns:
(ParameterContextEntity) The updated Parameter Context
"""
enforce_min_ver('1.10.0')
context.component.parameters = [parameter]
return update_parameter_context(context=context)
[docs]def assign_context_to_process_group(pg, context_id, cascade=False):
"""
Assigns a given Parameter Context to a specific Process Group
Optionally cascades down to direct children Process Groups
Args:
pg (ProcessGroupEntity): The Process Group to target
context_id (str): The ID of the Parameter Context
cascade (bool): Cascade Parameter Context down to child Process Groups?
Returns:
(ProcessGroupEntity) The updated Process Group
"""
assert isinstance(context_id, str)
if cascade:
# Update the specified Process Group & all children
child_pgs = nipyapi.canvas.list_all_process_groups(pg_id=pg.id)
for child_pg in child_pgs:
nipyapi.canvas.update_process_group(
pg=child_pg,
update={
'parameter_context': {
'id': context_id
}
}
)
return nipyapi.canvas.update_process_group(
pg=pg,
update={
'parameter_context': {
'id': context_id
}
}
)
[docs]def remove_context_from_process_group(pg):
"""
Clears any Parameter Context from the given Process Group
Args:
pg (ProcessGroupEntity): The Process Group to target
Returns:
(ProcessGroupEntity) The updated Process Group
"""
return nipyapi.canvas.update_process_group(
pg=pg,
update={
'parameter_context': {
'id': None
}
}
)